From 7de3799e66946556e5ea3d2ac77fe594a4964316 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Fri, 12 May 2023 10:53:57 +0200 Subject: [PATCH] (does not compile): make TimelineWriter `Send` by using tokio::sync Mutex internally fails with cs@devvm:[~/src/neon]: cargo check -p pageserver --features testing Checking pageserver v0.1.0 (/home/cs/src/neon/pageserver) error: future cannot be sent between threads safely --> pageserver/src/tenant/timeline/walreceiver/connection_manager.rs:426:33 | 426 | let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| { | ^^^^^^^^^^^^^^^^^ future created by async block is not `Send` | = help: within `Instrumented<[async block@pageserver/src/tenant/timeline/walreceiver/connection_manager.rs:427:13: 439:14]>`, the trait `std::marker::Send` is not implemented for `std::sync::RwLockReadGuard<'_, LayerMap>` note: future is not `Send` as this value is used across an await --> pageserver/src/tenant/timeline.rs:872:46 | 850 | let layers = self.layers.read().unwrap(); | ------ has type `std::sync::RwLockReadGuard<'_, LayerMap>` which is not `Send` ... 872 | self.freeze_inmem_layer(true).await; | ^^^^^^ await occurs here, with `layers` maybe used later ... 881 | } | - `layers` is later dropped here note: required by a bound in `TaskHandle::::spawn` --> pageserver/src/tenant/timeline/walreceiver.rs:196:52 | 192 | fn spawn( | ----- required by a bound in this ... 196 | Fut: Future> + Send, | ^^^^ required by this bound in `TaskHandle::::spawn` error: could not compile `pageserver` due to previous error --- pageserver/src/import_datadir.rs | 10 ++--- pageserver/src/pgdatadir_mapping.rs | 10 ++--- pageserver/src/tenant.rs | 32 +++++++-------- pageserver/src/tenant/timeline.rs | 22 +++++----- .../walreceiver/walreceiver_connection.rs | 15 ++++--- pageserver/src/walingest.rs | 40 +++++++++---------- 6 files changed, 66 insertions(+), 63 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 936de35eb9..9ad0124a80 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir( { pg_control = Some(control_file); } - modification.flush()?; + modification.flush().await?; } } // We're done importing all the data files. - modification.commit()?; + modification.commit().await?; // We expect the Postgres server to be shut down cleanly. let pg_control = pg_control.context("pg_control file not found")?; @@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar( // We found the pg_control file. pg_control = Some(res); } - modification.flush()?; + modification.flush().await?; } tokio_tar::EntryType::Directory => { debug!("directory {:?}", file_path); @@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar( // sanity check: ensure that pg_control is loaded let _pg_control = pg_control.context("pg_control file not found")?; - modification.commit()?; + modification.commit().await?; Ok(()) } @@ -594,7 +594,7 @@ async fn import_file( // zenith.signal is not necessarily the last file, that we handle // but it is ok to call `finish_write()`, because final `modification.commit()` // will update lsn once more to the final one. - let writer = modification.tline.writer(); + let writer = modification.tline.writer().await; writer.finish_write(prev_lsn); debug!("imported zenith signal {}", prev_lsn); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 0b20186efb..89f58c049e 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1108,7 +1108,7 @@ impl<'a> DatadirModification<'a> { /// retains all the metadata, but data pages are flushed. That's again OK /// for bulk import, where you are just loading data pages and won't try to /// modify the same pages twice. - pub fn flush(&mut self) -> anyhow::Result<()> { + pub async fn flush(&mut self) -> anyhow::Result<()> { // Unless we have accumulated a decent amount of changes, it's not worth it // to scan through the pending_updates list. let pending_nblocks = self.pending_nblocks; @@ -1116,7 +1116,7 @@ impl<'a> DatadirModification<'a> { return Ok(()); } - let writer = self.tline.writer(); + let writer = self.tline.writer().await; let mut layer_map = self.tline.layers.write().unwrap(); @@ -1145,8 +1145,8 @@ impl<'a> DatadirModification<'a> { /// underlying timeline. /// All the modifications in this atomic update are stamped by the specified LSN. /// - pub fn commit(&mut self) -> anyhow::Result<()> { - let writer = self.tline.writer(); + pub async fn commit(&mut self) -> anyhow::Result<()> { + let writer = self.tline.writer().await; let lsn = self.lsn; let pending_nblocks = self.pending_nblocks; self.pending_nblocks = 0; @@ -1607,7 +1607,7 @@ pub async fn create_test_timeline( .await?; let mut m = tline.begin_modification(Lsn(8)); m.init_empty()?; - m.commit()?; + m.commit().await?; Ok(tline) } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 15c4207018..9352168778 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -3306,12 +3306,12 @@ mod tests { .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) .await?; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; writer.finish_write(Lsn(0x10)); drop(writer); - let writer = tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3376,7 +3376,7 @@ mod tests { let tline = tenant .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) .await?; - let writer = tline.writer(); + let writer = tline.writer().await; #[allow(non_snake_case)] let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap(); @@ -3403,7 +3403,7 @@ mod tests { .get_timeline(NEW_TIMELINE_ID, true) .await .expect("Should have a local timeline"); - let new_writer = newtline.writer(); + let new_writer = newtline.writer().await; new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?; new_writer.finish_write(Lsn(0x40)); @@ -3430,7 +3430,7 @@ mod tests { let mut lsn = start_lsn; #[allow(non_snake_case)] { - let writer = tline.writer(); + let writer = tline.writer().await; // Create a relation on the timeline writer.put( *TEST_KEY, @@ -3449,7 +3449,7 @@ mod tests { } tline.freeze_and_flush().await?; { - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( *TEST_KEY, lsn, @@ -3783,7 +3783,7 @@ mod tests { .create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx) .await?; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; writer.finish_write(Lsn(0x10)); drop(writer); @@ -3791,7 +3791,7 @@ mod tests { tline.freeze_and_flush().await?; tline.compact(&ctx).await?; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?; writer.finish_write(Lsn(0x20)); drop(writer); @@ -3799,7 +3799,7 @@ mod tests { tline.freeze_and_flush().await?; tline.compact(&ctx).await?; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?; writer.finish_write(Lsn(0x30)); drop(writer); @@ -3807,7 +3807,7 @@ mod tests { tline.freeze_and_flush().await?; tline.compact(&ctx).await?; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?; writer.finish_write(Lsn(0x40)); drop(writer); @@ -3859,7 +3859,7 @@ mod tests { for _ in 0..50 { for _ in 0..10000 { test_key.field6 = blknum; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, @@ -3909,7 +3909,7 @@ mod tests { for blknum in 0..NUM_KEYS { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, @@ -3927,7 +3927,7 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, @@ -3984,7 +3984,7 @@ mod tests { for blknum in 0..NUM_KEYS { lsn = Lsn(lsn.0 + 0x10); test_key.field6 = blknum as u32; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, @@ -4011,7 +4011,7 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, @@ -4078,7 +4078,7 @@ mod tests { lsn = Lsn(lsn.0 + 0x10); let blknum = thread_rng().gen_range(0..NUM_KEYS); test_key.field6 = blknum as u32; - let writer = tline.writer(); + let writer = tline.writer().await; writer.put( test_key, lsn, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 440a3dcd56..0908119b63 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -28,7 +28,7 @@ use std::ops::{Deref, Range}; use std::path::{Path, PathBuf}; use std::pin::pin; use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering}; -use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak}; +use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; use crate::context::{DownloadBehavior, RequestContext}; @@ -179,7 +179,7 @@ pub struct Timeline { /// Locked automatically by [`TimelineWriter`] and checkpointer. /// Must always be acquired before the layer map/individual layer lock /// to avoid deadlock. - write_lock: Mutex<()>, + write_lock: tokio::sync::Mutex<()>, /// Used to avoid multiple `flush_loop` tasks running flush_loop_state: Mutex, @@ -666,7 +666,7 @@ impl Timeline { /// Flush to disk all data that was written with the put_* functions #[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))] pub async fn freeze_and_flush(&self) -> anyhow::Result<()> { - self.freeze_inmem_layer(false); + self.freeze_inmem_layer(false).await; self.flush_frozen_layers_and_wait().await } @@ -846,10 +846,10 @@ impl Timeline { } /// Mutate the timeline with a [`TimelineWriter`]. - pub fn writer(&self) -> TimelineWriter<'_> { + pub async fn writer(&self) -> TimelineWriter<'_> { TimelineWriter { tl: self, - _write_guard: self.write_lock.lock().unwrap(), + _write_guard: self.write_lock.lock().await, } } @@ -883,7 +883,7 @@ impl Timeline { /// /// Also flush after a period of time without new data -- it helps /// safekeepers to regard pageserver as caught up and suspend activity. - pub fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { + pub async fn check_checkpoint_distance(self: &Arc) -> anyhow::Result<()> { let last_lsn = self.get_last_record_lsn(); let layers = self.layers.read().unwrap(); if let Some(open_layer) = &layers.open_layer { @@ -907,7 +907,7 @@ impl Timeline { last_freeze_ts.elapsed() ); - self.freeze_inmem_layer(true); + self.freeze_inmem_layer(true).await; self.last_freeze_at.store(last_lsn); *(self.last_freeze_ts.write().unwrap()) = Instant::now(); @@ -1381,7 +1381,7 @@ impl Timeline { layer_flush_start_tx, layer_flush_done_tx, - write_lock: Mutex::new(()), + write_lock: tokio::sync::Mutex::new(()), layer_removal_cs: Default::default(), gc_info: std::sync::RwLock::new(GcInfo { @@ -2627,13 +2627,13 @@ impl Timeline { self.last_record_lsn.advance(new_lsn); } - fn freeze_inmem_layer(&self, write_lock_held: bool) { + async fn freeze_inmem_layer(&self, write_lock_held: bool) { // Freeze the current open in-memory layer. It will be written to disk on next // iteration. let _write_guard = if write_lock_held { None } else { - Some(self.write_lock.lock().unwrap()) + Some(self.write_lock.lock().await) }; let mut layers = self.layers.write().unwrap(); if let Some(open_layer) = &layers.open_layer { @@ -4434,7 +4434,7 @@ fn layer_traversal_error(msg: String, path: Vec) -> PageRecon // but will cause large code changes. pub struct TimelineWriter<'a> { tl: &'a Timeline, - _write_guard: MutexGuard<'a, ()>, + _write_guard: tokio::sync::MutexGuard<'a, ()>, } impl Deref for TimelineWriter<'_> { diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 1cbed3416c..91ff60603a 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -313,12 +313,15 @@ pub(super) async fn handle_walreceiver_connection( } } - timeline.check_checkpoint_distance().with_context(|| { - format!( - "Failed to check checkpoint distance for timeline {}", - timeline.timeline_id - ) - })?; + timeline + .check_checkpoint_distance() + .await + .with_context(|| { + format!( + "Failed to check checkpoint distance for timeline {}", + timeline.timeline_id + ) + })?; if let Some(last_lsn) = status_update { let timeline_remote_consistent_lsn = diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index eb74f75846..fc0cc5c81e 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> { // Now that this record has been fully handled, including updating the // checkpoint data, let the repository know that it is up-to-date to this LSN - modification.commit()?; + modification.commit().await?; Ok(()) } @@ -1200,7 +1200,7 @@ mod tests { let mut m = tline.begin_modification(Lsn(0x10)); m.put_checkpoint(ZERO_CHECKPOINT.clone())?; m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file - m.commit()?; + m.commit().await?; let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?; Ok(walingest) @@ -1217,22 +1217,22 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx) .await?; - m.commit()?; + m.commit().await?; let mut m = tline.begin_modification(Lsn(0x30)); walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx) .await?; - m.commit()?; + m.commit().await?; let mut m = tline.begin_modification(Lsn(0x40)); walingest .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx) .await?; - m.commit()?; + m.commit().await?; let mut m = tline.begin_modification(Lsn(0x50)); walingest .put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx) .await?; - m.commit()?; + m.commit().await?; assert_current_logical_size(&tline, Lsn(0x50)); @@ -1318,7 +1318,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, 2, &ctx) .await?; - m.commit()?; + m.commit().await?; assert_current_logical_size(&tline, Lsn(0x60)); // Check reported size and contents after truncation @@ -1360,7 +1360,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, 0, &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline .get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx) @@ -1373,7 +1373,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline .get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx) @@ -1398,7 +1398,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline .get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx) @@ -1435,7 +1435,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx) .await?; - m.commit()?; + m.commit().await?; // Check that rel exists and size is correct assert_eq!( @@ -1454,7 +1454,7 @@ mod tests { // Drop rel let mut m = tline.begin_modification(Lsn(0x30)); walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?; - m.commit()?; + m.commit().await?; // Check that rel is not visible anymore assert_eq!( @@ -1472,7 +1472,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx) .await?; - m.commit()?; + m.commit().await?; // Check that rel exists and size is correct assert_eq!( @@ -1509,7 +1509,7 @@ mod tests { .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx) .await?; } - m.commit()?; + m.commit().await?; // The relation was created at LSN 20, not visible at LSN 1 yet. assert_eq!( @@ -1554,7 +1554,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, 1, &ctx) .await?; - m.commit()?; + m.commit().await?; // Check reported size and contents after truncation assert_eq!( @@ -1603,7 +1603,7 @@ mod tests { .put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx) .await?; } - m.commit()?; + m.commit().await?; assert_eq!( tline @@ -1648,7 +1648,7 @@ mod tests { walingest .put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx) .await?; - m.commit()?; + m.commit().await?; } assert_current_logical_size(&tline, Lsn(lsn)); @@ -1664,7 +1664,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, RELSEG_SIZE @@ -1677,7 +1677,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, RELSEG_SIZE - 1 @@ -1693,7 +1693,7 @@ mod tests { walingest .put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx) .await?; - m.commit()?; + m.commit().await?; assert_eq!( tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?, size as BlockNumber