diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index f4a0f3f18e..6372178b2d 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -512,7 +512,7 @@ async fn collect_eviction_candidates( if !tl.is_active() { continue; } - let info = tl.get_local_layers_for_disk_usage_eviction(); + let info = tl.get_local_layers_for_disk_usage_eviction().await; debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); tenant_candidates.extend( info.resident_layers diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index ed0021d744..a4428b9f9b 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -212,7 +212,7 @@ async fn build_timeline_info( ) -> anyhow::Result { crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); - let mut info = build_timeline_info_common(timeline, ctx)?; + let mut info = build_timeline_info_common(timeline, ctx).await?; if include_non_incremental_logical_size { // XXX we should be using spawn_ondemand_logical_size_calculation here. // Otherwise, if someone deletes the timeline / detaches the tenant while @@ -230,7 +230,7 @@ async fn build_timeline_info( Ok(info) } -fn build_timeline_info_common( +async fn build_timeline_info_common( timeline: &Arc, ctx: &RequestContext, ) -> anyhow::Result { @@ -261,7 +261,7 @@ fn build_timeline_info_common( None } }; - let current_physical_size = Some(timeline.layer_size_sum()); + let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); @@ -321,6 +321,7 @@ async fn timeline_create_handler(mut request: Request) -> Result { // Created. Construct a TimelineInfo for it. let timeline_info = build_timeline_info_common(&new_timeline, &ctx) + .await .map_err(ApiError::InternalServerError)?; json_response(StatusCode::CREATED, timeline_info) } @@ -551,7 +552,7 @@ async fn tenant_status(request: Request) -> Result, ApiErro // Calculate total physical size of all timelines let mut current_physical_size = 0; for timeline in tenant.list_timelines().iter() { - current_physical_size += timeline.layer_size_sum(); + current_physical_size += timeline.layer_size_sum().await; } let state = tenant.current_state(); @@ -655,7 +656,7 @@ async fn layer_map_info_handler(request: Request) -> Result check_permission(&request, Some(tenant_id))?; let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; - let layer_map_info = timeline.layer_map_info(reset); + let layer_map_info = timeline.layer_map_info(reset).await; json_response(StatusCode::OK, layer_map_info) } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 050d3f642b..20013d01a9 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -565,7 +565,7 @@ impl PageServerHandler { // since we discard some log files. info!("done, activating timeline"); - real_timeline_not_in_tenants_map.activate(self.broker_client.clone(), &ctx); + real_timeline_not_in_tenants_map.activate(self.broker_client.clone(), &ctx).await; Ok(()) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 021b33e10a..b0355f86c5 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1118,7 +1118,7 @@ impl<'a> DatadirModification<'a> { let writer = self.tline.writer().await; - let mut layer_map = self.tline.layers.write().unwrap(); + let mut layer_map = self.tline.layers.write().await; // Flush relation and SLRU data blocks, keep metadata. let mut result: anyhow::Result<()> = Ok(()); @@ -1152,10 +1152,10 @@ impl<'a> DatadirModification<'a> { self.pending_nblocks = 0; for (key, value) in self.pending_updates.drain() { - writer.put(key, lsn, &value)?; + writer.put(key, lsn, &value).await?; } for key_range in self.pending_deletions.drain(..) { - writer.delete(key_range, lsn)?; + writer.delete(key_range, lsn).await?; } writer.finish_write(lsn); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index eb71293ebb..99a6e806e1 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -541,6 +541,7 @@ impl Tenant { // "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn and cannot be initialized"); timeline .load_layer_map(new_disk_consistent_lsn) + .await .with_context(|| { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") })?; @@ -578,7 +579,7 @@ impl Tenant { || timeline .layers .read() - .unwrap() + .await .iter_historic_layers() .next() .is_some(), @@ -592,7 +593,7 @@ impl Tenant { let has_layers = timeline .layers .read() - .unwrap() + .await .iter_historic_layers() .next() .is_some(); @@ -664,7 +665,7 @@ impl Tenant { match tenant_clone.attach(&ctx).await { Ok(()) => { info!("attach finished, activating"); - tenant_clone.activate(broker_client, &ctx); + tenant_clone.activate(broker_client, &ctx).await; } Err(e) => { error!("attach failed, setting tenant state to Broken: {:?}", e); @@ -962,7 +963,7 @@ impl Tenant { match tenant_clone.load(cause, &ctx).await { Ok(()) => { info!("load finished, activating"); - tenant_clone.activate(broker_client, &ctx); + tenant_clone.activate(broker_client, &ctx).await; } Err(err) => { error!("load failed, setting tenant state to Broken: {err:?}"); @@ -1312,7 +1313,6 @@ impl Tenant { .context("wait for initial uploads to complete")?; } - // XXX do we need to remove uninit mark before starting uploads? // If we die with uninit mark present, we'll leak the uploaded state in S3. Ok(()) }; @@ -1404,19 +1404,19 @@ impl Tenant { .context("creation_complete_remove_uninit_marker_and_get_placeholder_timeline")?; match self.timelines.lock().unwrap().entry(new_timeline_id) { - Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"), - Entry::Occupied(mut o) => { - info!("replacing placeholder timeline with the real one"); - assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating); - assert!(compare_arced_timeline(&placeholder_timeline, o.get())); - let replaced_placeholder = o.insert(Arc::clone(&real_timeline)); - assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline)); - }, - } + Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"), + Entry::Occupied(mut o) => { + info!("replacing placeholder timeline with the real one"); + assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating); + assert!(compare_arced_timeline(&placeholder_timeline, o.get())); + let replaced_placeholder = o.insert(Arc::clone(&real_timeline)); + assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline)); + }, + } // The non-test code would call tl.activate() here. real_timeline.maybe_spawn_flush_loop(); - real_timeline.set_state(TimelineState::Active); + real_timeline.set_state(TimelineState::Active).await; Ok(real_timeline) } @@ -1605,7 +1605,7 @@ impl Tenant { }, } - real_timeline.activate(broker_client, ctx); + real_timeline.activate(broker_client, ctx).await; Ok(Some(real_timeline)) } @@ -1736,17 +1736,16 @@ impl Tenant { }; let timeline = Arc::clone(timeline_entry.get()); - if timeline.current_state() == TimelineState::Creating { - return Err(DeleteTimelineError::Other(anyhow::anyhow!( - "timeline is creating" - ))); - } - timeline.set_state(TimelineState::Stopping); - - drop(timelines); timeline }; + if timeline.current_state() == TimelineState::Creating { + return Err(DeleteTimelineError::Other(anyhow::anyhow!( + "timeline is creating" + ))); + } + timeline.set_state(TimelineState::Stopping).await; + // Now that the Timeline is in Stopping state, request all the related tasks to // shut down. // @@ -1917,7 +1916,7 @@ impl Tenant { } /// Changes tenant status to active, unless shutdown was already requested. - fn activate(self: &Arc, broker_client: BrokerClientChannel, ctx: &RequestContext) { + async fn activate(self: &Arc, broker_client: BrokerClientChannel, ctx: &RequestContext) { debug_assert_current_span_has_tenant_id(); let mut activating = false; @@ -1937,10 +1936,14 @@ impl Tenant { }); if activating { - let timelines_accessor = self.timelines.lock().unwrap(); - let not_broken_timelines = timelines_accessor - .values() - .filter(|timeline| timeline.current_state() != TimelineState::Broken); + let not_broken_timelines = { + let timelines_accessor = self.timelines.lock().unwrap(); + timelines_accessor + .values() + .filter(|timeline| timeline.current_state() != TimelineState::Broken) + .cloned() + .collect::>() + }; // Spawn gc and compaction loops. The loops will shut themselves // down when they notice that the tenant is inactive. @@ -1948,8 +1951,8 @@ impl Tenant { let mut activated_timelines = 0; - for timeline in not_broken_timelines { - timeline.activate(broker_client.clone(), ctx); + for timeline in ¬_broken_timelines { + timeline.activate(broker_client.clone(), ctx).await; activated_timelines += 1; } @@ -1962,7 +1965,7 @@ impl Tenant { *current_state = TenantState::Active; let elapsed = self.loading_started_at.elapsed(); - let total_timelines = timelines_accessor.len(); + let total_timelines = not_broken_timelines.len(); // log a lot of stuff, because some tenants sometimes suffer from user-visible // times to activate. see https://github.com/neondatabase/neon/issues/4025 @@ -2039,12 +2042,16 @@ impl Tenant { ), } - let timelines_accessor = self.timelines.lock().unwrap(); - let not_broken_timelines = timelines_accessor - .values() - .filter(|timeline| timeline.current_state() != TimelineState::Broken); + let not_broken_timelines = { + let timelines_accessor = self.timelines.lock().unwrap(); + timelines_accessor + .values() + .filter(|timeline| timeline.current_state() != TimelineState::Broken) + .cloned() + .collect::>() + }; for timeline in not_broken_timelines { - timeline.set_state(TimelineState::Stopping); + timeline.set_state(TimelineState::Stopping).await; } Ok(()) } @@ -2849,7 +2856,7 @@ impl Tenant { }, } - real_timeline.set_state(TimelineState::Active); + real_timeline.set_state(TimelineState::Active).await; real_timeline.maybe_spawn_flush_loop(); Ok(real_timeline) } @@ -3061,11 +3068,7 @@ impl Tenant { .create_timeline_data(timeline_id, &new_metadata, None, remote_client.clone()) .context("Failed to create timeline data structure")?; - unfinished_timeline - .layers - .write() - .unwrap() - .next_open_layer_at = Some(pgdata_lsn); // pgdata_lsn == initdb_lsn + unfinished_timeline.layers.write().await.next_open_layer_at = Some(pgdata_lsn); // pgdata_lsn == initdb_lsn import_datadir::import_timeline_from_postgres_datadir( &unfinished_timeline, @@ -3110,7 +3113,7 @@ impl Tenant { } // XXX this is same shutdown code as in Timeline::delete, share it. - unfinished_timeline.set_state(TimelineState::Stopping); + unfinished_timeline.set_state(TimelineState::Stopping).await; task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await; // XXX log message is a little too early, see caller for context @@ -3645,8 +3648,15 @@ pub mod harness { .instrument(info_span!("try_load", tenant_id=%self.tenant_id)) .await?; tenant.state.send_replace(TenantState::Active); - for timeline in tenant.timelines.lock().unwrap().values() { - timeline.set_state(TimelineState::Active); + let timelines = tenant + .timelines + .lock() + .unwrap() + .values() + .cloned() + .collect::>(); + for timeline in timelines { + timeline.set_state(TimelineState::Active).await; } Ok(tenant) } @@ -3710,12 +3720,16 @@ mod tests { .await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; + writer + .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .await?; writer.finish_write(Lsn(0x10)); drop(writer); let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; + writer + .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3787,13 +3801,21 @@ mod tests { let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap(); // Insert a value on the timeline - writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?; - writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?; + writer + .put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20")) + .await?; + writer + .put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20")) + .await?; writer.finish_write(Lsn(0x20)); - writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?; + writer + .put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30")) + .await?; writer.finish_write(Lsn(0x30)); - writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?; + writer + .put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40")) + .await?; writer.finish_write(Lsn(0x40)); //assert_current_logical_size(&tline, Lsn(0x40)); @@ -3806,7 +3828,9 @@ mod tests { .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); let new_writer = newtline.writer().await; - new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; + new_writer + .put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40")) + .await?; new_writer.finish_write(Lsn(0x40)); // Check page contents on both branches @@ -3834,36 +3858,44 @@ mod tests { { let writer = tline.writer().await; // Create a relation on the timeline - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; } tline.freeze_and_flush().await?; { let writer = tline.writer().await; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); lsn += 0x10; - writer.put( - *TEST_KEY, - lsn, - &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), - )?; + writer + .put( + *TEST_KEY, + lsn, + &Value::Image(TEST_IMG(&format!("foo at {}", lsn))), + ) + .await?; writer.finish_write(lsn); } tline.freeze_and_flush().await @@ -3975,7 +4007,7 @@ mod tests { make_some_layers(newtline.as_ref(), Lsn(0x60)).await?; - tline.set_state(TimelineState::Broken); + tline.set_state(TimelineState::Broken).await; tenant .gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx) @@ -4100,7 +4132,7 @@ mod tests { let child_tline = tenant .branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx) .await?; - child_tline.set_state(TimelineState::Active); + child_tline.set_state(TimelineState::Active).await; let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) @@ -4175,7 +4207,9 @@ mod tests { .await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; + writer + .put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10"))) + .await?; writer.finish_write(Lsn(0x10)); drop(writer); @@ -4183,7 +4217,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; + writer + .put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20"))) + .await?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -4191,7 +4227,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?; + writer + .put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30"))) + .await?; writer.finish_write(Lsn(0x30)); drop(writer); @@ -4199,7 +4237,9 @@ mod tests { tline.compact(&ctx).await?; let writer = tline.writer().await; - writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?; + writer + .put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40"))) + .await?; writer.finish_write(Lsn(0x40)); drop(writer); @@ -4251,11 +4291,13 @@ mod tests { for _ in 0..10000 { test_key.field6 = blknum; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); drop(writer); @@ -4301,11 +4343,13 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -4319,11 +4363,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); drop(writer); updated[blknum] = lsn; @@ -4376,11 +4422,13 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; writer.finish_write(lsn); updated[blknum] = lsn; drop(writer); @@ -4402,11 +4450,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + ) + .await?; println!("updating {} at {}", blknum, lsn); writer.finish_write(lsn); drop(writer); @@ -4468,11 +4518,13 @@ mod tests { let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; let writer = tline.writer().await; - writer.put( - test_key, - lsn, - &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))), - )?; + writer + .put( + test_key, + lsn, + &Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))), + ) + .await?; println!("updating [{}][{}] at {}", idx, blknum, lsn); writer.finish_write(lsn); drop(writer); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c453683fea..78bcfdafc0 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -304,7 +304,7 @@ impl InMemoryLayer { Ok(()) } - pub fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { + pub async fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { // TODO: Currently, we just leak the storage for any deleted keys Ok(()) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 08a06b7812..0dd9674ad8 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -119,7 +119,7 @@ pub struct Timeline { pub pg_version: u32, - pub(crate) layers: RwLock>, + pub(crate) layers: tokio::sync::RwLock>, /// Set of key ranges which should be covered by image layers to /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored. @@ -238,7 +238,7 @@ pub struct Timeline { eviction_task_timeline_state: tokio::sync::Mutex, } -type LayerMapWriteLockGuard<'t> = std::sync::RwLockWriteGuard<'t, LayerMap>; +type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap>; /// Internal structure to hold all data needed for logical size calculation. /// @@ -574,8 +574,8 @@ impl Timeline { /// The sum of the file size of all historic layers in the layer map. /// This method makes no distinction between local and remote layers. /// Hence, the result **does not represent local filesystem usage**. - pub fn layer_size_sum(&self) -> u64 { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_size_sum(&self) -> u64 { + let layer_map = self.layers.read().await; let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -669,7 +669,7 @@ impl Timeline { if self.current_state() == TimelineState::Creating { debug!("timelines in Creating state are never written to"); assert!( - self.layers.read().unwrap().open_layer.is_none(), + self.layers.read().await.open_layer.is_none(), "would have nothing to flush anyways" ); return Ok(()); @@ -898,7 +898,7 @@ impl Timeline { /// safekeepers to regard pageserver as caught up and suspend activity. pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; if let Some(open_layer) = &layers.open_layer { let open_layer_size = open_layer.size()?; drop(layers); @@ -931,17 +931,17 @@ impl Timeline { Ok(()) } - pub fn activate(self: &Arc, broker_client: BrokerClientChannel, ctx: &RequestContext) { + pub async fn activate(self: &Arc, broker_client: BrokerClientChannel, ctx: &RequestContext) { if self.current_state() == TimelineState::Creating { panic!("timelines in Creating state are never activated"); } self.maybe_spawn_flush_loop(); self.launch_wal_receiver(ctx, broker_client); - self.set_state(TimelineState::Active); + self.set_state(TimelineState::Active).await; self.launch_eviction_task(); } - pub fn set_state(&self, new_state: TimelineState) { + pub async fn set_state(&self, new_state: TimelineState) { if self.current_state() == TimelineState::Creating { info!("timelines in Creating state are never activated, nothing to stop"); assert_eq!( @@ -949,7 +949,7 @@ impl Timeline { FlushLoopState::NotStarted ); assert!( - self.layers.read().unwrap().open_layer.is_none(), + self.layers.read().await.open_layer.is_none(), "would have nothing to flush anyways" ); assert!(self.walreceiver.lock().unwrap().is_none()); @@ -1020,8 +1020,8 @@ impl Timeline { } } - pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { + let layer_map = self.layers.read().await; let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -1043,7 +1043,7 @@ impl Timeline { #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) }; if self.remote_client.is_none() { return Ok(Some(false)); @@ -1056,7 +1056,7 @@ impl Timeline { /// Like [`evict_layer_batch`], but for just one layer. /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let remote_client = self .remote_client .as_ref() @@ -1141,7 +1141,7 @@ impl Timeline { } // start the batch update - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut batch_updates = layer_map.batch_update(); let mut results = Vec::with_capacity(layers_to_evict.len()); @@ -1389,7 +1389,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: RwLock::new(LayerMap::default()), + layers: tokio::sync::RwLock::new(LayerMap::default()), wanted_image_layers: Mutex::new(None), walredo_mgr, @@ -1590,8 +1590,8 @@ impl Timeline { /// Scan the timeline directory to populate the layer map. /// Returns all timeline-related files that were found and loaded. /// - pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { - let mut layers = self.layers.write().unwrap(); + pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1720,7 +1720,7 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1873,7 +1873,7 @@ impl Timeline { let local_layers = self .layers .read() - .unwrap() + .await .iter_historic_layers() .map(|l| (l.filename(), l)) .collect::>(); @@ -2232,8 +2232,8 @@ impl Timeline { } } - fn find_layer(&self, layer_file_name: &str) -> Option> { - for historic_layer in self.layers.read().unwrap().iter_historic_layers() { + async fn find_layer(&self, layer_file_name: &str) -> Option> { + for historic_layer in self.layers.read().await.iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { return Some(historic_layer); @@ -2440,7 +2440,7 @@ impl Timeline { #[allow(clippy::never_loop)] // see comment at bottom of this loop 'layer_map_search: loop { let remote_layer = { - let layers = timeline.layers.read().unwrap(); + let layers = timeline.layers.read().await; // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2619,8 +2619,8 @@ impl Timeline { /// /// Get a handle to the latest layer for appending. /// - fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { - let mut layers = self.layers.write().unwrap(); + async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { + let mut layers = self.layers.write().await; self.get_layer_for_write_locked(lsn, &mut layers) } @@ -2671,9 +2671,9 @@ impl Timeline { Ok(layer) } - fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { + async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { //info!("PUT: key {} at {}", key, lsn); - let layer = self.get_layer_for_write(lsn)?; + let layer = self.get_layer_for_write(lsn).await?; layer.put_value(key, lsn, val)?; Ok(()) } @@ -2691,9 +2691,9 @@ impl Timeline { Ok(()) } - fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - let layer = self.get_layer_for_write(lsn)?; - layer.put_tombstone(key_range, lsn)?; + async fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + let layer = self.get_layer_for_write(lsn).await?; + layer.put_tombstone(key_range, lsn).await?; Ok(()) } @@ -2713,7 +2713,7 @@ impl Timeline { } else { Some(self.write_lock.lock().await) }; - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; if let Some(open_layer) = &layers.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? @@ -2751,7 +2751,7 @@ impl Timeline { let flush_counter = *layer_flush_start_rx.borrow(); let result = loop { let layer_to_flush = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers.frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; @@ -2843,7 +2843,7 @@ impl Timeline { .await? } else { // normal case, write out a L0 delta layer file. - let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?; + let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?; HashMap::from([(delta_path, metadata)]) }; @@ -2852,7 +2852,7 @@ impl Timeline { // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let l = layers.frozen_layers.pop_front(); // Only one thread may call this function at a time (for this @@ -2946,7 +2946,7 @@ impl Timeline { } // Write out the given frozen in-memory layer as a new L0 delta file - fn create_delta_layer( + async fn create_delta_layer( &self, frozen_layer: &InMemoryLayer, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { @@ -2970,7 +2970,7 @@ impl Timeline { // Add it to the layer map let l = Arc::new(new_delta); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( &batch_updates, @@ -3022,10 +3022,14 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? - fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { + async fn time_for_new_image_layer( + &self, + partition: &KeySpace, + lsn: Lsn, + ) -> anyhow::Result { let threshold = self.get_image_creation_threshold(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut max_deltas = 0; { @@ -3120,7 +3124,7 @@ impl Timeline { for partition in partitioning.parts.iter() { let img_range = start..partition.ranges.last().unwrap().end; start = img_range.end; - if force || self.time_for_new_image_layer(partition, lsn)? { + if force || self.time_for_new_image_layer(partition, lsn).await? { let mut image_layer_writer = ImageLayerWriter::new( self.conf, self.timeline_id, @@ -3198,7 +3202,7 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); for l in image_layers { @@ -3265,7 +3269,7 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut level0_deltas = layers.get_level0_deltas()?; // Only compact if enough layers have accumulated. @@ -3623,7 +3627,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { @@ -3888,7 +3892,7 @@ impl Timeline { // 4. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -4184,7 +4188,7 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let mut layers = self_clone.layers.write().unwrap(); + let mut layers = self_clone.layers.write().await; let mut updates = layers.batch_update(); let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { @@ -4342,7 +4346,7 @@ impl Timeline { ) { let mut downloads = Vec::new(); { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers .iter_historic_layers() .filter_map(|l| l.downcast_remote_layer()) @@ -4444,8 +4448,8 @@ impl LocalLayerInfoForDiskUsageEviction { } impl Timeline { - pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { - let layers = self.layers.read().unwrap(); + pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { + let layers = self.layers.read().await; let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); @@ -4533,8 +4537,8 @@ impl<'a> TimelineWriter<'a> { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { - self.tl.put_value(key, lsn, value) + pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { + self.tl.put_value(key, lsn, value).await } pub fn put_locked( @@ -4548,8 +4552,8 @@ impl<'a> TimelineWriter<'a> { .put_value_locked(key, lsn, value, pre_locked_layer_map) } - pub fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - self.tl.put_tombstone(key_range, lsn) + pub async fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + self.tl.put_tombstone(key_range, lsn).await } /// Track the end of the latest digested WAL record. diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 558600692e..a7f24c52ed 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -185,7 +185,7 @@ impl Timeline { // We don't want to hold the layer map lock during eviction. // So, we just need to deal with this. let candidates: Vec> = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { if hist_layer.is_remote_layer() {