diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 67f37ee519..186209dfcf 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1600,9 +1600,7 @@ pub fn create_test_timeline( pg_version: u32, ctx: &RequestContext, ) -> anyhow::Result> { - let tline = tenant - .create_empty_timeline(timeline_id, Lsn(8), pg_version, ctx)? - .initialize(ctx)?; + let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?; let mut m = tline.begin_modification(Lsn(8)); m.init_empty()?; m.commit()?; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 8349e1993f..2b787faf69 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -184,24 +184,14 @@ impl UninitializedTimeline<'_> { /// Ensures timeline data is valid, loads it into pageserver's memory and removes /// uninit mark file on success. /// - /// The new timeline is initialized in Active state, and its background jobs are - /// started - pub fn initialize(self, ctx: &RequestContext) -> anyhow::Result> { - let mut timelines = self.owning_tenant.timelines.lock().unwrap(); - self.initialize_with_lock(ctx, &mut timelines, true, true) - } - - /// Like `initialize`, but the caller is already holding lock on Tenant::timelines. - /// If `launch_wal_receiver` is false, the WAL receiver not launched, even though - /// timeline is initialized in Active state. This is used during tenant load and - /// attach, where the WAL receivers are launched only after all the timelines have - /// been initialized. + /// This function launches the flush loop if not already done. + /// + /// The caller is responsible for activating the timeline (function `.activate()`). fn initialize_with_lock( mut self, - ctx: &RequestContext, + _ctx: &RequestContext, timelines: &mut HashMap>, load_layer_map: bool, - activate: bool, ) -> anyhow::Result> { let timeline_id = self.timeline_id; let tenant_id = self.owning_tenant.tenant_id; @@ -237,12 +227,6 @@ impl UninitializedTimeline<'_> { v.insert(Arc::clone(&new_timeline)); new_timeline.maybe_spawn_flush_loop(); - - if activate { - new_timeline - .activate(ctx) - .context("initializing timeline activation")?; - } } } @@ -279,7 +263,9 @@ impl UninitializedTimeline<'_> { // Initialize without loading the layer map. We started with an empty layer map, and already // updated it for the layers that we created during the import. let mut timelines = self.owning_tenant.timelines.lock().unwrap(); - self.initialize_with_lock(ctx, &mut timelines, false, true) + let tl = self.initialize_with_lock(ctx, &mut timelines, false)?; + tl.activate(ctx)?; + Ok(tl) } fn raw_timeline(&self) -> anyhow::Result<&Arc> { @@ -519,7 +505,7 @@ impl Tenant { // Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote // But we shouldnt start walreceiver before we have all the data locally, because working walreceiver // will ingest data which may require looking at the layers which are not yet available locally - match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true, false) { + match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true) { Ok(new_timeline) => new_timeline, Err(e) => { error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}"); @@ -628,8 +614,10 @@ impl Tenant { "attach tenant", false, async move { - match tenant_clone.attach(ctx).await { - Ok(_) => {} + match tenant_clone.attach(&ctx).await { + Ok(_) => { + tenant_clone.activate(&ctx).unwrap(); // WIP + } Err(e) => { tenant_clone.set_broken(e.to_string()); error!("error attaching tenant: {:?}", e); @@ -645,7 +633,7 @@ impl Tenant { /// Background task that downloads all data for a tenant and brings it to Active state. /// #[instrument(skip_all, fields(tenant_id=%self.tenant_id))] - async fn attach(self: &Arc, ctx: RequestContext) -> anyhow::Result<()> { + async fn attach(self: &Arc, ctx: &RequestContext) -> anyhow::Result<()> { let marker_file = self.conf.tenant_attaching_mark_file_path(&self.tenant_id); if !tokio::fs::try_exists(&marker_file) .await @@ -735,20 +723,14 @@ impl Tenant { .expect("just put it in above"); // TODO again handle early failure - self.load_remote_timeline( - timeline_id, - index_part, - remote_metadata, - remote_client, - &ctx, - ) - .await - .with_context(|| { - format!( - "failed to load remote timeline {} for tenant {}", - timeline_id, self.tenant_id - ) - })?; + self.load_remote_timeline(timeline_id, index_part, remote_metadata, remote_client, ctx) + .await + .with_context(|| { + format!( + "failed to load remote timeline {} for tenant {}", + timeline_id, self.tenant_id + ) + })?; } std::fs::remove_file(&marker_file) @@ -758,10 +740,6 @@ impl Tenant { utils::failpoint_sleep_millis_async!("attach-before-activate"); - // Start background operations and open the tenant for business. - // The loops will shut themselves down when they notice that the tenant is inactive. - self.activate(&ctx)?; - info!("Done"); Ok(()) @@ -902,7 +880,9 @@ impl Tenant { false, async move { match tenant_clone.load(&ctx).await { - Ok(()) => {} + Ok(()) => { + tenant_clone.activate(&ctx).unwrap(); // WIP + } Err(err) => { tenant_clone.set_broken(err.to_string()); error!("could not load tenant {tenant_id}: {err:?}"); @@ -1039,10 +1019,6 @@ impl Tenant { .with_context(|| format!("load local timeline {timeline_id}"))?; } - // Start background operations and open the tenant for business. - // The loops will shut themselves down when they notice that the tenant is inactive. - self.activate(ctx)?; - info!("Done"); Ok(()) @@ -1206,6 +1182,27 @@ impl Tenant { ) } + /// Helper for unit tests to create an emtpy timeline. + /// + /// The timeline is has state value `Active` but its background loops are not running. + // This makes the various functions which anyhow::ensure! for Active state work in tests. + // Our current tests don't need the background loops. + #[cfg(test)] + pub fn create_test_timeline( + &self, + new_timeline_id: TimelineId, + initdb_lsn: Lsn, + pg_version: u32, + ctx: &RequestContext, + ) -> anyhow::Result> { + let uninit_tl = self.create_empty_timeline(new_timeline_id, initdb_lsn, pg_version, ctx)?; + let mut timelines = self.timelines.lock().unwrap(); + let tl = uninit_tl.initialize_with_lock(ctx, &mut timelines, true)?; + // The non-test code would call tl.activate() here. + tl.set_state(TimelineState::Active); + Ok(tl) + } + /// Create a new timeline. /// /// Returns the new timeline ID and reference to its Timeline object. @@ -1285,6 +1282,8 @@ impl Tenant { } }; + loaded_timeline.activate(ctx).context("activate timeline")?; + if let Some(remote_client) = loaded_timeline.remote_client.as_ref() { // Wait for the upload of the 'index_part.json` file to finish, so that when we return // Ok, the timeline is durable in remote storage. @@ -2278,13 +2277,45 @@ impl Tenant { Ok(gc_timelines) } - /// Branch an existing timeline + /// A substitute for `branch_timeline` for use in unit tests. + /// The returned timeline will have state value `Active` to make various `anyhow::ensure!()` + /// calls pass, but, we do not actually call `.activate()` under the hood. So, none of the + /// timeline background tasks are launched, except the flush loop. + #[cfg(test)] + async fn branch_timeline_test( + &self, + src_timeline: &Arc, + dst_id: TimelineId, + start_lsn: Option, + ctx: &RequestContext, + ) -> anyhow::Result> { + let tl = self + .branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx) + .await?; + tl.set_state(TimelineState::Active); + Ok(tl) + } + + /// Branch an existing timeline. + /// + /// The caller is responsible for activating the returned timeline. async fn branch_timeline( &self, src_timeline: &Arc, dst_id: TimelineId, start_lsn: Option, ctx: &RequestContext, + ) -> anyhow::Result> { + self.branch_timeline_impl(src_timeline, dst_id, start_lsn, ctx) + .await + } + + async fn branch_timeline_impl( + &self, + src_timeline: &Arc, + dst_id: TimelineId, + start_lsn: Option, + ctx: &RequestContext, ) -> anyhow::Result> { let src_id = src_timeline.timeline_id; @@ -2378,7 +2409,7 @@ impl Tenant { false, Some(Arc::clone(src_timeline)), )? - .initialize_with_lock(ctx, &mut timelines, true, true)? + .initialize_with_lock(ctx, &mut timelines, true)? }; // Root timeline gets its layers during creation and uploads them along with the metadata. @@ -2399,6 +2430,8 @@ impl Tenant { /// - run initdb to init temporary instance and get bootstrap data /// - after initialization complete, remove the temp dir. + /// + /// The caller is responsible for activating the returned timeline. async fn bootstrap_timeline( &self, timeline_id: TimelineId, @@ -2493,7 +2526,7 @@ impl Tenant { // map above, when we imported the datadir. let timeline = { let mut timelines = self.timelines.lock().unwrap(); - raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)? + raw_timeline.initialize_with_lock(ctx, &mut timelines, false)? }; info!( @@ -3134,8 +3167,11 @@ pub mod harness { let timeline_metadata = load_metadata(self.conf, timeline_id, self.tenant_id)?; timelines_to_load.insert(timeline_id, timeline_metadata); } - // FIXME starts background jobs tenant.load(ctx).await?; + tenant.state.send_replace(TenantState::Active); + for timeline in tenant.timelines.lock().unwrap().values() { + timeline.set_state(TimelineState::Active); + } Ok(tenant) } @@ -3193,8 +3229,7 @@ mod tests { #[tokio::test] async fn test_basic() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await; - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let tline = tline.initialize(&ctx)?; + let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; @@ -3227,9 +3262,7 @@ mod tests { let (tenant, ctx) = TenantHarness::create("no_duplicate_timelines")? .load() .await; - let timeline = - tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let _ = timeline.initialize(&ctx)?; + let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) { Ok(_) => panic!("duplicate timeline creation should fail"), @@ -3260,8 +3293,7 @@ mod tests { use std::str::from_utf8; let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await; - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let tline = tline.initialize(&ctx)?; + let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; let writer = tline.writer(); #[allow(non_snake_case)] @@ -3283,7 +3315,7 @@ mod tests { // Branch the history, modify relation differently on the new timeline tenant - .branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx) + .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x30)), &ctx) .await?; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) @@ -3358,8 +3390,7 @@ mod tests { TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? .load() .await; - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let tline = tline.initialize(&ctx)?; + let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 @@ -3372,7 +3403,7 @@ mod tests { // try to branch at lsn 25, should fail because we already garbage collected the data match tenant - .branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx) + .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx) .await { Ok(_) => panic!("branching should have failed"), @@ -3396,12 +3427,11 @@ mod tests { .load() .await; - let tline = tenant - .create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)? - .initialize(&ctx)?; + let tline = + tenant.create_test_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION, &ctx)?; // try to branch at lsn 0x25, should fail because initdb lsn is 0x50 match tenant - .branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx) + .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x25)), &ctx) .await { Ok(_) => panic!("branching should have failed"), @@ -3447,13 +3477,11 @@ mod tests { TenantHarness::create("test_get_branchpoints_from_an_inactive_timeline")? .load() .await; - let tline = tenant - .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)? - .initialize(&ctx)?; + let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant - .branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) + .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) .await?; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) @@ -3497,12 +3525,11 @@ mod tests { TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")? .load() .await; - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let tline = tline.initialize(&ctx)?; + let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant - .branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) + .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) .await?; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) @@ -3521,12 +3548,11 @@ mod tests { TenantHarness::create("test_parent_keeps_data_forever_after_branching")? .load() .await; - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let tline = tline.initialize(&ctx)?; + let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; tenant - .branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) + .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) .await?; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) @@ -3555,8 +3581,7 @@ mod tests { { let (tenant, ctx) = harness.load().await; let tline = - tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?; - let tline = tline.initialize(&ctx)?; + tenant.create_test_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION, &ctx)?; make_some_layers(tline.as_ref(), Lsn(0x8000)).await?; } @@ -3576,14 +3601,14 @@ mod tests { { let (tenant, ctx) = harness.load().await; let tline = - tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let tline = tline.initialize(&ctx)?; + tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; make_some_layers(tline.as_ref(), Lsn(0x20)).await?; - tenant - .branch_timeline(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) + let child_tline = tenant + .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) .await?; + child_tline.set_state(TimelineState::Active); let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) @@ -3613,9 +3638,8 @@ mod tests { let harness = TenantHarness::create(TEST_NAME)?; let (tenant, ctx) = harness.load().await; - tenant - .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)? - .initialize(&ctx)?; + let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; + drop(tline); drop(tenant); let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME); @@ -3652,8 +3676,7 @@ mod tests { #[tokio::test] async fn test_images() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_images")?.load().await; - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let tline = tline.initialize(&ctx)?; + let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; let writer = tline.writer(); writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; @@ -3718,8 +3741,7 @@ mod tests { #[tokio::test] async fn test_bulk_insert() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_bulk_insert")?.load().await; - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let tline = tline.initialize(&ctx)?; + let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; let mut lsn = Lsn(0x10); @@ -3761,8 +3783,7 @@ mod tests { #[tokio::test] async fn test_random_updates() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await; - let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let tline = tline.initialize(&ctx)?; + let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; const NUM_KEYS: usize = 1000; @@ -3835,9 +3856,8 @@ mod tests { let (tenant, ctx) = TenantHarness::create("test_traverse_branches")? .load() .await; - let mut tline = tenant - .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)? - .initialize(&ctx)?; + let mut tline = + tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; const NUM_KEYS: usize = 1000; @@ -3870,7 +3890,7 @@ mod tests { for _ in 0..50 { let new_tline_id = TimelineId::generate(); tenant - .branch_timeline(&tline, new_tline_id, Some(lsn), &ctx) + .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx) .await?; tline = tenant .get_timeline(new_tline_id, true) @@ -3919,9 +3939,8 @@ mod tests { let (tenant, ctx) = TenantHarness::create("test_traverse_ancestors")? .load() .await; - let mut tline = tenant - .create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)? - .initialize(&ctx)?; + let mut tline = + tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; const NUM_KEYS: usize = 100; const NUM_TLINES: usize = 50; @@ -3936,7 +3955,7 @@ mod tests { for idx in 0..NUM_TLINES { let new_tline_id = TimelineId::generate(); tenant - .branch_timeline(&tline, new_tline_id, Some(lsn), &ctx) + .branch_timeline_test(&tline, new_tline_id, Some(lsn), &ctx) .await?; tline = tenant .get_timeline(new_tline_id, true) diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 96aabd7945..c4640307d0 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -1264,9 +1264,7 @@ mod tests { let harness = TenantHarness::create(test_name)?; let (tenant, ctx) = runtime.block_on(harness.load()); // create an empty timeline directory - let timeline = - tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let _ = timeline.initialize(&ctx).unwrap(); + let _ = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; let remote_fs_dir = harness.conf.workdir.join("remote_fs"); std::fs::create_dir_all(remote_fs_dir)?; diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs index 2305844d75..3da1f023e1 100644 --- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs +++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs @@ -1309,9 +1309,8 @@ mod tests { async fn dummy_state(harness: &TenantHarness<'_>) -> ConnectionManagerState { let (tenant, ctx) = harness.load().await; let timeline = tenant - .create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx) + .create_test_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION, &ctx) .expect("Failed to create an empty timeline for dummy wal connection manager"); - let timeline = timeline.initialize(&ctx).unwrap(); ConnectionManagerState { id: TenantTimelineId {