From 9fe7b7a079e8ae6c8b543e9034763f1fc7253ec6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 12 May 2023 11:39:13 +0200 Subject: [PATCH] turn Timeline::layers into tokio::sync::RwLock --- pageserver/src/disk_usage_eviction_task.rs | 2 +- pageserver/src/http/routes.rs | 11 ++- pageserver/src/pgdatadir_mapping.rs | 6 +- pageserver/src/tenant.rs | 32 +++++-- .../tenant/storage_layer/inmemory_layer.rs | 2 +- pageserver/src/tenant/timeline.rs | 96 ++++++++++--------- .../src/tenant/timeline/eviction_task.rs | 2 +- 7 files changed, 84 insertions(+), 67 deletions(-) diff --git a/pageserver/src/disk_usage_eviction_task.rs b/pageserver/src/disk_usage_eviction_task.rs index 7f8691d81e..9e5f644759 100644 --- a/pageserver/src/disk_usage_eviction_task.rs +++ b/pageserver/src/disk_usage_eviction_task.rs @@ -512,7 +512,7 @@ async fn collect_eviction_candidates( if !tl.is_active() { continue; } - let info = tl.get_local_layers_for_disk_usage_eviction(); + let info = tl.get_local_layers_for_disk_usage_eviction().await; debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len()); tenant_candidates.extend( info.resident_layers diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index e7192f5589..bdea0d256f 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -171,7 +171,7 @@ async fn build_timeline_info( ) -> anyhow::Result { crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id(); - let mut info = build_timeline_info_common(timeline, ctx)?; + let mut info = build_timeline_info_common(timeline, ctx).await?; if include_non_incremental_logical_size { // XXX we should be using spawn_ondemand_logical_size_calculation here. // Otherwise, if someone deletes the timeline / detaches the tenant while @@ -189,7 +189,7 @@ async fn build_timeline_info( Ok(info) } -fn build_timeline_info_common( +async fn build_timeline_info_common( timeline: &Arc, ctx: &RequestContext, ) -> anyhow::Result { @@ -220,7 +220,7 @@ fn build_timeline_info_common( None } }; - let current_physical_size = Some(timeline.layer_size_sum()); + let current_physical_size = Some(timeline.layer_size_sum().await); let state = timeline.current_state(); let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0)); @@ -279,6 +279,7 @@ async fn timeline_create_handler(mut request: Request) -> Result { // Created. Construct a TimelineInfo for it. let timeline_info = build_timeline_info_common(&new_timeline, &ctx) + .await .map_err(ApiError::InternalServerError)?; json_response(StatusCode::CREATED, timeline_info) } @@ -492,7 +493,7 @@ async fn tenant_status(request: Request) -> Result, ApiErro // Calculate total physical size of all timelines let mut current_physical_size = 0; for timeline in tenant.list_timelines().await.iter() { - current_physical_size += timeline.layer_size_sum(); + current_physical_size += timeline.layer_size_sum().await; } let state = tenant.current_state(); @@ -593,7 +594,7 @@ async fn layer_map_info_handler(request: Request) -> Result check_permission(&request, Some(tenant_id))?; let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?; - let layer_map_info = timeline.layer_map_info(reset); + let layer_map_info = timeline.layer_map_info(reset).await; json_response(StatusCode::OK, layer_map_info) } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 545dfe3151..f18ceff252 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1118,7 +1118,7 @@ impl<'a> DatadirModification<'a> { let writer = self.tline.writer().await; - let mut layer_map = self.tline.layers.write().unwrap(); + let mut layer_map = self.tline.layers.write().await; // Flush relation and SLRU data blocks, keep metadata. let mut result: anyhow::Result<()> = Ok(()); @@ -1152,10 +1152,10 @@ impl<'a> DatadirModification<'a> { self.pending_nblocks = 0; for (key, value) in self.pending_updates.drain() { - writer.put(key, lsn, &value)?; + writer.put(key, lsn, &value).await?; } for key_range in self.pending_deletions.drain(..) { - writer.delete(key_range, lsn)?; + writer.delete(key_range, lsn).await?; } writer.finish_write(lsn); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index fd43739a9d..acf02449b3 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -188,6 +188,7 @@ impl UninitializedTimeline<'_> { pub async fn initialize(self, ctx: &RequestContext) -> anyhow::Result> { let mut timelines = self.owning_tenant.timelines.lock().await; self.initialize_with_lock(ctx, &mut timelines, true, true) + .await } /// Like `initialize`, but the caller is already holding lock on Tenant::timelines. @@ -195,7 +196,7 @@ impl UninitializedTimeline<'_> { /// timeline is initialized in Active state. This is used during tenant load and /// attach, where the WAL receivers are launched only after all the timelines have /// been initialized. - fn initialize_with_lock( + async fn initialize_with_lock( mut self, ctx: &RequestContext, timelines: &mut HashMap>, @@ -222,6 +223,7 @@ impl UninitializedTimeline<'_> { if load_layer_map { new_timeline .load_layer_map(new_disk_consistent_lsn) + .await .with_context(|| { format!( "Failed to load layermap for timeline {tenant_id}/{timeline_id}" @@ -279,6 +281,7 @@ impl UninitializedTimeline<'_> { // updated it for the layers that we created during the import. let mut timelines = self.owning_tenant.timelines.lock().await; self.initialize_with_lock(ctx, &mut timelines, false, true) + .await } fn raw_timeline(&self) -> anyhow::Result<&Arc> { @@ -518,7 +521,10 @@ impl Tenant { // Do not start walreceiver here. We do need loaded layer map for reconcile_with_remote // But we shouldnt start walreceiver before we have all the data locally, because working walreceiver // will ingest data which may require looking at the layers which are not yet available locally - match timeline.initialize_with_lock(ctx, &mut timelines_accessor, true, false) { + match timeline + .initialize_with_lock(ctx, &mut timelines_accessor, true, false) + .await + { Ok(new_timeline) => new_timeline, Err(e) => { error!("Failed to initialize timeline {tenant_id}/{timeline_id}: {e:?}"); @@ -563,7 +569,7 @@ impl Tenant { || timeline .layers .read() - .unwrap() + .await .iter_historic_layers() .next() .is_some(), @@ -1241,6 +1247,7 @@ impl Tenant { true, None, ) + .await } /// Create a new timeline. @@ -2433,8 +2440,10 @@ impl Tenant { timeline_uninit_mark, false, Some(Arc::clone(src_timeline)), - )? - .initialize_with_lock(ctx, &mut timelines, true, true)?; + ) + .await? + .initialize_with_lock(ctx, &mut timelines, true, true) + .await?; drop(timelines); // Root timeline gets its layers during creation and uploads them along with the metadata. @@ -2509,8 +2518,9 @@ impl Tenant { pgdata_lsn, pg_version, ); - let raw_timeline = - self.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)?; + let raw_timeline = self + .prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None) + .await?; let tenant_id = raw_timeline.owning_tenant.tenant_id; let unfinished_timeline = raw_timeline.raw_timeline()?; @@ -2549,7 +2559,9 @@ impl Tenant { // map above, when we imported the datadir. let timeline = { let mut timelines = self.timelines.lock().await; - raw_timeline.initialize_with_lock(ctx, &mut timelines, false, true)? + raw_timeline + .initialize_with_lock(ctx, &mut timelines, false, true) + .await? }; info!( @@ -2563,7 +2575,7 @@ impl Tenant { /// Creates intermediate timeline structure and its files, without loading it into memory. /// It's up to the caller to import the necesary data and import the timeline into memory. - fn prepare_timeline( + async fn prepare_timeline( &self, new_timeline_id: TimelineId, new_metadata: &TimelineMetadata, @@ -2595,7 +2607,7 @@ impl Tenant { ) { Ok(new_timeline) => { if init_layers { - new_timeline.layers.write().unwrap().next_open_layer_at = + new_timeline.layers.write().await.next_open_layer_at = Some(new_timeline.initdb_lsn); } debug!( diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index c453683fea..78bcfdafc0 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -304,7 +304,7 @@ impl InMemoryLayer { Ok(()) } - pub fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { + pub async fn put_tombstone(&self, _key_range: Range, _lsn: Lsn) -> Result<()> { // TODO: Currently, we just leak the storage for any deleted keys Ok(()) diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 67bc1e640b..f57dfef205 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -121,7 +121,7 @@ pub struct Timeline { pub pg_version: u32, - pub(crate) layers: RwLock>, + pub(crate) layers: tokio::sync::RwLock>, last_freeze_at: AtomicLsn, // Atomic would be more appropriate here. @@ -229,7 +229,7 @@ pub struct Timeline { eviction_task_timeline_state: tokio::sync::Mutex, } -type LayerMapWriteLockGuard<'t> = std::sync::RwLockWriteGuard<'t, LayerMap>; +type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap>; /// Internal structure to hold all data needed for logical size calculation. /// @@ -548,8 +548,8 @@ impl Timeline { /// The sum of the file size of all historic layers in the layer map. /// This method makes no distinction between local and remote layers. /// Hence, the result **does not represent local filesystem usage**. - pub fn layer_size_sum(&self) -> u64 { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_size_sum(&self) -> u64 { + let layer_map = self.layers.read().await; let mut size = 0; for l in layer_map.iter_historic_layers() { size += l.file_size(); @@ -849,7 +849,7 @@ impl Timeline { /// safekeepers to regard pageserver as caught up and suspend activity. pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; if let Some(open_layer) = &layers.open_layer { let open_layer_size = open_layer.size()?; drop(layers); @@ -928,8 +928,8 @@ impl Timeline { self.state.subscribe() } - pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { - let layer_map = self.layers.read().unwrap(); + pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo { + let layer_map = self.layers.read().await; let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1); if let Some(open_layer) = &layer_map.open_layer { in_memory_layers.push(open_layer.info()); @@ -951,7 +951,7 @@ impl Timeline { #[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))] pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) }; if self.remote_client.is_none() { return Ok(Some(false)); @@ -964,7 +964,7 @@ impl Timeline { /// Like [`evict_layer_batch`], but for just one layer. /// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`. pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result> { - let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) }; + let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) }; let remote_client = self .remote_client .as_ref() @@ -1049,7 +1049,7 @@ impl Timeline { } // start the batch update - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut batch_updates = layer_map.batch_update(); let mut results = Vec::with_capacity(layers_to_evict.len()); @@ -1313,7 +1313,7 @@ impl Timeline { timeline_id, tenant_id, pg_version, - layers: RwLock::new(LayerMap::default()), + layers: tokio::sync::RwLock::new(LayerMap::default()), walredo_mgr, walreceiver, @@ -1453,8 +1453,8 @@ impl Timeline { /// Scan the timeline directory to populate the layer map. /// Returns all timeline-related files that were found and loaded. /// - pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { - let mut layers = self.layers.write().unwrap(); + pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> { + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut num_layers = 0; @@ -1583,7 +1583,7 @@ impl Timeline { // We're holding a layer map lock for a while but this // method is only called during init so it's fine. - let mut layer_map = self.layers.write().unwrap(); + let mut layer_map = self.layers.write().await; let mut updates = layer_map.batch_update(); for remote_layer_name in &index_part.timeline_layers { let local_layer = local_only_layers.remove(remote_layer_name); @@ -1736,7 +1736,7 @@ impl Timeline { let local_layers = self .layers .read() - .unwrap() + .await .iter_historic_layers() .map(|l| (l.filename(), l)) .collect::>(); @@ -2070,8 +2070,8 @@ impl Timeline { } } - fn find_layer(&self, layer_file_name: &str) -> Option> { - for historic_layer in self.layers.read().unwrap().iter_historic_layers() { + async fn find_layer(&self, layer_file_name: &str) -> Option> { + for historic_layer in self.layers.read().await.iter_historic_layers() { let historic_layer_name = historic_layer.filename().file_name(); if layer_file_name == historic_layer_name { return Some(historic_layer); @@ -2238,7 +2238,7 @@ impl Timeline { #[allow(clippy::never_loop)] // see comment at bottom of this loop 'layer_map_search: loop { let remote_layer = { - let layers = timeline.layers.read().unwrap(); + let layers = timeline.layers.read().await; // Check the open and frozen in-memory layers first, in order from newest // to oldest. @@ -2417,8 +2417,8 @@ impl Timeline { /// /// Get a handle to the latest layer for appending. /// - fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { - let mut layers = self.layers.write().unwrap(); + async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result> { + let mut layers = self.layers.write().await; self.get_layer_for_write_locked(lsn, &mut layers) } @@ -2469,9 +2469,9 @@ impl Timeline { Ok(layer) } - fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { + async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> { //info!("PUT: key {} at {}", key, lsn); - let layer = self.get_layer_for_write(lsn)?; + let layer = self.get_layer_for_write(lsn).await?; layer.put_value(key, lsn, val)?; Ok(()) } @@ -2489,9 +2489,9 @@ impl Timeline { Ok(()) } - fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - let layer = self.get_layer_for_write(lsn)?; - layer.put_tombstone(key_range, lsn)?; + async fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + let layer = self.get_layer_for_write(lsn).await?; + layer.put_tombstone(key_range, lsn).await?; Ok(()) } @@ -2511,7 +2511,7 @@ impl Timeline { } else { Some(self.write_lock.lock().await) }; - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; if let Some(open_layer) = &layers.open_layer { let open_layer_rc = Arc::clone(open_layer); // Does this layer need freezing? @@ -2549,7 +2549,7 @@ impl Timeline { let flush_counter = *layer_flush_start_rx.borrow(); let result = loop { let layer_to_flush = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers.frozen_layers.front().cloned() // drop 'layers' lock to allow concurrent reads and writes }; @@ -2641,7 +2641,7 @@ impl Timeline { .await? } else { // normal case, write out a L0 delta layer file. - let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?; + let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?; HashMap::from([(delta_path, metadata)]) }; @@ -2650,7 +2650,7 @@ impl Timeline { // The new on-disk layers are now in the layer map. We can remove the // in-memory layer from the map now. { - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let l = layers.frozen_layers.pop_front(); // Only one thread may call this function at a time (for this @@ -2744,7 +2744,7 @@ impl Timeline { } // Write out the given frozen in-memory layer as a new L0 delta file - fn create_delta_layer( + async fn create_delta_layer( &self, frozen_layer: &InMemoryLayer, ) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> { @@ -2768,7 +2768,7 @@ impl Timeline { // Add it to the layer map let l = Arc::new(new_delta); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut batch_updates = layers.batch_update(); l.access_stats().record_residence_event( &batch_updates, @@ -2820,10 +2820,14 @@ impl Timeline { } // Is it time to create a new image layer for the given partition? - fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result { + async fn time_for_new_image_layer( + &self, + partition: &KeySpace, + lsn: Lsn, + ) -> anyhow::Result { let threshold = self.get_image_creation_threshold(); - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut max_deltas = 0; @@ -2894,7 +2898,7 @@ impl Timeline { for partition in partitioning.parts.iter() { let img_range = start..partition.ranges.last().unwrap().end; start = img_range.end; - if force || self.time_for_new_image_layer(partition, lsn)? { + if force || self.time_for_new_image_layer(partition, lsn).await? { let mut image_layer_writer = ImageLayerWriter::new( self.conf, self.timeline_id, @@ -2966,7 +2970,7 @@ impl Timeline { let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len()); - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); for l in image_layers { @@ -3033,7 +3037,7 @@ impl Timeline { target_file_size: u64, ctx: &RequestContext, ) -> Result { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut level0_deltas = layers.get_level0_deltas()?; // Only compact if enough layers have accumulated. @@ -3391,7 +3395,7 @@ impl Timeline { .context("wait for layer upload ops to complete")?; } - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; let mut updates = layers.batch_update(); let mut new_layer_paths = HashMap::with_capacity(new_layers.len()); for l in new_layers { @@ -3650,7 +3654,7 @@ impl Timeline { // 4. newer on-disk image layers cover the layer's whole key range // // TODO holding a write lock is too agressive and avoidable - let mut layers = self.layers.write().unwrap(); + let mut layers = self.layers.write().await; 'outer: for l in layers.iter_historic_layers() { result.layers_total += 1; @@ -3933,7 +3937,7 @@ impl Timeline { // Download complete. Replace the RemoteLayer with the corresponding // Delta- or ImageLayer in the layer map. - let mut layers = self_clone.layers.write().unwrap(); + let mut layers = self_clone.layers.write().await; let mut updates = layers.batch_update(); let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size); { @@ -4091,7 +4095,7 @@ impl Timeline { ) { let mut downloads = Vec::new(); { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; layers .iter_historic_layers() .filter_map(|l| l.downcast_remote_layer()) @@ -4193,8 +4197,8 @@ impl LocalLayerInfoForDiskUsageEviction { } impl Timeline { - pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { - let layers = self.layers.read().unwrap(); + pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo { + let layers = self.layers.read().await; let mut max_layer_size: Option = None; let mut resident_layers = Vec::new(); @@ -4282,8 +4286,8 @@ impl<'a> TimelineWriter<'a> { /// /// This will implicitly extend the relation, if the page is beyond the /// current end-of-file. - pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { - self.tl.put_value(key, lsn, value) + pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> { + self.tl.put_value(key, lsn, value).await } pub fn put_locked( @@ -4297,8 +4301,8 @@ impl<'a> TimelineWriter<'a> { .put_value_locked(key, lsn, value, pre_locked_layer_map) } - pub fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { - self.tl.put_tombstone(key_range, lsn) + pub async fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { + self.tl.put_tombstone(key_range, lsn).await } /// Track the end of the latest digested WAL record. diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs index 5ea3d5b14d..64c44a1974 100644 --- a/pageserver/src/tenant/timeline/eviction_task.rs +++ b/pageserver/src/tenant/timeline/eviction_task.rs @@ -185,7 +185,7 @@ impl Timeline { // We don't want to hold the layer map lock during eviction. // So, we just need to deal with this. let candidates: Vec> = { - let layers = self.layers.read().unwrap(); + let layers = self.layers.read().await; let mut candidates = Vec::new(); for hist_layer in layers.iter_historic_layers() { if hist_layer.is_remote_layer() {