mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-11 15:32:56 +00:00
test(pageserver): add test interface to create artificial layers (#7899)
This pull request adds necessary interfaces to deterministically create scenarios we want to test. Simplify some test cases to use this interface to make it stable + reproducible. Compaction test will be able to use this interface. Also the upcoming delete tombstone tests will use this interface to make test reproducible. ## Summary of changes * `force_create_image_layer` * `force_create_delta_layer` * `force_advance_lsn` * `create_test_timeline_with_states` * `branch_timeline_test_with_states` --------- Signed-off-by: Alex Chi Z <chi@neon.tech>
This commit is contained in:
@@ -1393,6 +1393,36 @@ impl Tenant {
|
||||
Ok(tl)
|
||||
}
|
||||
|
||||
/// Helper for unit tests to create a timeline with some pre-loaded states.
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn create_test_timeline_with_layers(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
|
||||
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
|
||||
end_lsn: Lsn,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let tline = self
|
||||
.create_test_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)
|
||||
.await?;
|
||||
tline.force_advance_lsn(end_lsn);
|
||||
for deltas in delta_layer_desc {
|
||||
tline
|
||||
.force_create_delta_layer(deltas, Some(initdb_lsn), ctx)
|
||||
.await?;
|
||||
}
|
||||
for (lsn, images) in image_layer_desc {
|
||||
tline
|
||||
.force_create_image_layer(lsn, images, Some(initdb_lsn), ctx)
|
||||
.await?;
|
||||
}
|
||||
Ok(tline)
|
||||
}
|
||||
|
||||
/// Create a new timeline.
|
||||
///
|
||||
/// Returns the new timeline ID and reference to its Timeline object.
|
||||
@@ -2992,17 +3022,53 @@ impl Tenant {
|
||||
&self,
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
ancestor_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<Arc<Timeline>, CreateTimelineError> {
|
||||
let create_guard = self.create_timeline_create_guard(dst_id).unwrap();
|
||||
let tl = self
|
||||
.branch_timeline_impl(src_timeline, dst_id, start_lsn, create_guard, ctx)
|
||||
.branch_timeline_impl(src_timeline, dst_id, ancestor_lsn, create_guard, ctx)
|
||||
.await?;
|
||||
tl.set_state(TimelineState::Active);
|
||||
Ok(tl)
|
||||
}
|
||||
|
||||
/// Helper for unit tests to branch a timeline with some pre-loaded states.
|
||||
#[cfg(test)]
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn branch_timeline_test_with_layers(
|
||||
&self,
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
ancestor_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
delta_layer_desc: Vec<Vec<(pageserver_api::key::Key, Lsn, crate::repository::Value)>>,
|
||||
image_layer_desc: Vec<(Lsn, Vec<(pageserver_api::key::Key, bytes::Bytes)>)>,
|
||||
end_lsn: Lsn,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let tline = self
|
||||
.branch_timeline_test(src_timeline, dst_id, ancestor_lsn, ctx)
|
||||
.await?;
|
||||
let ancestor_lsn = if let Some(ancestor_lsn) = ancestor_lsn {
|
||||
ancestor_lsn
|
||||
} else {
|
||||
tline.get_last_record_lsn()
|
||||
};
|
||||
assert!(end_lsn >= ancestor_lsn);
|
||||
tline.force_advance_lsn(end_lsn);
|
||||
for deltas in delta_layer_desc {
|
||||
tline
|
||||
.force_create_delta_layer(deltas, Some(ancestor_lsn), ctx)
|
||||
.await?;
|
||||
}
|
||||
for (lsn, images) in image_layer_desc {
|
||||
tline
|
||||
.force_create_image_layer(lsn, images, Some(ancestor_lsn), ctx)
|
||||
.await?;
|
||||
}
|
||||
Ok(tline)
|
||||
}
|
||||
|
||||
/// Branch an existing timeline.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
@@ -6206,75 +6272,36 @@ mod tests {
|
||||
async fn test_vectored_missing_data_key_reads() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_vectored_missing_data_key_reads")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
|
||||
let base_key_child = Key::from_hex("000000000033333333444444445500000001").unwrap();
|
||||
let base_key_nonexist = Key::from_hex("000000000033333333444444445500000002").unwrap();
|
||||
|
||||
let mut lsn = Lsn(0x20);
|
||||
|
||||
{
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(base_key, lsn, &Value::Image(test_img("data key 1")), &ctx)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?; // this will create a image layer
|
||||
}
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // delta layers
|
||||
vec![(Lsn(0x20), vec![(base_key, test_img("data key 1"))])], // image layers
|
||||
Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN
|
||||
)
|
||||
.await?;
|
||||
|
||||
let child = tenant
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(lsn), &ctx)
|
||||
.branch_timeline_test_with_layers(
|
||||
&tline,
|
||||
NEW_TIMELINE_ID,
|
||||
Some(Lsn(0x20)),
|
||||
&ctx,
|
||||
Vec::new(), // delta layers
|
||||
vec![(Lsn(0x30), vec![(base_key_child, test_img("data key 2"))])], // image layers
|
||||
Lsn(0x30),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
lsn.0 += 0x10;
|
||||
|
||||
{
|
||||
let mut writer = child.writer().await;
|
||||
writer
|
||||
.put(
|
||||
base_key_child,
|
||||
lsn,
|
||||
&Value::Image(test_img("data key 2")),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
|
||||
child.freeze_and_flush().await?; // this will create a delta
|
||||
|
||||
{
|
||||
// update the partitioning to include the test key space, otherwise they
|
||||
// will be dropped by image layer creation
|
||||
let mut guard = child.partitioning.lock().await;
|
||||
let ((partitioning, _), partition_lsn) = &mut *guard;
|
||||
partitioning
|
||||
.parts
|
||||
.push(KeySpace::single(base_key..base_key_nonexist)); // exclude the nonexist key
|
||||
*partition_lsn = lsn;
|
||||
}
|
||||
|
||||
child
|
||||
.compact(
|
||||
&cancel,
|
||||
{
|
||||
let mut set = EnumSet::empty();
|
||||
set.insert(CompactFlags::ForceImageLayerCreation);
|
||||
set
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?; // force create an image layer for the keys, TODO: check if the image layer is created
|
||||
}
|
||||
|
||||
async fn get_vectored_impl_wrapper(
|
||||
tline: &Arc<Timeline>,
|
||||
key: Key,
|
||||
@@ -6296,6 +6323,8 @@ mod tests {
|
||||
}))
|
||||
}
|
||||
|
||||
let lsn = Lsn(0x30);
|
||||
|
||||
// test vectored get on parent timeline
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, base_key, lsn, &ctx).await?,
|
||||
@@ -6333,94 +6362,42 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_vectored_missing_metadata_key_reads() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_vectored_missing_metadata_key_reads")?;
|
||||
let harness = TenantHarness::create("test_vectored_missing_data_key_reads")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let base_key = Key::from_hex("620000000033333333444444445500000000").unwrap();
|
||||
let base_key_child = Key::from_hex("620000000033333333444444445500000001").unwrap();
|
||||
let base_key_nonexist = Key::from_hex("620000000033333333444444445500000002").unwrap();
|
||||
assert_eq!(base_key.field1, AUX_KEY_PREFIX); // in case someone accidentally changed the prefix...
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
Vec::new(), // delta layers
|
||||
vec![(Lsn(0x20), vec![(base_key, test_img("metadata key 1"))])], // image layers
|
||||
Lsn(0x20), // it's fine to not advance LSN to 0x30 while using 0x30 to get below because `get_vectored_impl` does not wait for LSN
|
||||
)
|
||||
.await?;
|
||||
|
||||
let cancel = CancellationToken::new();
|
||||
|
||||
let mut base_key = Key::from_hex("000000000033333333444444445500000000").unwrap();
|
||||
let mut base_key_child = Key::from_hex("000000000033333333444444445500000001").unwrap();
|
||||
let mut base_key_nonexist = Key::from_hex("000000000033333333444444445500000002").unwrap();
|
||||
base_key.field1 = AUX_KEY_PREFIX;
|
||||
base_key_child.field1 = AUX_KEY_PREFIX;
|
||||
base_key_nonexist.field1 = AUX_KEY_PREFIX;
|
||||
|
||||
let mut lsn = Lsn(0x20);
|
||||
|
||||
{
|
||||
let mut writer = tline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
base_key,
|
||||
lsn,
|
||||
&Value::Image(test_img("metadata key 1")),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
|
||||
tline.freeze_and_flush().await?; // this will create an image layer
|
||||
|
||||
tline
|
||||
.compact(
|
||||
&cancel,
|
||||
{
|
||||
let mut set = EnumSet::empty();
|
||||
set.insert(CompactFlags::ForceImageLayerCreation);
|
||||
set.insert(CompactFlags::ForceRepartition);
|
||||
set
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?; // force create an image layer for metadata keys
|
||||
tenant
|
||||
.gc_iteration(Some(tline.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let child = tenant
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(lsn), &ctx)
|
||||
.branch_timeline_test_with_layers(
|
||||
&tline,
|
||||
NEW_TIMELINE_ID,
|
||||
Some(Lsn(0x20)),
|
||||
&ctx,
|
||||
Vec::new(), // delta layers
|
||||
vec![(
|
||||
Lsn(0x30),
|
||||
vec![(base_key_child, test_img("metadata key 2"))],
|
||||
)], // image layers
|
||||
Lsn(0x30),
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
lsn.0 += 0x10;
|
||||
|
||||
{
|
||||
let mut writer = child.writer().await;
|
||||
writer
|
||||
.put(
|
||||
base_key_child,
|
||||
lsn,
|
||||
&Value::Image(test_img("metadata key 2")),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
|
||||
child.freeze_and_flush().await?;
|
||||
|
||||
child
|
||||
.compact(
|
||||
&cancel,
|
||||
{
|
||||
let mut set = EnumSet::empty();
|
||||
set.insert(CompactFlags::ForceImageLayerCreation);
|
||||
set.insert(CompactFlags::ForceRepartition);
|
||||
set
|
||||
},
|
||||
&ctx,
|
||||
)
|
||||
.await?; // force create an image layer for metadata keys
|
||||
tenant
|
||||
.gc_iteration(Some(child.timeline_id), 0, Duration::ZERO, &cancel, &ctx)
|
||||
.await?;
|
||||
}
|
||||
|
||||
async fn get_vectored_impl_wrapper(
|
||||
tline: &Arc<Timeline>,
|
||||
key: Key,
|
||||
@@ -6442,6 +6419,8 @@ mod tests {
|
||||
}))
|
||||
}
|
||||
|
||||
let lsn = Lsn(0x30);
|
||||
|
||||
// test vectored get on parent timeline
|
||||
assert_eq!(
|
||||
get_vectored_impl_wrapper(&tline, base_key, lsn, &ctx).await?,
|
||||
|
||||
@@ -5371,6 +5371,102 @@ impl Timeline {
|
||||
shard_count: self.tenant_shard_id.shard_count,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) fn force_advance_lsn(self: &Arc<Timeline>, new_lsn: Lsn) {
|
||||
self.last_record_lsn.advance(new_lsn);
|
||||
}
|
||||
|
||||
/// Force create an image layer and place it into the layer map.
|
||||
///
|
||||
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
|
||||
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run.
|
||||
#[cfg(test)]
|
||||
pub(super) async fn force_create_image_layer(
|
||||
self: &Arc<Timeline>,
|
||||
lsn: Lsn,
|
||||
mut images: Vec<(Key, Bytes)>,
|
||||
check_start_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
assert!(
|
||||
lsn <= last_record_lsn,
|
||||
"advance last record lsn before inserting a layer, lsn={lsn}, last_record_lsn={last_record_lsn}"
|
||||
);
|
||||
if let Some(check_start_lsn) = check_start_lsn {
|
||||
assert!(lsn >= check_start_lsn);
|
||||
}
|
||||
images.sort_unstable_by(|(ka, _), (kb, _)| ka.cmp(kb));
|
||||
let min_key = *images.first().map(|(k, _)| k).unwrap();
|
||||
let max_key = images.last().map(|(k, _)| k).unwrap().next();
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
&(min_key..max_key),
|
||||
lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
for (key, img) in images {
|
||||
image_layer_writer.put_image(key, img, ctx).await?;
|
||||
}
|
||||
let image_layer = image_layer_writer.finish(self, ctx).await?;
|
||||
|
||||
{
|
||||
let mut guard = self.layers.write().await;
|
||||
guard.force_insert_layer(image_layer);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Force create a delta layer and place it into the layer map.
|
||||
///
|
||||
/// DO NOT use this function directly. Use [`Tenant::branch_timeline_test_with_layers`]
|
||||
/// or [`Tenant::create_test_timeline_with_layers`] to ensure all these layers are placed into the layer map in one run.
|
||||
#[cfg(test)]
|
||||
pub(super) async fn force_create_delta_layer(
|
||||
self: &Arc<Timeline>,
|
||||
mut deltas: Vec<(Key, Lsn, Value)>,
|
||||
check_start_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
deltas.sort_unstable_by(|(ka, la, _), (kb, lb, _)| (ka, la).cmp(&(kb, lb)));
|
||||
let min_key = *deltas.first().map(|(k, _, _)| k).unwrap();
|
||||
let max_key = deltas.last().map(|(k, _, _)| k).unwrap().next();
|
||||
let min_lsn = *deltas.iter().map(|(_, lsn, _)| lsn).min().unwrap();
|
||||
let max_lsn = Lsn(deltas.iter().map(|(_, lsn, _)| lsn).max().unwrap().0 + 1);
|
||||
assert!(
|
||||
max_lsn <= last_record_lsn,
|
||||
"advance last record lsn before inserting a layer, max_lsn={max_lsn}, last_record_lsn={last_record_lsn}"
|
||||
);
|
||||
if let Some(check_start_lsn) = check_start_lsn {
|
||||
assert!(min_lsn >= check_start_lsn);
|
||||
}
|
||||
let mut delta_layer_writer = DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
min_key,
|
||||
min_lsn..max_lsn,
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
for (key, lsn, val) in deltas {
|
||||
delta_layer_writer.put_value(key, lsn, val, ctx).await?;
|
||||
}
|
||||
let delta_layer = delta_layer_writer.finish(max_key, self, ctx).await?;
|
||||
|
||||
{
|
||||
let mut guard = self.layers.write().await;
|
||||
guard.force_insert_layer(delta_layer);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
type TraversalPathItem = (ValueReconstructResult, Lsn, TraversalId);
|
||||
|
||||
@@ -255,6 +255,13 @@ impl LayerManager {
|
||||
updates.flush()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn force_insert_layer(&mut self, layer: ResidentLayer) {
|
||||
let mut updates = self.layer_map.batch_update();
|
||||
Self::insert_historic_layer(layer.as_ref().clone(), &mut updates, &mut self.layer_fmgr);
|
||||
updates.flush()
|
||||
}
|
||||
|
||||
/// Helper function to insert a layer into the layer map and file manager.
|
||||
fn insert_historic_layer(
|
||||
layer: Layer,
|
||||
|
||||
Reference in New Issue
Block a user