diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 936de35eb9..9ad0124a80 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir( { pg_control = Some(control_file); } - modification.flush()?; + modification.flush().await?; } } // We're done importing all the data files. - modification.commit()?; + modification.commit().await?; // We expect the Postgres server to be shut down cleanly. let pg_control = pg_control.context("pg_control file not found")?; @@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar( // We found the pg_control file. pg_control = Some(res); } - modification.flush()?; + modification.flush().await?; } tokio_tar::EntryType::Directory => { debug!("directory {:?}", file_path); @@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar( // sanity check: ensure that pg_control is loaded let _pg_control = pg_control.context("pg_control file not found")?; - modification.commit()?; + modification.commit().await?; Ok(()) } @@ -594,7 +594,7 @@ async fn import_file( // zenith.signal is not necessarily the last file, that we handle // but it is ok to call `finish_write()`, because final `modification.commit()` // will update lsn once more to the final one. - let writer = modification.tline.writer(); + let writer = modification.tline.writer().await; writer.finish_write(prev_lsn); debug!("imported zenith signal {}", prev_lsn); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 769d02af0b..188b5d15ce 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1108,7 +1108,7 @@ impl<'a> DatadirModification<'a> { /// retains all the metadata, but data pages are flushed. That's again OK /// for bulk import, where you are just loading data pages and won't try to /// modify the same pages twice. 
- pub fn flush(&mut self) -> anyhow::Result<()> { + pub async fn flush(&mut self) -> anyhow::Result<()> { // Unless we have accumulated a decent amount of changes, it's not worth it // to scan through the pending_updates list. let pending_nblocks = self.pending_nblocks; @@ -1116,7 +1116,7 @@ impl<'a> DatadirModification<'a> { return Ok(()); } - let writer = self.tline.writer(); + let writer = self.tline.writer().await; let mut layer_map = self.tline.layers.write().unwrap(); @@ -1145,8 +1145,8 @@ impl<'a> DatadirModification<'a> { /// underlying timeline. /// All the modifications in this atomic update are stamped by the specified LSN. /// - pub fn commit(&mut self) -> anyhow::Result<()> { - let writer = self.tline.writer(); + pub async fn commit(&mut self) -> anyhow::Result<()> { + let writer = self.tline.writer().await; let lsn = self.lsn; let pending_nblocks = self.pending_nblocks; self.pending_nblocks = 0; @@ -1596,7 +1596,7 @@ fn is_slru_block_key(key: Key) -> bool { } #[cfg(test)] -pub fn create_test_timeline( +pub async fn create_test_timeline( tenant: &crate::tenant::Tenant, timeline_id: utils::id::TimelineId, pg_version: u32, @@ -1605,7 +1605,7 @@ pub fn create_test_timeline( let tline = tenant.create_test_timeline(timeline_id, Lsn(8), pg_version, ctx)?; let mut m = tline.begin_modification(Lsn(8)); m.init_empty()?; - m.commit()?; + m.commit().await?; Ok(tline) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index c5d9cbf03d..8aac7d869a 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3326,12 +3326,12 @@ mod tests { let (tenant, ctx) = TenantHarness::create("test_basic")?.load().await; let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; writer.finish_write(Lsn(0x10)); drop(writer); - let writer = tline.writer(); + let writer = 
tline.writer().await; writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3389,7 +3389,7 @@ mod tests { let (tenant, ctx) = TenantHarness::create("test_branch")?.load().await; let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let writer = tline.writer(); + let writer = tline.writer().await; #[allow(non_snake_case)] let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap(); @@ -3415,7 +3415,7 @@ mod tests { let newtline = tenant .get_timeline(NEW_TIMELINE_ID, true) .expect("Should have a local timeline"); - let new_writer = newtline.writer(); + let new_writer = newtline.writer().await; new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; new_writer.finish_write(Lsn(0x40)); @@ -3442,7 +3442,7 @@ mod tests { let mut lsn = start_lsn; #[allow(non_snake_case)] { - let writer = tline.writer(); + let writer = tline.writer().await; // Create a relation on the timeline writer.put( *TEST_KEY, @@ -3461,7 +3461,7 @@ mod tests { } tline.freeze_and_flush().await?; { - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( *TEST_KEY, lsn, @@ -3773,7 +3773,7 @@ mod tests { let (tenant, ctx) = TenantHarness::create("test_images")?.load().await; let tline = tenant.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)?; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; writer.finish_write(Lsn(0x10)); drop(writer); @@ -3781,7 +3781,7 @@ mod tests { tline.freeze_and_flush().await?; tline.compact(&ctx).await?; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3789,7 +3789,7 @@ mod tests { tline.freeze_and_flush().await?; tline.compact(&ctx).await?; - let writer = 
tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?; writer.finish_write(Lsn(0x30)); drop(writer); @@ -3797,7 +3797,7 @@ mod tests { tline.freeze_and_flush().await?; tline.compact(&ctx).await?; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?; writer.finish_write(Lsn(0x40)); drop(writer); @@ -3847,7 +3847,7 @@ mod tests { for _ in 0..50 { for _ in 0..10000 { test_key.field6 = blknum; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, @@ -3895,7 +3895,7 @@ mod tests { for blknum in 0..NUM_KEYS { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, @@ -3913,7 +3913,7 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, @@ -3969,7 +3969,7 @@ mod tests { for blknum in 0..NUM_KEYS { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, @@ -3995,7 +3995,7 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, @@ -4060,7 +4060,7 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 763e159849..45ed18aebd 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -28,7 
+28,7 @@ use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; use std::pin::pin; use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak}; +use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; use crate::context::{DownloadBehavior, RequestContext}; @@ -179,7 +179,7 @@ pub struct Timeline { /// Locked automatically by [`TimelineWriter`] and checkpointer. /// Must always be acquired before the layer map/individual layer lock /// to avoid deadlock. - write_lock: Mutex<()>, + write_lock: tokio::sync::Mutex<()>, /// Used to avoid multiple `flush_loop` tasks running flush_loop_state: Mutex, @@ -666,7 +666,7 @@ impl Timeline { /// Flush to disk all data that was written with the put_* functions #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))] pub async fn freeze_and_flush(&self) -> anyhow::Result<()> { - self.freeze_inmem_layer(false); + self.freeze_inmem_layer(false).await; self.flush_frozen_layers_and_wait().await } @@ -846,10 +846,10 @@ impl Timeline { } /// Mutate the timeline with a [`TimelineWriter`]. - pub fn writer(&self) -> TimelineWriter<'_> { + pub async fn writer(&self) -> TimelineWriter<'_> { TimelineWriter { tl: self, - _write_guard: self.write_lock.lock().unwrap(), + _write_guard: self.write_lock.lock().await, } } @@ -883,7 +883,7 @@ impl Timeline { /// /// Also flush after a period of time without new data -- it helps /// safekeepers to regard pageserver as caught up and suspend activity. 
- pub fn check_checkpoint_distance(self: &Arc<Self>) -> anyhow::Result<()> { + pub async fn check_checkpoint_distance(self: &Arc<Self>) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); let layers = self.layers.read().unwrap(); if let Some(open_layer) = &layers.open_layer { @@ -907,7 +907,7 @@ impl Timeline { last_freeze_ts.elapsed() ); - self.freeze_inmem_layer(true); + self.freeze_inmem_layer(true).await; self.last_freeze_at.store(last_lsn); *(self.last_freeze_ts.write().unwrap()) = Instant::now(); @@ -1381,7 +1381,7 @@ impl Timeline { layer_flush_start_tx, layer_flush_done_tx, - write_lock: Mutex::new(()), + write_lock: tokio::sync::Mutex::new(()), layer_removal_cs: Default::default(), gc_info: std::sync::RwLock::new(GcInfo { @@ -2627,13 +2627,13 @@ impl Timeline { self.last_record_lsn.advance(new_lsn); } - fn freeze_inmem_layer(&self, write_lock_held: bool) { + async fn freeze_inmem_layer(&self, write_lock_held: bool) { // Freeze the current open in-memory layer. It will be written to disk on next // iteration. let _write_guard = if write_lock_held { None } else { - Some(self.write_lock.lock().unwrap()) + Some(self.write_lock.lock().await) }; let mut layers = self.layers.write().unwrap(); if let Some(open_layer) = &layers.open_layer { @@ -4434,7 +4434,7 @@ fn layer_traversal_error(msg: String, path: Vec) -> PageRecon // but will cause large code changes. 
pub struct TimelineWriter<'a> { tl: &'a Timeline, - _write_guard: MutexGuard<'a, ()>, + _write_guard: tokio::sync::MutexGuard<'a, ()>, } impl Deref for TimelineWriter<'_> { diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 1cbed3416c..91ff60603a 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -313,12 +313,15 @@ pub(super) async fn handle_walreceiver_connection( } } - timeline.check_checkpoint_distance().with_context(|| { - format!( - "Failed to check checkpoint distance for timeline {}", - timeline.timeline_id - ) - })?; + timeline + .check_checkpoint_distance() + .await + .with_context(|| { + format!( + "Failed to check checkpoint distance for timeline {}", + timeline.timeline_id + ) + })?; if let Some(last_lsn) = status_update { let timeline_remote_consistent_lsn = diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 4b8e6aa515..fc0cc5c81e 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> { // Now that this record has been fully handled, including updating the // checkpoint data, let the repository know that it is up-to-date to this LSN - modification.commit()?; + modification.commit().await?; Ok(()) } @@ -1200,7 +1200,7 @@ mod tests { let mut m = tline.begin_modification(Lsn(0x10)); m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file - m.commit()?; + m.commit().await?; let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?; Ok(walingest) @@ -1209,7 +1209,7 @@ mod tests { #[tokio::test] async fn test_relsize() -> Result<()> { let (tenant, ctx) = TenantHarness::create("test_relsize")?.load().await; - let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, 
&ctx)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?; let mut walingest = init_walingest_test(&tline, &ctx).await?; let mut m = tline.begin_modification(Lsn(0x20)); @@ -1217,22 +1217,22 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx) .await?; - m.commit()?; + m.commit().await?; let mut m = tline.begin_modification(Lsn(0x30)); walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx) .await?; - m.commit()?; + m.commit().await?; let mut m = tline.begin_modification(Lsn(0x40)); walingest .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx) .await?; - m.commit()?; + m.commit().await?; let mut m = tline.begin_modification(Lsn(0x50)); walingest .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx) .await?; - m.commit()?; + m.commit().await?; assert_current_logical_size(&tline, Lsn(0x50)); @@ -1318,7 +1318,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx) .await?; - m.commit()?; + m.commit().await?; assert_current_logical_size(&tline, Lsn(0x60)); // Check reported size and contents after truncation @@ -1360,7 +1360,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline .get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx) @@ -1373,7 +1373,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline .get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx) @@ -1398,7 +1398,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx) @@ -1428,14 +1428,14 @@ mod tests { #[tokio::test] async fn test_drop_extend() -> Result<()> { let (tenant, ctx) = 
TenantHarness::create("test_drop_extend")?.load().await; - let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?; let mut walingest = init_walingest_test(&tline, &ctx).await?; let mut m = tline.begin_modification(Lsn(0x20)); walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx) .await?; - m.commit()?; + m.commit().await?; // Check that rel exists and size is correct assert_eq!( @@ -1454,7 +1454,7 @@ mod tests { // Drop rel let mut m = tline.begin_modification(Lsn(0x30)); walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?; - m.commit()?; + m.commit().await?; // Check that rel is not visible anymore assert_eq!( @@ -1472,7 +1472,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx) .await?; - m.commit()?; + m.commit().await?; // Check that rel exists and size is correct assert_eq!( @@ -1497,7 +1497,7 @@ mod tests { #[tokio::test] async fn test_truncate_extend() -> Result<()> { let (tenant, ctx) = TenantHarness::create("test_truncate_extend")?.load().await; - let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?; let mut walingest = init_walingest_test(&tline, &ctx).await?; // Create a 20 MB relation (the size is arbitrary) @@ -1509,7 +1509,7 @@ mod tests { .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx) .await?; } - m.commit()?; + m.commit().await?; // The relation was created at LSN 20, not visible at LSN 1 yet. 
assert_eq!( @@ -1554,7 +1554,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx) .await?; - m.commit()?; + m.commit().await?; // Check reported size and contents after truncation assert_eq!( @@ -1603,7 +1603,7 @@ mod tests { .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx) .await?; } - m.commit()?; + m.commit().await?; assert_eq!( tline @@ -1637,7 +1637,7 @@ mod tests { #[tokio::test] async fn test_large_rel() -> Result<()> { let (tenant, ctx) = TenantHarness::create("test_large_rel")?.load().await; - let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx)?; + let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION, &ctx).await?; let mut walingest = init_walingest_test(&tline, &ctx).await?; let mut lsn = 0x10; @@ -1648,7 +1648,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx) .await?; - m.commit()?; + m.commit().await?; } assert_current_logical_size(&tline, Lsn(lsn)); @@ -1664,7 +1664,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, RELSEG_SIZE @@ -1677,7 +1677,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, RELSEG_SIZE - 1 @@ -1693,7 +1693,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, size as BlockNumber