turn Timeline::layers into tokio::sync::RwLock

This commit is contained in:
Christian Schwarz
2023-05-26 18:49:39 +02:00
parent 20a9356729
commit c1d9cb88ae
8 changed files with 229 additions and 172 deletions

View File

@@ -512,7 +512,7 @@ async fn collect_eviction_candidates(
if !tl.is_active() {
continue;
}
let info = tl.get_local_layers_for_disk_usage_eviction();
let info = tl.get_local_layers_for_disk_usage_eviction().await;
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
tenant_candidates.extend(
info.resident_layers

View File

@@ -212,7 +212,7 @@ async fn build_timeline_info(
) -> anyhow::Result<TimelineInfo> {
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
let mut info = build_timeline_info_common(timeline, ctx)?;
let mut info = build_timeline_info_common(timeline, ctx).await?;
if include_non_incremental_logical_size {
// XXX we should be using spawn_ondemand_logical_size_calculation here.
// Otherwise, if someone deletes the timeline / detaches the tenant while
@@ -230,7 +230,7 @@ async fn build_timeline_info(
Ok(info)
}
fn build_timeline_info_common(
async fn build_timeline_info_common(
timeline: &Arc<Timeline>,
ctx: &RequestContext,
) -> anyhow::Result<TimelineInfo> {
@@ -261,7 +261,7 @@ fn build_timeline_info_common(
None
}
};
let current_physical_size = Some(timeline.layer_size_sum());
let current_physical_size = Some(timeline.layer_size_sum().await);
let state = timeline.current_state();
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
@@ -321,6 +321,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
.await
.map_err(ApiError::InternalServerError)?;
json_response(StatusCode::CREATED, timeline_info)
}
@@ -551,7 +552,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
// Calculate total physical size of all timelines
let mut current_physical_size = 0;
for timeline in tenant.list_timelines().iter() {
current_physical_size += timeline.layer_size_sum();
current_physical_size += timeline.layer_size_sum().await;
}
let state = tenant.current_state();
@@ -655,7 +656,7 @@ async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>
check_permission(&request, Some(tenant_id))?;
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
let layer_map_info = timeline.layer_map_info(reset);
let layer_map_info = timeline.layer_map_info(reset).await;
json_response(StatusCode::OK, layer_map_info)
}

View File

@@ -565,7 +565,7 @@ impl PageServerHandler {
// since we discard some log files.
info!("done, activating timeline");
real_timeline_not_in_tenants_map.activate(self.broker_client.clone(), &ctx);
real_timeline_not_in_tenants_map.activate(self.broker_client.clone(), &ctx).await;
Ok(())
}

View File

@@ -1118,7 +1118,7 @@ impl<'a> DatadirModification<'a> {
let writer = self.tline.writer().await;
let mut layer_map = self.tline.layers.write().unwrap();
let mut layer_map = self.tline.layers.write().await;
// Flush relation and SLRU data blocks, keep metadata.
let mut result: anyhow::Result<()> = Ok(());
@@ -1152,10 +1152,10 @@ impl<'a> DatadirModification<'a> {
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value)?;
writer.put(key, lsn, &value).await?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn)?;
writer.delete(key_range, lsn).await?;
}
writer.finish_write(lsn);

View File

@@ -541,6 +541,7 @@ impl Tenant {
// "Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn and cannot be initialized");
timeline
.load_layer_map(new_disk_consistent_lsn)
.await
.with_context(|| {
format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}")
})?;
@@ -578,7 +579,7 @@ impl Tenant {
|| timeline
.layers
.read()
.unwrap()
.await
.iter_historic_layers()
.next()
.is_some(),
@@ -592,7 +593,7 @@ impl Tenant {
let has_layers = timeline
.layers
.read()
.unwrap()
.await
.iter_historic_layers()
.next()
.is_some();
@@ -664,7 +665,7 @@ impl Tenant {
match tenant_clone.attach(&ctx).await {
Ok(()) => {
info!("attach finished, activating");
tenant_clone.activate(broker_client, &ctx);
tenant_clone.activate(broker_client, &ctx).await;
}
Err(e) => {
error!("attach failed, setting tenant state to Broken: {:?}", e);
@@ -962,7 +963,7 @@ impl Tenant {
match tenant_clone.load(cause, &ctx).await {
Ok(()) => {
info!("load finished, activating");
tenant_clone.activate(broker_client, &ctx);
tenant_clone.activate(broker_client, &ctx).await;
}
Err(err) => {
error!("load failed, setting tenant state to Broken: {err:?}");
@@ -1312,7 +1313,6 @@ impl Tenant {
.context("wait for initial uploads to complete")?;
}
// XXX do we need to remove uninit mark before starting uploads?
// If we die with uninit mark present, we'll leak the uploaded state in S3.
Ok(())
};
@@ -1404,19 +1404,19 @@ impl Tenant {
.context("creation_complete_remove_uninit_marker_and_get_placeholder_timeline")?;
match self.timelines.lock().unwrap().entry(new_timeline_id) {
Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"),
Entry::Occupied(mut o) => {
info!("replacing placeholder timeline with the real one");
assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating);
assert!(compare_arced_timeline(&placeholder_timeline, o.get()));
let replaced_placeholder = o.insert(Arc::clone(&real_timeline));
assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline));
},
}
Entry::Vacant(_) => unreachable!("we created a placeholder earlier, and load_local_timeline should have inserted the real timeline"),
Entry::Occupied(mut o) => {
info!("replacing placeholder timeline with the real one");
assert_eq!(placeholder_timeline.current_state(), TimelineState::Creating);
assert!(compare_arced_timeline(&placeholder_timeline, o.get()));
let replaced_placeholder = o.insert(Arc::clone(&real_timeline));
assert!(compare_arced_timeline(&replaced_placeholder, &placeholder_timeline));
},
}
// The non-test code would call tl.activate() here.
real_timeline.maybe_spawn_flush_loop();
real_timeline.set_state(TimelineState::Active);
real_timeline.set_state(TimelineState::Active).await;
Ok(real_timeline)
}
@@ -1605,7 +1605,7 @@ impl Tenant {
},
}
real_timeline.activate(broker_client, ctx);
real_timeline.activate(broker_client, ctx).await;
Ok(Some(real_timeline))
}
@@ -1736,17 +1736,16 @@ impl Tenant {
};
let timeline = Arc::clone(timeline_entry.get());
if timeline.current_state() == TimelineState::Creating {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(
"timeline is creating"
)));
}
timeline.set_state(TimelineState::Stopping);
drop(timelines);
timeline
};
if timeline.current_state() == TimelineState::Creating {
return Err(DeleteTimelineError::Other(anyhow::anyhow!(
"timeline is creating"
)));
}
timeline.set_state(TimelineState::Stopping).await;
// Now that the Timeline is in Stopping state, request all the related tasks to
// shut down.
//
@@ -1917,7 +1916,7 @@ impl Tenant {
}
/// Changes tenant status to active, unless shutdown was already requested.
fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
async fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
debug_assert_current_span_has_tenant_id();
let mut activating = false;
@@ -1937,10 +1936,14 @@ impl Tenant {
});
if activating {
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
let not_broken_timelines = {
let timelines_accessor = self.timelines.lock().unwrap();
timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken)
.cloned()
.collect::<Vec<_>>()
};
// Spawn gc and compaction loops. The loops will shut themselves
// down when they notice that the tenant is inactive.
@@ -1948,8 +1951,8 @@ impl Tenant {
let mut activated_timelines = 0;
for timeline in not_broken_timelines {
timeline.activate(broker_client.clone(), ctx);
for timeline in &not_broken_timelines {
timeline.activate(broker_client.clone(), ctx).await;
activated_timelines += 1;
}
@@ -1962,7 +1965,7 @@ impl Tenant {
*current_state = TenantState::Active;
let elapsed = self.loading_started_at.elapsed();
let total_timelines = timelines_accessor.len();
let total_timelines = not_broken_timelines.len();
// log a lot of stuff, because some tenants sometimes suffer from user-visible
// times to activate. see https://github.com/neondatabase/neon/issues/4025
@@ -2039,12 +2042,16 @@ impl Tenant {
),
}
let timelines_accessor = self.timelines.lock().unwrap();
let not_broken_timelines = timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken);
let not_broken_timelines = {
let timelines_accessor = self.timelines.lock().unwrap();
timelines_accessor
.values()
.filter(|timeline| timeline.current_state() != TimelineState::Broken)
.cloned()
.collect::<Vec<_>>()
};
for timeline in not_broken_timelines {
timeline.set_state(TimelineState::Stopping);
timeline.set_state(TimelineState::Stopping).await;
}
Ok(())
}
@@ -2849,7 +2856,7 @@ impl Tenant {
},
}
real_timeline.set_state(TimelineState::Active);
real_timeline.set_state(TimelineState::Active).await;
real_timeline.maybe_spawn_flush_loop();
Ok(real_timeline)
}
@@ -3061,11 +3068,7 @@ impl Tenant {
.create_timeline_data(timeline_id, &new_metadata, None, remote_client.clone())
.context("Failed to create timeline data structure")?;
unfinished_timeline
.layers
.write()
.unwrap()
.next_open_layer_at = Some(pgdata_lsn); // pgdata_lsn == initdb_lsn
unfinished_timeline.layers.write().await.next_open_layer_at = Some(pgdata_lsn); // pgdata_lsn == initdb_lsn
import_datadir::import_timeline_from_postgres_datadir(
&unfinished_timeline,
@@ -3110,7 +3113,7 @@ impl Tenant {
}
// XXX this is same shutdown code as in Timeline::delete, share it.
unfinished_timeline.set_state(TimelineState::Stopping);
unfinished_timeline.set_state(TimelineState::Stopping).await;
task_mgr::shutdown_tasks(None, Some(self.tenant_id), Some(timeline_id)).await;
// XXX log message is a little too early, see caller for context
@@ -3645,8 +3648,15 @@ pub mod harness {
.instrument(info_span!("try_load", tenant_id=%self.tenant_id))
.await?;
tenant.state.send_replace(TenantState::Active);
for timeline in tenant.timelines.lock().unwrap().values() {
timeline.set_state(TimelineState::Active);
let timelines = tenant
.timelines
.lock()
.unwrap()
.values()
.cloned()
.collect::<Vec<_>>();
for timeline in timelines {
timeline.set_state(TimelineState::Active).await;
}
Ok(tenant)
}
@@ -3710,12 +3720,16 @@ mod tests {
.await?;
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
writer
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
.await?;
writer.finish_write(Lsn(0x10));
drop(writer);
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
writer
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
.await?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -3787,13 +3801,21 @@ mod tests {
let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
// Insert a value on the timeline
writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?;
writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?;
writer
.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))
.await?;
writer
.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
.await?;
writer.finish_write(Lsn(0x20));
writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?;
writer
.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
.await?;
writer.finish_write(Lsn(0x30));
writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?;
writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
.await?;
writer.finish_write(Lsn(0x40));
//assert_current_logical_size(&tline, Lsn(0x40));
@@ -3806,7 +3828,9 @@ mod tests {
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
let new_writer = newtline.writer().await;
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
new_writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
.await?;
new_writer.finish_write(Lsn(0x40));
// Check page contents on both branches
@@ -3834,36 +3858,44 @@ mod tests {
{
let writer = tline.writer().await;
// Create a relation on the timeline
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
lsn += 0x10;
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
lsn += 0x10;
}
tline.freeze_and_flush().await?;
{
let writer = tline.writer().await;
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
lsn += 0x10;
writer.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)?;
writer
.put(
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
)
.await?;
writer.finish_write(lsn);
}
tline.freeze_and_flush().await
@@ -3975,7 +4007,7 @@ mod tests {
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
tline.set_state(TimelineState::Broken);
tline.set_state(TimelineState::Broken).await;
tenant
.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, &ctx)
@@ -4100,7 +4132,7 @@ mod tests {
let child_tline = tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
.await?;
child_tline.set_state(TimelineState::Active);
child_tline.set_state(TimelineState::Active).await;
let newtline = tenant
.get_timeline(NEW_TIMELINE_ID, true)
@@ -4175,7 +4207,9 @@ mod tests {
.await?;
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
writer
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
.await?;
writer.finish_write(Lsn(0x10));
drop(writer);
@@ -4183,7 +4217,9 @@ mod tests {
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
writer
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
.await?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -4191,7 +4227,9 @@ mod tests {
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?;
writer
.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
.await?;
writer.finish_write(Lsn(0x30));
drop(writer);
@@ -4199,7 +4237,9 @@ mod tests {
tline.compact(&ctx).await?;
let writer = tline.writer().await;
writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?;
writer
.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
.await?;
writer.finish_write(Lsn(0x40));
drop(writer);
@@ -4251,11 +4291,13 @@ mod tests {
for _ in 0..10000 {
test_key.field6 = blknum;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
drop(writer);
@@ -4301,11 +4343,13 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
updated[blknum] = lsn;
drop(writer);
@@ -4319,11 +4363,13 @@ mod tests {
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
drop(writer);
updated[blknum] = lsn;
@@ -4376,11 +4422,13 @@ mod tests {
lsn = Lsn(lsn.0 + 0x10);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
writer.finish_write(lsn);
updated[blknum] = lsn;
drop(writer);
@@ -4402,11 +4450,13 @@ mod tests {
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
)
.await?;
println!("updating {} at {}", blknum, lsn);
writer.finish_write(lsn);
drop(writer);
@@ -4468,11 +4518,13 @@ mod tests {
let blknum = thread_rng().gen_range(0..NUM_KEYS);
test_key.field6 = blknum as u32;
let writer = tline.writer().await;
writer.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
)?;
writer
.put(
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
)
.await?;
println!("updating [{}][{}] at {}", idx, blknum, lsn);
writer.finish_write(lsn);
drop(writer);

View File

@@ -304,7 +304,7 @@ impl InMemoryLayer {
Ok(())
}
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
// TODO: Currently, we just leak the storage for any deleted keys
Ok(())

View File

@@ -119,7 +119,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(crate) layers: RwLock<LayerMap<dyn PersistentLayer>>,
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -238,7 +238,7 @@ pub struct Timeline {
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
}
type LayerMapWriteLockGuard<'t> = std::sync::RwLockWriteGuard<'t, LayerMap<dyn PersistentLayer>>;
type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap<dyn PersistentLayer>>;
/// Internal structure to hold all data needed for logical size calculation.
///
@@ -574,8 +574,8 @@ impl Timeline {
/// The sum of the file size of all historic layers in the layer map.
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().unwrap();
pub async fn layer_size_sum(&self) -> u64 {
let layer_map = self.layers.read().await;
let mut size = 0;
for l in layer_map.iter_historic_layers() {
size += l.file_size();
@@ -669,7 +669,7 @@ impl Timeline {
if self.current_state() == TimelineState::Creating {
debug!("timelines in Creating state are never written to");
assert!(
self.layers.read().unwrap().open_layer.is_none(),
self.layers.read().await.open_layer.is_none(),
"would have nothing to flush anyways"
);
return Ok(());
@@ -898,7 +898,7 @@ impl Timeline {
/// safekeepers to regard pageserver as caught up and suspend activity.
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
let last_lsn = self.get_last_record_lsn();
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
if let Some(open_layer) = &layers.open_layer {
let open_layer_size = open_layer.size()?;
drop(layers);
@@ -931,17 +931,17 @@ impl Timeline {
Ok(())
}
pub fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
pub async fn activate(self: &Arc<Self>, broker_client: BrokerClientChannel, ctx: &RequestContext) {
if self.current_state() == TimelineState::Creating {
panic!("timelines in Creating state are never activated");
}
self.maybe_spawn_flush_loop();
self.launch_wal_receiver(ctx, broker_client);
self.set_state(TimelineState::Active);
self.set_state(TimelineState::Active).await;
self.launch_eviction_task();
}
pub fn set_state(&self, new_state: TimelineState) {
pub async fn set_state(&self, new_state: TimelineState) {
if self.current_state() == TimelineState::Creating {
info!("timelines in Creating state are never activated, nothing to stop");
assert_eq!(
@@ -949,7 +949,7 @@ impl Timeline {
FlushLoopState::NotStarted
);
assert!(
self.layers.read().unwrap().open_layer.is_none(),
self.layers.read().await.open_layer.is_none(),
"would have nothing to flush anyways"
);
assert!(self.walreceiver.lock().unwrap().is_none());
@@ -1020,8 +1020,8 @@ impl Timeline {
}
}
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().unwrap();
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
let layer_map = self.layers.read().await;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
in_memory_layers.push(open_layer.info());
@@ -1043,7 +1043,7 @@ impl Timeline {
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
if self.remote_client.is_none() {
return Ok(Some(false));
@@ -1056,7 +1056,7 @@ impl Timeline {
/// Like [`evict_layer_batch`], but for just one layer.
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
let remote_client = self
.remote_client
.as_ref()
@@ -1141,7 +1141,7 @@ impl Timeline {
}
// start the batch update
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write().await;
let mut batch_updates = layer_map.batch_update();
let mut results = Vec::with_capacity(layers_to_evict.len());
@@ -1389,7 +1389,7 @@ impl Timeline {
timeline_id,
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
layers: tokio::sync::RwLock::new(LayerMap::default()),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
@@ -1590,8 +1590,8 @@ impl Timeline {
/// Scan the timeline directory to populate the layer map.
/// Returns all timeline-related files that were found and loaded.
///
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().unwrap();
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let mut num_layers = 0;
@@ -1720,7 +1720,7 @@ impl Timeline {
// We're holding a layer map lock for a while but this
// method is only called during init so it's fine.
let mut layer_map = self.layers.write().unwrap();
let mut layer_map = self.layers.write().await;
let mut updates = layer_map.batch_update();
for remote_layer_name in &index_part.timeline_layers {
let local_layer = local_only_layers.remove(remote_layer_name);
@@ -1873,7 +1873,7 @@ impl Timeline {
let local_layers = self
.layers
.read()
.unwrap()
.await
.iter_historic_layers()
.map(|l| (l.filename(), l))
.collect::<HashMap<_, _>>();
@@ -2232,8 +2232,8 @@ impl Timeline {
}
}
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
for historic_layer in self.layers.read().await.iter_historic_layers() {
let historic_layer_name = historic_layer.filename().file_name();
if layer_file_name == historic_layer_name {
return Some(historic_layer);
@@ -2440,7 +2440,7 @@ impl Timeline {
#[allow(clippy::never_loop)] // see comment at bottom of this loop
'layer_map_search: loop {
let remote_layer = {
let layers = timeline.layers.read().unwrap();
let layers = timeline.layers.read().await;
// Check the open and frozen in-memory layers first, in order from newest
// to oldest.
@@ -2619,8 +2619,8 @@ impl Timeline {
///
/// Get a handle to the latest layer for appending.
///
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().unwrap();
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut layers = self.layers.write().await;
self.get_layer_for_write_locked(lsn, &mut layers)
}
@@ -2671,9 +2671,9 @@ impl Timeline {
Ok(layer)
}
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn)?;
let layer = self.get_layer_for_write(lsn).await?;
layer.put_value(key, lsn, val)?;
Ok(())
}
@@ -2691,9 +2691,9 @@ impl Timeline {
Ok(())
}
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn)?;
layer.put_tombstone(key_range, lsn)?;
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
let layer = self.get_layer_for_write(lsn).await?;
layer.put_tombstone(key_range, lsn).await?;
Ok(())
}
@@ -2713,7 +2713,7 @@ impl Timeline {
} else {
Some(self.write_lock.lock().await)
};
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
if let Some(open_layer) = &layers.open_layer {
let open_layer_rc = Arc::clone(open_layer);
// Does this layer need freezing?
@@ -2751,7 +2751,7 @@ impl Timeline {
let flush_counter = *layer_flush_start_rx.borrow();
let result = loop {
let layer_to_flush = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
layers.frozen_layers.front().cloned()
// drop 'layers' lock to allow concurrent reads and writes
};
@@ -2843,7 +2843,7 @@ impl Timeline {
.await?
} else {
// normal case, write out a L0 delta layer file.
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
HashMap::from([(delta_path, metadata)])
};
@@ -2852,7 +2852,7 @@ impl Timeline {
// The new on-disk layers are now in the layer map. We can remove the
// in-memory layer from the map now.
{
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let l = layers.frozen_layers.pop_front();
// Only one thread may call this function at a time (for this
@@ -2946,7 +2946,7 @@ impl Timeline {
}
// Write out the given frozen in-memory layer as a new L0 delta file
fn create_delta_layer(
async fn create_delta_layer(
&self,
frozen_layer: &InMemoryLayer,
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
@@ -2970,7 +2970,7 @@ impl Timeline {
// Add it to the layer map
let l = Arc::new(new_delta);
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut batch_updates = layers.batch_update();
l.access_stats().record_residence_event(
&batch_updates,
@@ -3022,10 +3022,14 @@ impl Timeline {
}
// Is it time to create a new image layer for the given partition?
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
async fn time_for_new_image_layer(
&self,
partition: &KeySpace,
lsn: Lsn,
) -> anyhow::Result<bool> {
let threshold = self.get_image_creation_threshold();
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut max_deltas = 0;
{
@@ -3120,7 +3124,7 @@ impl Timeline {
for partition in partitioning.parts.iter() {
let img_range = start..partition.ranges.last().unwrap().end;
start = img_range.end;
if force || self.time_for_new_image_layer(partition, lsn)? {
if force || self.time_for_new_image_layer(partition, lsn).await? {
let mut image_layer_writer = ImageLayerWriter::new(
self.conf,
self.timeline_id,
@@ -3198,7 +3202,7 @@ impl Timeline {
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
for l in image_layers {
@@ -3265,7 +3269,7 @@ impl Timeline {
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut level0_deltas = layers.get_level0_deltas()?;
// Only compact if enough layers have accumulated.
@@ -3623,7 +3627,7 @@ impl Timeline {
.context("wait for layer upload ops to complete")?;
}
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
let mut updates = layers.batch_update();
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
for l in new_layers {
@@ -3888,7 +3892,7 @@ impl Timeline {
// 4. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too agressive and avoidable
let mut layers = self.layers.write().unwrap();
let mut layers = self.layers.write().await;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -4184,7 +4188,7 @@ impl Timeline {
// Download complete. Replace the RemoteLayer with the corresponding
// Delta- or ImageLayer in the layer map.
let mut layers = self_clone.layers.write().unwrap();
let mut layers = self_clone.layers.write().await;
let mut updates = layers.batch_update();
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
{
@@ -4342,7 +4346,7 @@ impl Timeline {
) {
let mut downloads = Vec::new();
{
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
layers
.iter_historic_layers()
.filter_map(|l| l.downcast_remote_layer())
@@ -4444,8 +4448,8 @@ impl LocalLayerInfoForDiskUsageEviction {
}
impl Timeline {
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().unwrap();
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let layers = self.layers.read().await;
let mut max_layer_size: Option<u64> = None;
let mut resident_layers = Vec::new();
@@ -4533,8 +4537,8 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value)
pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value).await
}
pub fn put_locked(
@@ -4548,8 +4552,8 @@ impl<'a> TimelineWriter<'a> {
.put_value_locked(key, lsn, value, pre_locked_layer_map)
}
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn)
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn).await
}
/// Track the end of the latest digested WAL record.

View File

@@ -185,7 +185,7 @@ impl Timeline {
// We don't want to hold the layer map lock during eviction.
// So, we just need to deal with this.
let candidates: Vec<Arc<dyn PersistentLayer>> = {
let layers = self.layers.read().unwrap();
let layers = self.layers.read().await;
let mut candidates = Vec::new();
for hist_layer in layers.iter_historic_layers() {
if hist_layer.is_remote_layer() {