mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 06:52:55 +00:00
refactor responsibility for tenant/timeline activation (#4317)
(This is prep work to make `Timeline::activate()` infallible.) The current possibility for failure in `Timeline::activate()` is the broker client's presence / absence. It should be an assert, but we're careful with these. So, I'm planning to pass in the broker client to activate(), thereby eliminating the possiblity of its absence. In the unit tests, we don't have a broker client. So, I thought I'd be in trouble because the unit tests also called `activate()` before this PR. However, closer inspection reveals a long-standing FIXME about this, which is addressed by this patch. It turns out that the unit tests don't actually need the background loops to be running. They just need the state value to be `Active`. So, for the tests, we just set it to that value but don't spawn the background loops. We'll need to revisit this if we ever do more Rust unit tests in the future. But right now, this refactoring improves the code, so, let's revisit when we get there. Patch series: - #4316 - #4317 - #4318 - #4319
This commit is contained in:
committed by
GitHub
parent
df52587bef
commit
afc48e2cd9
@@ -1600,9 +1600,7 @@ pub fn create_test_timeline(
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<std::sync::Arc<Timeline>> {
|
||||
let tline = tenant
|
||||
.create_empty_timeline(timeline_id, Lsn(8), pg_version, ctx)?
|
||||
.initialize(ctx)?;
|
||||
let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?;
|
||||
let mut m = tline.begin_modification(Lsn(8));
|
||||
m.init_empty()?;
|
||||
m.commit()?;
|
||||
|
||||
@@ -184,24 +184,14 @@ impl UninitializedTimeline<'_> {
|
||||
/// Ensures timeline data is valid, loads it into pageserver's memory and removes
|
||||
/// uninit mark file on success.
|
||||
///
|
||||
/// The new timeline is initialized in Active state, and its background jobs are
|
||||
/// started
|
||||
pub fn initialize(self, ctx: &RequestContext) -> anyhow::Result<Arc<Timeline>> {
|
||||
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
|
||||
self.initialize_with_lock(ctx, &mut timelines, true, true)
|
||||
}
|
||||
|
||||
/// Like `initialize`, but the caller is already holding lock on Tenant::timelines.
|
||||
/// If `launch_wal_receiver` is false, the WAL receiver not launched, even though
|
||||
/// timeline is initialized in Active state. This is used during tenant load and
|
||||
/// attach, where the WAL receivers are launched only after all the timelines have
|
||||
/// been initialized.
|
||||
/// This function launches the flush loop if not already done.
|
||||
///
|
||||
/// The caller is responsible for activating the timeline (function `.activate()`).
|
||||
fn initialize_with_lock(
|
||||
mut self,
|
||||
ctx: &RequestContext,
|
||||
_ctx: &RequestContext,
|
||||
timelines: &mut HashMap<TimelineId, Arc<Timeline>>,
|
||||
load_layer_map: bool,
|
||||
activate: bool,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let timeline_id = self.timeline_id;
|
||||
let tenant_id = self.owning_tenant.tenant_id;
|
||||
@@ -237,12 +227,6 @@ impl UninitializedTimeline<'_> {
|
||||
v.insert(Arc::clone(&new_timeline));
|
||||
|
||||
new_timeline.maybe_spawn_flush_loop();
|
||||
|
||||
if activate {
|
||||
new_timeline
|
||||
.activate(ctx)
|
||||
.context("initializing timeline activation")?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -279,7 +263,9 @@ impl UninitializedTimeline<'_> {
|
||||
// Initialize without loading the layer map. We started with an empty layer map, and already
|
||||
// updated it for the layers that we created during the import.
|
||||
let mut timelines = self.owning_tenant.timelines.lock().unwrap();
|
||||
self.initialize_with_lock(ctx, &mut timelines, false, true)
|
||||
let tl = self.initialize_with_lock(ctx, &mut timelines, false)?;
|
||||
tl.activate(ctx)?;
|
||||
Ok(tl)
|
||||
}
|
||||
|
||||
fn raw_timeline(&self) -> anyhow::Result<&Arc<Timeline>> {
|
||||
@@ -519,7 +505,7 @@ impl Tenant {
|
||||
// Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote
|
||||
// But we shouldnt start walreceiver before we have all the data locally, because working walreceiver
|
||||
// will ingest data which may require looking at the layers which are not yet available locally
|
||||
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true, false) {
|
||||
match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true) {
|
||||
Ok(new_timeline) => new_timeline,
|
||||
Err(e) => {
|
||||
error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}");
|
||||
@@ -628,7 +614,12 @@ impl Tenant {
|
||||
"attach tenant",
|
||||
false,
|
||||
async move {
|
||||
match tenant_clone.attach(ctx).await {
|
||||
let doit = async {
|
||||
tenant_clone.attach(&ctx).await?;
|
||||
tenant_clone.activate(&ctx)?;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
match doit.await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
tenant_clone.set_broken(e.to_string());
|
||||
@@ -636,7 +627,12 @@ impl Tenant {
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
}
|
||||
.instrument({
|
||||
let span = tracing::info_span!(parent: None, "attach", tenant_id=%tenant_id);
|
||||
span.follows_from(Span::current());
|
||||
span
|
||||
}),
|
||||
);
|
||||
Ok(tenant)
|
||||
}
|
||||
@@ -644,8 +640,9 @@ impl Tenant {
|
||||
///
|
||||
/// Background task that downloads all data for a tenant and brings it to Active state.
|
||||
///
|
||||
#[instrument(skip_all, fields(tenant_id=%self.tenant_id))]
|
||||
async fn attach(self: &Arc<Tenant>, ctx: RequestContext) -> anyhow::Result<()> {
|
||||
async fn attach(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id);
|
||||
if !tokio::fs::try_exists(&marker_file)
|
||||
.await
|
||||
@@ -735,20 +732,14 @@ impl Tenant {
|
||||
.expect("just put it in above");
|
||||
|
||||
// TODO again handle early failure
|
||||
self.load_remote_timeline(
|
||||
timeline_id,
|
||||
index_part,
|
||||
remote_metadata,
|
||||
remote_client,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_id
|
||||
)
|
||||
})?;
|
||||
self.load_remote_timeline(timeline_id, index_part, remote_metadata, remote_client, ctx)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"failed to load remote timeline {} for tenant {}",
|
||||
timeline_id, self.tenant_id
|
||||
)
|
||||
})?;
|
||||
}
|
||||
|
||||
std::fs::remove_file(&marker_file)
|
||||
@@ -758,10 +749,6 @@ impl Tenant {
|
||||
|
||||
utils::failpoint_sleep_millis_async!("attach-before-activate");
|
||||
|
||||
// Start background operations and open the tenant for business.
|
||||
// The loops will shut themselves down when they notice that the tenant is inactive.
|
||||
self.activate(&ctx)?;
|
||||
|
||||
info!("Done");
|
||||
|
||||
Ok(())
|
||||
@@ -901,7 +888,12 @@ impl Tenant {
|
||||
"initial tenant load",
|
||||
false,
|
||||
async move {
|
||||
match tenant_clone.load(&ctx).await {
|
||||
let doit = async {
|
||||
tenant_clone.load(&ctx).await?;
|
||||
tenant_clone.activate(&ctx)?;
|
||||
anyhow::Ok(())
|
||||
};
|
||||
match doit.await {
|
||||
Ok(()) => {}
|
||||
Err(err) => {
|
||||
tenant_clone.set_broken(err.to_string());
|
||||
@@ -910,7 +902,12 @@ impl Tenant {
|
||||
}
|
||||
info!("initial load for tenant {tenant_id} finished!");
|
||||
Ok(())
|
||||
},
|
||||
}
|
||||
.instrument({
|
||||
let span = tracing::info_span!(parent: None, "load", tenant_id=%tenant_id);
|
||||
span.follows_from(Span::current());
|
||||
span
|
||||
}),
|
||||
);
|
||||
|
||||
info!("spawned load into background");
|
||||
@@ -922,8 +919,9 @@ impl Tenant {
|
||||
/// Background task to load in-memory data structures for this tenant, from
|
||||
/// files on disk. Used at pageserver startup.
|
||||
///
|
||||
#[instrument(skip(self, ctx), fields(tenant_id=%self.tenant_id))]
|
||||
async fn load(self: &Arc<Tenant>, ctx: &RequestContext) -> anyhow::Result<()> {
|
||||
debug_assert_current_span_has_tenant_id();
|
||||
|
||||
info!("loading tenant task");
|
||||
|
||||
utils::failpoint_sleep_millis_async!("before-loading-tenant");
|
||||
@@ -1039,10 +1037,6 @@ impl Tenant {
|
||||
.with_context(|| format!("load local timeline {timeline_id}"))?;
|
||||
}
|
||||
|
||||
// Start background operations and open the tenant for business.
|
||||
// The loops will shut themselves down when they notice that the tenant is inactive.
|
||||
self.activate(ctx)?;
|
||||
|
||||
info!("Done");
|
||||
|
||||
Ok(())
|
||||
@@ -1206,6 +1200,27 @@ impl Tenant {
|
||||
)
|
||||
}
|
||||
|
||||
/// Helper for unit tests to create an emtpy timeline.
|
||||
///
|
||||
/// The timeline is has state value `Active` but its background loops are not running.
|
||||
// This makes the various functions which anyhow::ensure! for Active state work in tests.
|
||||
// Our current tests don't need the background loops.
|
||||
#[cfg(test)]
|
||||
pub fn create_test_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
initdb_lsn: Lsn,
|
||||
pg_version: u32,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?;
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?;
|
||||
// The non-test code would call tl.activate() here.
|
||||
tl.set_state(TimelineState::Active);
|
||||
Ok(tl)
|
||||
}
|
||||
|
||||
/// Create a new timeline.
|
||||
///
|
||||
/// Returns the new timeline ID and reference to its Timeline object.
|
||||
@@ -1285,6 +1300,8 @@ impl Tenant {
|
||||
}
|
||||
};
|
||||
|
||||
loaded_timeline.activate(ctx).context("activate timeline")?;
|
||||
|
||||
if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
|
||||
// Wait for the upload of the 'index_part.json` file to finish, so that when we return
|
||||
// Ok, the timeline is durable in remote storage.
|
||||
@@ -2278,13 +2295,45 @@ impl Tenant {
|
||||
Ok(gc_timelines)
|
||||
}
|
||||
|
||||
/// Branch an existing timeline
|
||||
/// A substitute for `branch_timeline` for use in unit tests.
|
||||
/// The returned timeline will have state value `Active` to make various `anyhow::ensure!()`
|
||||
/// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the
|
||||
/// timeline background tasks are launched, except the flush loop.
|
||||
#[cfg(test)]
|
||||
async fn branch_timeline_test(
|
||||
&self,
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let tl = self
|
||||
.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
|
||||
.await?;
|
||||
tl.set_state(TimelineState::Active);
|
||||
Ok(tl)
|
||||
}
|
||||
|
||||
/// Branch an existing timeline.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
async fn branch_timeline(
|
||||
&self,
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn branch_timeline_impl(
|
||||
&self,
|
||||
src_timeline: &Arc<Timeline>,
|
||||
dst_id: TimelineId,
|
||||
start_lsn: Option<Lsn>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Arc<Timeline>> {
|
||||
let src_id = src_timeline.timeline_id;
|
||||
|
||||
@@ -2378,7 +2427,7 @@ impl Tenant {
|
||||
false,
|
||||
Some(Arc::clone(src_timeline)),
|
||||
)?
|
||||
.initialize_with_lock(ctx, &mut timelines, true, true)?
|
||||
.initialize_with_lock(ctx, &mut timelines, true)?
|
||||
};
|
||||
|
||||
// Root timeline gets its layers during creation and uploads them along with the metadata.
|
||||
@@ -2399,6 +2448,8 @@ impl Tenant {
|
||||
|
||||
/// - run initdb to init temporary instance and get bootstrap data
|
||||
/// - after initialization complete, remove the temp dir.
|
||||
///
|
||||
/// The caller is responsible for activating the returned timeline.
|
||||
async fn bootstrap_timeline(
|
||||
&self,
|
||||
timeline_id: TimelineId,
|
||||
@@ -2493,7 +2544,7 @@ impl Tenant {
|
||||
// map above, when we imported the datadir.
|
||||
let timeline = {
|
||||
let mut timelines = self.timelines.lock().unwrap();
|
||||
raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)?
|
||||
raw_timeline.initialize_with_lock(ctx, &mut timelines, false)?
|
||||
};
|
||||
|
||||
info!(
|
||||
@@ -3134,8 +3185,14 @@ pub mod harness {
|
||||
let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?;
|
||||
timelines_to_load.insert(timeline_id, timeline_metadata);
|
||||
}
|
||||
// FIXME starts background jobs
|
||||
tenant.load(ctx).await?;
|
||||
tenant
|
||||
.load(ctx)
|
||||
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
|
||||
.await?;
|
||||
tenant.state.send_replace(TenantState::Active);
|
||||
for timeline in tenant.timelines.lock().unwrap().values() {
|
||||
timeline.set_state(TimelineState::Active);
|
||||
}
|
||||
Ok(tenant)
|
||||
}
|
||||
|
||||
@@ -3193,8 +3250,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_basic() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await;
|
||||
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tline.initialize(&ctx)?;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
@@ -3227,9 +3283,7 @@ mod tests {
|
||||
let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")?
|
||||
.load()
|
||||
.await;
|
||||
let timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = timeline.initialize(&ctx)?;
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) {
|
||||
Ok(_) => panic!("duplicate timeline creation should fail"),
|
||||
@@ -3260,8 +3314,7 @@ mod tests {
|
||||
use std::str::from_utf8;
|
||||
|
||||
let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await;
|
||||
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tline.initialize(&ctx)?;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let writer = tline.writer();
|
||||
|
||||
#[allow(non_snake_case)]
|
||||
@@ -3283,7 +3336,7 @@ mod tests {
|
||||
|
||||
// Branch the history, modify relation differently on the new timeline
|
||||
tenant
|
||||
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx)
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
@@ -3358,8 +3411,7 @@ mod tests {
|
||||
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tline.initialize(&ctx)?;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
|
||||
@@ -3372,7 +3424,7 @@ mod tests {
|
||||
|
||||
// try to branch at lsn 25, should fail because we already garbage collected the data
|
||||
match tenant
|
||||
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("branching should have failed"),
|
||||
@@ -3396,12 +3448,11 @@ mod tests {
|
||||
.load()
|
||||
.await;
|
||||
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?
|
||||
.initialize(&ctx)?;
|
||||
let tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?;
|
||||
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
|
||||
match tenant
|
||||
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx)
|
||||
.await
|
||||
{
|
||||
Ok(_) => panic!("branching should have failed"),
|
||||
@@ -3447,13 +3498,11 @@ mod tests {
|
||||
TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
|
||||
.initialize(&ctx)?;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
@@ -3497,12 +3546,11 @@ mod tests {
|
||||
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tline.initialize(&ctx)?;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
@@ -3521,12 +3569,11 @@ mod tests {
|
||||
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?
|
||||
.load()
|
||||
.await;
|
||||
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tline.initialize(&ctx)?;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
|
||||
.await?;
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
@@ -3555,8 +3602,7 @@ mod tests {
|
||||
{
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tline.initialize(&ctx)?;
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?;
|
||||
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
|
||||
}
|
||||
|
||||
@@ -3576,14 +3622,14 @@ mod tests {
|
||||
{
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let tline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tline.initialize(&ctx)?;
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
|
||||
|
||||
tenant
|
||||
.branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
|
||||
let child_tline = tenant
|
||||
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
|
||||
.await?;
|
||||
child_tline.set_state(TimelineState::Active);
|
||||
|
||||
let newtline = tenant
|
||||
.get_timeline(NEW_TIMELINE_ID, true)
|
||||
@@ -3613,9 +3659,8 @@ mod tests {
|
||||
let harness = TenantHarness::create(TEST_NAME)?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
|
||||
.initialize(&ctx)?;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
drop(tline);
|
||||
drop(tenant);
|
||||
|
||||
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
|
||||
@@ -3652,8 +3697,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_images() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_images")?.load().await;
|
||||
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tline.initialize(&ctx)?;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
let writer = tline.writer();
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
@@ -3718,8 +3762,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_bulk_insert() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await;
|
||||
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tline.initialize(&ctx)?;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
let mut lsn = Lsn(0x10);
|
||||
|
||||
@@ -3761,8 +3804,7 @@ mod tests {
|
||||
#[tokio::test]
|
||||
async fn test_random_updates() -> anyhow::Result<()> {
|
||||
let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
|
||||
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let tline = tline.initialize(&ctx)?;
|
||||
let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
const NUM_KEYS: usize = 1000;
|
||||
|
||||
@@ -3835,9 +3877,8 @@ mod tests {
|
||||
let (tenant, ctx) = TenantHarness::create("test_traverse_branches")?
|
||||
.load()
|
||||
.await;
|
||||
let mut tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
|
||||
.initialize(&ctx)?;
|
||||
let mut tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
const NUM_KEYS: usize = 1000;
|
||||
|
||||
@@ -3870,7 +3911,7 @@ mod tests {
|
||||
for _ in 0..50 {
|
||||
let new_tline_id = TimelineId::generate();
|
||||
tenant
|
||||
.branch_timeline(&tline, new_tline_id, Some(lsn), &ctx)
|
||||
.branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
|
||||
.await?;
|
||||
tline = tenant
|
||||
.get_timeline(new_tline_id, true)
|
||||
@@ -3919,9 +3960,8 @@ mod tests {
|
||||
let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")?
|
||||
.load()
|
||||
.await;
|
||||
let mut tline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?
|
||||
.initialize(&ctx)?;
|
||||
let mut tline =
|
||||
tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
const NUM_KEYS: usize = 100;
|
||||
const NUM_TLINES: usize = 50;
|
||||
@@ -3936,7 +3976,7 @@ mod tests {
|
||||
for idx in 0..NUM_TLINES {
|
||||
let new_tline_id = TimelineId::generate();
|
||||
tenant
|
||||
.branch_timeline(&tline, new_tline_id, Some(lsn), &ctx)
|
||||
.branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx)
|
||||
.await?;
|
||||
tline = tenant
|
||||
.get_timeline(new_tline_id, true)
|
||||
|
||||
@@ -1264,9 +1264,7 @@ mod tests {
|
||||
let harness = TenantHarness::create(test_name)?;
|
||||
let (tenant, ctx) = runtime.block_on(harness.load());
|
||||
// create an empty timeline directory
|
||||
let timeline =
|
||||
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
let _ = timeline.initialize(&ctx).unwrap();
|
||||
let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?;
|
||||
|
||||
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
|
||||
std::fs::create_dir_all(remote_fs_dir)?;
|
||||
|
||||
@@ -1309,9 +1309,8 @@ mod tests {
|
||||
async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState {
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
let timeline = tenant
|
||||
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx)
|
||||
.expect("Failed to create an empty timeline for dummy wal connection manager");
|
||||
let timeline = timeline.initialize(&ctx).unwrap();
|
||||
|
||||
ConnectionManagerState {
|
||||
id: TenantTimelineId {
|
||||
|
||||
Reference in New Issue
Block a user