(does not compile): make TimelineWriter Send by using tokio::sync Mutex internally

fails with

cs@devvm:[~/src/neon]: cargo check -p pageserver  --features testing
    Checking pageserver v0.1.0 (/home/cs/src/neon/pageserver)
error: future cannot be sent between threads safely
   --> pageserver/src/tenant/timeline/walreceiver/connection_manager.rs:426:33
    |
426 |         let connection_handle = TaskHandle::spawn(move |events_sender, cancellation| {
    |                                 ^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `Instrumented<[async block@pageserver/src/tenant/timeline/walreceiver/connection_manager.rs:427:13: 439:14]>`, the trait `std::marker::Send` is not implemented for `std::sync::RwLockReadGuard<'_, LayerMap<dyn PersistentLayer>>`
note: future is not `Send` as this value is used across an await
   --> pageserver/src/tenant/timeline.rs:872:46
    |
850 |         let layers = self.layers.read().unwrap();
    |             ------ has type `std::sync::RwLockReadGuard<'_, LayerMap<dyn PersistentLayer>>` which is not `Send`
...
872 |                 self.freeze_inmem_layer(true).await;
    |                                              ^^^^^^ await occurs here, with `layers` maybe used later
...
881 |     }
    |     - `layers` is later dropped here
note: required by a bound in `TaskHandle::<E>::spawn`
   --> pageserver/src/tenant/timeline/walreceiver.rs:196:52
    |
192 |     fn spawn<Fut>(
    |        ----- required by a bound in this
...
196 |         Fut: Future<Output = anyhow::Result<()>> + Send,
    |                                                    ^^^^ required by this bound in `TaskHandle::<E>::spawn`

error: could not compile `pageserver` due to previous error
This commit is contained in:
Christian Schwarz
2023-05-12 10:53:57 +02:00
parent a1680b185f
commit 7de3799e66
6 changed files with 66 additions and 63 deletions

View File

@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
{
pg_control = Some(control_file);
}
modification.flush()?;
modification.flush().await?;
}
}
// We're done importing all the data files.
modification.commit()?;
modification.commit().await?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush()?;
modification.flush().await?;
}
tokio_tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
modification.commit().await?;
Ok(())
}
@@ -594,7 +594,7 @@ async fn import_file(
// zenith.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.writer();
let writer = modification.tline.writer().await;
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);

View File

@@ -1108,7 +1108,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub fn flush(&mut self) -> anyhow::Result<()> {
pub async fn flush(&mut self) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1116,7 +1116,7 @@ impl<'a> DatadirModification<'a> {
return Ok(());
}
let writer = self.tline.writer();
let writer = self.tline.writer().await;
let mut layer_map = self.tline.layers.write().unwrap();
@@ -1145,8 +1145,8 @@ impl<'a> DatadirModification<'a> {
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer();
pub async fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer().await;
let lsn = self.lsn;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
@@ -1607,7 +1607,7 @@ pub async fn create_test_timeline(
.await?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;
m.commit().await?;
Ok(tline)
}

View File

@@ -3306,12 +3306,12 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
writer.finish_write(Lsn(0x10));
drop(writer);
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -3376,7 +3376,7 @@ mod tests {
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
let writer = tline.writer();
let writer = tline.writer().await;
#[allow(non_snake_case)]
let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap();
@@ -3403,7 +3403,7 @@ mod tests {
.get_timeline(NEW_TIMELINE_ID, true)
.await
.expect("Should have a local timeline");
let new_writer = newtline.writer();
let new_writer = newtline.writer().await;
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
new_writer.finish_write(Lsn(0x40));
@@ -3430,7 +3430,7 @@ mod tests {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
let writer = tline.writer();
let writer = tline.writer().await;
// Create a relation on the timeline
writer.put(
*TEST_KEY,
@@ -3449,7 +3449,7 @@ mod tests {
}
tline.freeze_and_flush().await?;
{
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
*TEST_KEY,
lsn,
@@ -3783,7 +3783,7 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION, &ctx)
.await?;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
writer.finish_write(Lsn(0x10));
drop(writer);
@@ -3791,7 +3791,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -3799,7 +3799,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?;
writer.finish_write(Lsn(0x30));
drop(writer);
@@ -3807,7 +3807,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?;
writer.finish_write(Lsn(0x40));
drop(writer);
@@ -3859,7 +3859,7 @@ mod tests {
for _ in 0..50 {
for _ in 0..10000 {
test_key.field6 = blknum;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -3909,7 +3909,7 @@ mod tests {
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -3927,7 +3927,7 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -3984,7 +3984,7 @@ mod tests {
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -4011,7 +4011,7 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -4078,7 +4078,7 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,

View File

@@ -28,7 +28,7 @@ use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::context::{DownloadBehavior, RequestContext};
@@ -179,7 +179,7 @@ pub struct Timeline {
/// Locked automatically by [`TimelineWriter`] and checkpointer.
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
write_lock: Mutex<()>,
write_lock: tokio::sync::Mutex<()>,
/// Used to avoid multiple `flush_loop` tasks running
flush_loop_state: Mutex<FlushLoopState>,
@@ -666,7 +666,7 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
self.freeze_inmem_layer(false);
self.freeze_inmem_layer(false).await;
self.flush_frozen_layers_and_wait().await
}
@@ -846,10 +846,10 @@ impl Timeline {
}
/// Mutate the timeline with a [`TimelineWriter`].
pub fn writer(&self) -> TimelineWriter<'_> {
pub async fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().unwrap(),
_write_guard: self.write_lock.lock().await,
}
}
@@ -883,7 +883,7 @@ impl Timeline {
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
if let Some(open_layer) = &layers.open_layer {
@@ -907,7 +907,7 @@ impl Timeline {
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.freeze_inmem_layer(true).await;
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
@@ -1381,7 +1381,7 @@ impl Timeline {
layer_flush_start_tx,
layer_flush_done_tx,
write_lock: Mutex::new(()),
write_lock: tokio::sync::Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: std::sync::RwLock::new(GcInfo {
@@ -2627,13 +2627,13 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
fn freeze_inmem_layer(&self, write_lock_held: bool) {
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let _write_guard = if write_lock_held {
None
} else {
Some(self.write_lock.lock().unwrap())
Some(self.write_lock.lock().await)
};
let mut layers = self.layers.write().unwrap();
if let Some(open_layer) = &layers.open_layer {
@@ -4434,7 +4434,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
// but will cause large code changes.
pub struct TimelineWriter<'a> {
tl: &'a Timeline,
_write_guard: MutexGuard<'a, ()>,
_write_guard: tokio::sync::MutexGuard<'a, ()>,
}
impl Deref for TimelineWriter<'_> {

View File

@@ -313,12 +313,15 @@ pub(super) async fn handle_walreceiver_connection(
}
}
timeline.check_checkpoint_distance().with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
if let Some(last_lsn) = status_update {
let timeline_remote_consistent_lsn =

View File

@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit()?;
modification.commit().await?;
Ok(())
}
@@ -1200,7 +1200,7 @@ mod tests {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit()?;
m.commit().await?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
Ok(walingest)
@@ -1217,22 +1217,22 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1318,7 +1318,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1360,7 +1360,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
@@ -1373,7 +1373,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
@@ -1398,7 +1398,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
@@ -1435,7 +1435,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1454,7 +1454,7 @@ mod tests {
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
m.commit()?;
m.commit().await?;
// Check that rel is not visible anymore
assert_eq!(
@@ -1472,7 +1472,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1509,7 +1509,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit()?;
m.commit().await?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
@@ -1554,7 +1554,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check reported size and contents after truncation
assert_eq!(
@@ -1603,7 +1603,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit()?;
m.commit().await?;
assert_eq!(
tline
@@ -1648,7 +1648,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
.await?;
m.commit()?;
m.commit().await?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1664,7 +1664,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE
@@ -1677,7 +1677,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE - 1
@@ -1693,7 +1693,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
size as BlockNumber