make TimelineWriter Send by using tokio::sync Mutex internally (#4477)

This is preliminary work for/from #4220 (async
`Layer::get_value_reconstruct_data`).

There, we want to switch `Timeline::layers` to be a
`tokio::sync::RwLock`.

That will require the `TimelineWriter` to become async, because at times
its functions need to lock `Timeline::layers` in order to freeze the
open layer.

While doing that, rustc complains that we're now holding
`Timeline::write_lock` across await points (lock order is that
`write_lock` must be acquired before `Timelines::layers`).

So, we need to switch it over to an async primitive.
This commit is contained in:
Christian Schwarz
2023-06-13 10:15:25 +02:00
committed by GitHub
parent 143fa0da42
commit 754ceaefac
6 changed files with 75 additions and 62 deletions

View File

@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
{
pg_control = Some(control_file);
}
modification.flush()?;
modification.flush().await?;
}
}
// We're done importing all the data files.
modification.commit()?;
modification.commit().await?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush()?;
modification.flush().await?;
}
tokio_tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit()?;
modification.commit().await?;
Ok(())
}
@@ -594,7 +594,7 @@ async fn import_file(
// zenith.signal is not necessarily the last file, that we handle
// but it is ok to call `finish_write()`, because final `modification.commit()`
// will update lsn once more to the final one.
let writer = modification.tline.writer();
let writer = modification.tline.writer().await;
writer.finish_write(prev_lsn);
debug!("imported zenith signal {}", prev_lsn);

View File

@@ -1122,7 +1122,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub fn flush(&mut self) -> anyhow::Result<()> {
pub async fn flush(&mut self) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1130,7 +1130,7 @@ impl<'a> DatadirModification<'a> {
return Ok(());
}
let writer = self.tline.writer();
let writer = self.tline.writer().await;
// Flush relation and SLRU data blocks, keep metadata.
let mut result: anyhow::Result<()> = Ok(());
@@ -1157,8 +1157,8 @@ impl<'a> DatadirModification<'a> {
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer();
pub async fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer().await;
let lsn = self.lsn;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;

View File

@@ -1304,6 +1304,7 @@ impl Tenant {
.context("init_empty_test_timeline")?;
modification
.commit()
.await
.context("commit init_empty_test_timeline modification")?;
// Flush to disk so that uninit_tl's check for valid disk_consistent_lsn passes.
@@ -3580,12 +3581,12 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
writer.finish_write(Lsn(0x10));
drop(writer);
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -3647,7 +3648,7 @@ mod tests {
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
let writer = tline.writer();
let writer = tline.writer().await;
#[allow(non_snake_case)]
let TEST_KEY_A: Key = Key::from_hex("112222222233333333444444445500000001").unwrap();
@@ -3673,7 +3674,7 @@ mod tests {
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
let new_writer = newtline.writer();
let new_writer = newtline.writer().await;
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
new_writer.finish_write(Lsn(0x40));
@@ -3700,7 +3701,7 @@ mod tests {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
let writer = tline.writer();
let writer = tline.writer().await;
// Create a relation on the timeline
writer.put(
*TEST_KEY,
@@ -3719,7 +3720,7 @@ mod tests {
}
tline.freeze_and_flush().await?;
{
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
*TEST_KEY,
lsn,
@@ -4046,7 +4047,7 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, &ctx)
.await?;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
writer.finish_write(Lsn(0x10));
drop(writer);
@@ -4054,7 +4055,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -4062,7 +4063,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?;
writer.finish_write(Lsn(0x30));
drop(writer);
@@ -4070,7 +4071,7 @@ mod tests {
tline.freeze_and_flush().await?;
tline.compact(&ctx).await?;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?;
writer.finish_write(Lsn(0x40));
drop(writer);
@@ -4122,7 +4123,7 @@ mod tests {
for _ in 0..50 {
for _ in 0..10000 {
test_key.field6 = blknum;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -4172,7 +4173,7 @@ mod tests {
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -4190,7 +4191,7 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -4247,7 +4248,7 @@ mod tests {
for blknum in 0..NUM_KEYS {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -4273,7 +4274,7 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -4339,7 +4340,7 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer();
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
@@ -4415,6 +4416,7 @@ mod tests {
.context("init_empty_test_timeline")?;
modification
.commit()
.await
.context("commit init_empty_test_timeline modification")?;
// Do the flush. The flush code will check the expectations that we set above.

View File

@@ -28,7 +28,7 @@ use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::{AtomicI64, Ordering as AtomicOrdering};
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Weak};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use crate::context::{DownloadBehavior, RequestContext};
@@ -185,7 +185,7 @@ pub struct Timeline {
/// Locked automatically by [`TimelineWriter`] and checkpointer.
/// Must always be acquired before the layer map/individual layer lock
/// to avoid deadlock.
write_lock: Mutex<()>,
write_lock: tokio::sync::Mutex<()>,
/// Used to avoid multiple `flush_loop` tasks running
pub(super) flush_loop_state: Mutex<FlushLoopState>,
@@ -689,7 +689,7 @@ impl Timeline {
/// Flush to disk all data that was written with the put_* functions
#[instrument(skip(self), fields(tenant_id=%self.tenant_id, timeline_id=%self.timeline_id))]
pub async fn freeze_and_flush(&self) -> anyhow::Result<()> {
self.freeze_inmem_layer(false);
self.freeze_inmem_layer(false).await;
self.flush_frozen_layers_and_wait().await
}
@@ -868,10 +868,10 @@ impl Timeline {
}
/// Mutate the timeline with a [`TimelineWriter`].
pub fn writer(&self) -> TimelineWriter<'_> {
pub async fn writer(&self) -> TimelineWriter<'_> {
TimelineWriter {
tl: self,
_write_guard: self.write_lock.lock().unwrap(),
_write_guard: self.write_lock.lock().await,
}
}
@@ -905,7 +905,7 @@ impl Timeline {
///
/// Also flush after a period of time without new data -- it helps
/// safekeepers to regard pageserver as caught up and suspend activity.
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let open_layer_size = {
let layers = self.layers.read().unwrap();
@@ -932,7 +932,7 @@ impl Timeline {
last_freeze_ts.elapsed()
);
self.freeze_inmem_layer(true);
self.freeze_inmem_layer(true).await;
self.last_freeze_at.store(last_lsn);
*(self.last_freeze_ts.write().unwrap()) = Instant::now();
@@ -1452,7 +1452,7 @@ impl Timeline {
layer_flush_start_tx,
layer_flush_done_tx,
write_lock: Mutex::new(()),
write_lock: tokio::sync::Mutex::new(()),
layer_removal_cs: Default::default(),
gc_info: std::sync::RwLock::new(GcInfo {
@@ -2732,13 +2732,13 @@ impl Timeline {
self.last_record_lsn.advance(new_lsn);
}
fn freeze_inmem_layer(&self, write_lock_held: bool) {
async fn freeze_inmem_layer(&self, write_lock_held: bool) {
// Freeze the current open in-memory layer. It will be written to disk on next
// iteration.
let _write_guard = if write_lock_held {
None
} else {
Some(self.write_lock.lock().unwrap())
Some(self.write_lock.lock().await)
};
let mut layers = self.layers.write().unwrap();
if let Some(open_layer) = &layers.open_layer {
@@ -4595,7 +4595,7 @@ fn layer_traversal_error(msg: String, path: Vec<TraversalPathItem>) -> PageRecon
// but will cause large code changes.
pub struct TimelineWriter<'a> {
tl: &'a Timeline,
_write_guard: MutexGuard<'a, ()>,
_write_guard: tokio::sync::MutexGuard<'a, ()>,
}
impl Deref for TimelineWriter<'_> {
@@ -4636,6 +4636,14 @@ impl<'a> TimelineWriter<'a> {
}
}
// We need TimelineWriter to be send in upcoming conversion of
// Timeline::layers to tokio::sync::RwLock.
#[test]
fn is_send() {
fn _assert_send<T: Send>() {}
_assert_send::<TimelineWriter<'_>>();
}
/// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0)
fn rename_to_backup(path: &Path) -> anyhow::Result<()> {

View File

@@ -304,12 +304,15 @@ pub(super) async fn handle_walreceiver_connection(
}
}
timeline.check_checkpoint_distance().with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
timeline
.check_checkpoint_distance()
.await
.with_context(|| {
format!(
"Failed to check checkpoint distance for timeline {}",
timeline.timeline_id
)
})?;
if let Some(last_lsn) = status_update {
let timeline_remote_consistent_lsn =

View File

@@ -333,7 +333,7 @@ impl<'a> WalIngest<'a> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit()?;
modification.commit().await?;
Ok(())
}
@@ -1199,7 +1199,7 @@ mod tests {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit()?;
m.commit().await?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
Ok(walingest)
@@ -1218,22 +1218,22 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1319,7 +1319,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1361,7 +1361,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
@@ -1374,7 +1374,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
@@ -1399,7 +1399,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
@@ -1438,7 +1438,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1457,7 +1457,7 @@ mod tests {
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
m.commit()?;
m.commit().await?;
// Check that rel is not visible anymore
assert_eq!(
@@ -1475,7 +1475,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1514,7 +1514,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit()?;
m.commit().await?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
@@ -1559,7 +1559,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
.await?;
m.commit()?;
m.commit().await?;
// Check reported size and contents after truncation
assert_eq!(
@@ -1608,7 +1608,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit()?;
m.commit().await?;
assert_eq!(
tline
@@ -1655,7 +1655,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
.await?;
m.commit()?;
m.commit().await?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -1671,7 +1671,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE
@@ -1684,7 +1684,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE - 1
@@ -1700,7 +1700,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
.await?;
m.commit()?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
size as BlockNumber