create_test_timeline: do DatadirModification::init_empty

See the added comment on `create_empty_timeline`.

Rough context: https://github.com/neondatabase/neon/pull/4364#discussion_r1221995691
This commit is contained in:
Christian Schwarz
2023-06-09 13:02:30 +02:00
parent cdce04d721
commit b460f617e9
2 changed files with 54 additions and 5 deletions

View File

@@ -1268,6 +1268,18 @@ impl Tenant {
/// This is used to create the initial 'main' timeline during bootstrapping,
/// or when importing a new base backup. The caller is expected to load an
/// initial image of the datadir to the new timeline after this.
///
/// Until that happens, the on-disk state is invalid (disk_consistent_lsn=Lsn(0))
/// and the timeline will fail to load at a restart.
///
/// That's why we add an uninit mark file, and wrap it together witht the Timeline
/// in-memory object into UninitializedTimeline.
/// Once the caller is done setting up the timeline, they should call
/// `UninitializedTimeline::initialize_with_lock` to remove the uninit mark.
///
/// For tests, use `DatadirModification::init_empty` + `commit` to setup the
/// minimum amount of keys required to get a working timeline.
/// (Without it, `put` might fail due to `repartition` failing.)
pub fn create_empty_timeline(
&self,
new_timeline_id: TimelineId,
@@ -1316,7 +1328,19 @@ impl Tenant {
ctx: &RequestContext,
) -> anyhow::Result<Arc<Timeline>> {
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
// Setup minimum keys required for the timeline to be usable.
let mut modification = uninit_tl
.raw_timeline()
.expect("we just created it")
.begin_modification(initdb_lsn);
modification.init_empty().context("init_empty")?;
modification
.commit()
.context("commit init_empty modification")?;
let mut timelines = self.timelines.lock().unwrap();
// Call with `load_layers=true` to get next_open_layer set up.
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?;
// The non-test code would call tl.activate() here.
tl.set_state(TimelineState::Active);
@@ -4353,6 +4377,28 @@ mod tests {
}
Ok(())
}
#[tokio::test]
async fn test_empty_test_timeline_is_usable() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_empty_test_timeline_is_usable")?
.load()
.await;
let tline =
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x20), DEFAULT_PG_VERSION, &ctx)?;
// Make sure the timeline has the minimum set of required keys for operation.
// The only operation you can do on an empty timeline is to write new data.
// Repartition is the only code on the write path that requires other keys to be present.
// Make sure it works.
let cache = tline.partitioning.lock().unwrap();
assert_eq!(cache.1, Lsn(0), "must not have repartitioned yet, otherwise the repartition call below might just use the cache");
drop(cache);
tline
.repartition(Lsn(0x20), tline.get_compaction_target_size(), &ctx)
.await?;
Ok(())
}
}
#[cfg(not(debug_assertions))]

View File

@@ -216,7 +216,7 @@ pub struct Timeline {
pub initdb_lsn: Lsn,
/// When did we last calculate the partitioning?
partitioning: Mutex<(KeyPartitioning, Lsn)>,
pub(super) partitioning: Mutex<(KeyPartitioning, Lsn)>,
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
@@ -1306,7 +1306,7 @@ impl Timeline {
.unwrap_or(self.conf.default_tenant_conf.checkpoint_timeout)
}
fn get_compaction_target_size(&self) -> u64 {
pub(super) fn get_compaction_target_size(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
.compaction_target_size
@@ -2386,8 +2386,11 @@ impl Timeline {
ValueReconstructResult::Missing => {
return Err(layer_traversal_error(
format!(
"could not find data for key {} at LSN {}, for request at LSN {}",
key, cont_lsn, request_lsn
"could not find data for key {} at LSN {}, for request at LSN {}\n{}",
key,
cont_lsn,
request_lsn,
std::backtrace::Backtrace::force_capture(),
),
traversal_path,
));
@@ -3000,7 +3003,7 @@ impl Timeline {
Ok((new_delta_filename, LayerFileMetadata::new(sz)))
}
async fn repartition(
pub(super) async fn repartition(
&self,
lsn: Lsn,
partition_size: u64,