mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
turn Timeline::layers into tokio::sync::RwLock
This commit is contained in:
@@ -512,7 +512,7 @@ async fn collect_eviction_candidates(
|
||||
if !tl.is_active() {
|
||||
continue;
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction();
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction().await;
|
||||
debug!(tenant_id=%tl.tenant_id, timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
tenant_candidates.extend(
|
||||
info.resident_layers
|
||||
|
||||
@@ -180,7 +180,7 @@ async fn build_timeline_info(
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
crate::tenant::debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let mut info = build_timeline_info_common(timeline, ctx)?;
|
||||
let mut info = build_timeline_info_common(timeline, ctx).await?;
|
||||
if include_non_incremental_logical_size {
|
||||
// XXX we should be using spawn_ondemand_logical_size_calculation here.
|
||||
// Otherwise, if someone deletes the timeline / detaches the tenant while
|
||||
@@ -198,7 +198,7 @@ async fn build_timeline_info(
|
||||
Ok(info)
|
||||
}
|
||||
|
||||
fn build_timeline_info_common(
|
||||
async fn build_timeline_info_common(
|
||||
timeline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<TimelineInfo> {
|
||||
@@ -229,7 +229,7 @@ fn build_timeline_info_common(
|
||||
None
|
||||
}
|
||||
};
|
||||
let current_physical_size = Some(timeline.layer_size_sum());
|
||||
let current_physical_size = Some(timeline.layer_size_sum().await);
|
||||
let state = timeline.current_state();
|
||||
let remote_consistent_lsn = timeline.get_remote_consistent_lsn().unwrap_or(Lsn(0));
|
||||
|
||||
@@ -291,6 +291,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
|
||||
Ok(Some(new_timeline)) => {
|
||||
// Created. Construct a TimelineInfo for it.
|
||||
let timeline_info = build_timeline_info_common(&new_timeline, &ctx)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
json_response(StatusCode::CREATED, timeline_info)
|
||||
}
|
||||
@@ -522,7 +523,7 @@ async fn tenant_status(request: Request<Body>) -> Result<Response<Body>, ApiErro
|
||||
// Calculate total physical size of all timelines
|
||||
let mut current_physical_size = 0;
|
||||
for timeline in tenant.list_timelines().await.iter() {
|
||||
current_physical_size += timeline.layer_size_sum();
|
||||
current_physical_size += timeline.layer_size_sum().await;
|
||||
}
|
||||
|
||||
let state = tenant.current_state();
|
||||
@@ -627,7 +628,7 @@ async fn layer_map_info_handler(request: Request<Body>) -> Result<Response<Body>
|
||||
check_permission(&request, Some(tenant_id))?;
|
||||
|
||||
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
|
||||
let layer_map_info = timeline.layer_map_info(reset);
|
||||
let layer_map_info = timeline.layer_map_info(reset).await;
|
||||
|
||||
json_response(StatusCode::OK, layer_map_info)
|
||||
}
|
||||
|
||||
@@ -1118,7 +1118,7 @@ impl<'a> DatadirModification<'a> {
|
||||
|
||||
let writer = self.tline.writer().await;
|
||||
|
||||
let mut layer_map = self.tline.layers.write().unwrap();
|
||||
let mut layer_map = self.tline.layers.write().await;
|
||||
|
||||
// Flush relation and SLRU data blocks, keep metadata.
|
||||
let mut result: anyhow::Result<()> = Ok(());
|
||||
@@ -1152,10 +1152,10 @@ impl<'a> DatadirModification<'a> {
|
||||
self.pending_nblocks = 0;
|
||||
|
||||
for (key, value) in self.pending_updates.drain() {
|
||||
writer.put(key, lsn, &value)?;
|
||||
writer.put(key, lsn, &value).await?;
|
||||
}
|
||||
for key_range in self.pending_deletions.drain(..) {
|
||||
writer.delete(key_range, lsn)?;
|
||||
writer.delete(key_range, lsn).await?;
|
||||
}
|
||||
|
||||
writer.finish_write(lsn);
|
||||
|
||||
@@ -213,6 +213,7 @@ impl UninitializedTimeline<'_> {
|
||||
if load_layer_map {
|
||||
new_timeline
|
||||
.load_layer_map(new_disk_consistent_lsn)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to load layermap for timeline {tenant_id}/{timeline_id}"
|
||||
@@ -556,7 +557,7 @@ impl Tenant {
|
||||
|| timeline
|
||||
.layers
|
||||
.read()
|
||||
.unwrap()
|
||||
.await
|
||||
.iter_historic_layers()
|
||||
.next()
|
||||
.is_some(),
|
||||
@@ -1206,6 +1207,7 @@ impl Tenant {
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Helper for unit tests to create an emtpy timeline.
|
||||
@@ -2475,7 +2477,8 @@ impl Tenant {
|
||||
timeline_uninit_mark,
|
||||
false,
|
||||
Some(Arc::clone(src_timeline)),
|
||||
)?
|
||||
)
|
||||
.await?
|
||||
.initialize_with_lock(ctx, &mut timelines, true)
|
||||
.await?
|
||||
};
|
||||
@@ -2554,8 +2557,9 @@ impl Tenant {
|
||||
pgdata_lsn,
|
||||
pg_version,
|
||||
);
|
||||
let raw_timeline =
|
||||
self.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)?;
|
||||
let raw_timeline = self
|
||||
.prepare_timeline(timeline_id, &new_metadata, timeline_uninit_mark, true, None)
|
||||
.await?;
|
||||
|
||||
let tenant_id = raw_timeline.owning_tenant.tenant_id;
|
||||
let unfinished_timeline = raw_timeline.raw_timeline()?;
|
||||
@@ -2610,7 +2614,7 @@ impl Tenant {
|
||||
|
||||
/// Creates intermediate timeline structure and its files, without loading it into memory.
|
||||
/// It's up to the caller to import the necesary data and import the timeline into memory.
|
||||
fn prepare_timeline(
|
||||
async fn prepare_timeline(
|
||||
&self,
|
||||
new_timeline_id: TimelineId,
|
||||
new_metadata: &TimelineMetadata,
|
||||
@@ -2642,7 +2646,7 @@ impl Tenant {
|
||||
) {
|
||||
Ok(new_timeline) => {
|
||||
if init_layers {
|
||||
new_timeline.layers.write().unwrap().next_open_layer_at =
|
||||
new_timeline.layers.write().await.next_open_layer_at =
|
||||
Some(new_timeline.initdb_lsn);
|
||||
}
|
||||
debug!(
|
||||
@@ -3307,12 +3311,16 @@ mod tests {
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x10));
|
||||
drop(writer);
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x20));
|
||||
drop(writer);
|
||||
|
||||
@@ -3384,13 +3392,21 @@ mod tests {
|
||||
let TEST_KEY_B: Key = Key::from_hex("112222222233333333444444445500000002").unwrap();
|
||||
|
||||
// Insert a value on the timeline
|
||||
writer.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))?;
|
||||
writer.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))?;
|
||||
writer
|
||||
.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))
|
||||
.await?;
|
||||
writer
|
||||
.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x20));
|
||||
|
||||
writer.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))?;
|
||||
writer
|
||||
.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x30));
|
||||
writer.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))?;
|
||||
writer
|
||||
.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x40));
|
||||
|
||||
//assert_current_logical_size(&tline, Lsn(0x40));
|
||||
@@ -3404,7 +3420,9 @@ mod tests {
|
||||
.await
|
||||
.expect("Should have a local timeline");
|
||||
let new_writer = newtline.writer().await;
|
||||
new_writer.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))?;
|
||||
new_writer
|
||||
.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
|
||||
.await?;
|
||||
new_writer.finish_write(Lsn(0x40));
|
||||
|
||||
// Check page contents on both branches
|
||||
@@ -3432,36 +3450,44 @@ mod tests {
|
||||
{
|
||||
let writer = tline.writer().await;
|
||||
// Create a relation on the timeline
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
lsn += 0x10;
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
lsn += 0x10;
|
||||
}
|
||||
tline.freeze_and_flush().await?;
|
||||
{
|
||||
let writer = tline.writer().await;
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
lsn += 0x10;
|
||||
writer.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
}
|
||||
tline.freeze_and_flush().await
|
||||
@@ -3784,7 +3810,9 @@ mod tests {
|
||||
.await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x10));
|
||||
drop(writer);
|
||||
|
||||
@@ -3792,7 +3820,9 @@ mod tests {
|
||||
tline.compact(&ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))?;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x20));
|
||||
drop(writer);
|
||||
|
||||
@@ -3800,7 +3830,9 @@ mod tests {
|
||||
tline.compact(&ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))?;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x30));
|
||||
drop(writer);
|
||||
|
||||
@@ -3808,7 +3840,9 @@ mod tests {
|
||||
tline.compact(&ctx).await?;
|
||||
|
||||
let writer = tline.writer().await;
|
||||
writer.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))?;
|
||||
writer
|
||||
.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x40));
|
||||
drop(writer);
|
||||
|
||||
@@ -3860,11 +3894,13 @@ mod tests {
|
||||
for _ in 0..10000 {
|
||||
test_key.field6 = blknum;
|
||||
let writer = tline.writer().await;
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
|
||||
@@ -3910,11 +3946,13 @@ mod tests {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer().await;
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
updated[blknum] = lsn;
|
||||
drop(writer);
|
||||
@@ -3928,11 +3966,13 @@ mod tests {
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer().await;
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
updated[blknum] = lsn;
|
||||
@@ -3985,11 +4025,13 @@ mod tests {
|
||||
lsn = Lsn(lsn.0 + 0x10);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer().await;
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(lsn);
|
||||
updated[blknum] = lsn;
|
||||
drop(writer);
|
||||
@@ -4012,11 +4054,13 @@ mod tests {
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer().await;
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
println!("updating {} at {}", blknum, lsn);
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
@@ -4079,11 +4123,13 @@ mod tests {
|
||||
let blknum = thread_rng().gen_range(0..NUM_KEYS);
|
||||
test_key.field6 = blknum as u32;
|
||||
let writer = tline.writer().await;
|
||||
writer.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
|
||||
)?;
|
||||
writer
|
||||
.put(
|
||||
test_key,
|
||||
lsn,
|
||||
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
|
||||
)
|
||||
.await?;
|
||||
println!("updating [{}][{}] at {}", idx, blknum, lsn);
|
||||
writer.finish_write(lsn);
|
||||
drop(writer);
|
||||
|
||||
@@ -304,7 +304,7 @@ impl InMemoryLayer {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
pub async fn put_tombstone(&self, _key_range: Range<Key>, _lsn: Lsn) -> Result<()> {
|
||||
// TODO: Currently, we just leak the storage for any deleted keys
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -119,7 +119,7 @@ pub struct Timeline {
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
pub(crate) layers: RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
|
||||
/// Set of key ranges which should be covered by image layers to
|
||||
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
|
||||
@@ -238,7 +238,7 @@ pub struct Timeline {
|
||||
eviction_task_timeline_state: tokio::sync::Mutex<EvictionTaskTimelineState>,
|
||||
}
|
||||
|
||||
type LayerMapWriteLockGuard<'t> = std::sync::RwLockWriteGuard<'t, LayerMap<dyn PersistentLayer>>;
|
||||
type LayerMapWriteLockGuard<'t> = tokio::sync::RwLockWriteGuard<'t, LayerMap<dyn PersistentLayer>>;
|
||||
|
||||
/// Internal structure to hold all data needed for logical size calculation.
|
||||
///
|
||||
@@ -574,8 +574,8 @@ impl Timeline {
|
||||
/// The sum of the file size of all historic layers in the layer map.
|
||||
/// This method makes no distinction between local and remote layers.
|
||||
/// Hence, the result **does not represent local filesystem usage**.
|
||||
pub fn layer_size_sum(&self) -> u64 {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
pub async fn layer_size_sum(&self) -> u64 {
|
||||
let layer_map = self.layers.read().await;
|
||||
let mut size = 0;
|
||||
for l in layer_map.iter_historic_layers() {
|
||||
size += l.file_size();
|
||||
@@ -885,7 +885,7 @@ impl Timeline {
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
pub async fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
@@ -981,8 +981,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
pub async fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let layer_map = self.layers.read().await;
|
||||
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
|
||||
if let Some(open_layer) = &layer_map.open_layer {
|
||||
in_memory_layers.push(open_layer.info());
|
||||
@@ -1004,7 +1004,7 @@ impl Timeline {
|
||||
|
||||
#[instrument(skip_all, fields(tenant = %self.tenant_id, timeline = %self.timeline_id))]
|
||||
pub async fn download_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
|
||||
let Some(remote_layer) = layer.downcast_remote_layer() else { return Ok(Some(false)) };
|
||||
if self.remote_client.is_none() {
|
||||
return Ok(Some(false));
|
||||
@@ -1017,7 +1017,7 @@ impl Timeline {
|
||||
/// Like [`evict_layer_batch`], but for just one layer.
|
||||
/// Additional case `Ok(None)` covers the case where the layer could not be found by its `layer_file_name`.
|
||||
pub async fn evict_layer(&self, layer_file_name: &str) -> anyhow::Result<Option<bool>> {
|
||||
let Some(local_layer) = self.find_layer(layer_file_name) else { return Ok(None) };
|
||||
let Some(local_layer) = self.find_layer(layer_file_name).await else { return Ok(None) };
|
||||
let remote_client = self
|
||||
.remote_client
|
||||
.as_ref()
|
||||
@@ -1102,7 +1102,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// start the batch update
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut layer_map = self.layers.write().await;
|
||||
let mut batch_updates = layer_map.batch_update();
|
||||
|
||||
let mut results = Vec::with_capacity(layers_to_evict.len());
|
||||
@@ -1346,7 +1346,7 @@ impl Timeline {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
layers: RwLock::new(LayerMap::default()),
|
||||
layers: tokio::sync::RwLock::new(LayerMap::default()),
|
||||
wanted_image_layers: Mutex::new(None),
|
||||
|
||||
walredo_mgr,
|
||||
@@ -1519,8 +1519,8 @@ impl Timeline {
|
||||
/// Scan the timeline directory to populate the layer map.
|
||||
/// Returns all timeline-related files that were found and loaded.
|
||||
///
|
||||
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
pub(super) async fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut num_layers = 0;
|
||||
|
||||
@@ -1649,7 +1649,7 @@ impl Timeline {
|
||||
|
||||
// We're holding a layer map lock for a while but this
|
||||
// method is only called during init so it's fine.
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut layer_map = self.layers.write().await;
|
||||
let mut updates = layer_map.batch_update();
|
||||
for remote_layer_name in &index_part.timeline_layers {
|
||||
let local_layer = local_only_layers.remove(remote_layer_name);
|
||||
@@ -1802,7 +1802,7 @@ impl Timeline {
|
||||
let local_layers = self
|
||||
.layers
|
||||
.read()
|
||||
.unwrap()
|
||||
.await
|
||||
.iter_historic_layers()
|
||||
.map(|l| (l.filename(), l))
|
||||
.collect::<HashMap<_, _>>();
|
||||
@@ -2154,8 +2154,8 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
|
||||
async fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().await.iter_historic_layers() {
|
||||
let historic_layer_name = historic_layer.filename().file_name();
|
||||
if layer_file_name == historic_layer_name {
|
||||
return Some(historic_layer);
|
||||
@@ -2362,7 +2362,7 @@ impl Timeline {
|
||||
#[allow(clippy::never_loop)] // see comment at bottom of this loop
|
||||
'layer_map_search: loop {
|
||||
let remote_layer = {
|
||||
let layers = timeline.layers.read().unwrap();
|
||||
let layers = timeline.layers.read().await;
|
||||
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
@@ -2541,8 +2541,8 @@ impl Timeline {
|
||||
///
|
||||
/// Get a handle to the latest layer for appending.
|
||||
///
|
||||
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
async fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut layers = self.layers.write().await;
|
||||
self.get_layer_for_write_locked(lsn, &mut layers)
|
||||
}
|
||||
|
||||
@@ -2593,9 +2593,9 @@ impl Timeline {
|
||||
Ok(layer)
|
||||
}
|
||||
|
||||
fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
|
||||
//info!("PUT: key {} at {}", key, lsn);
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
let layer = self.get_layer_for_write(lsn).await?;
|
||||
layer.put_value(key, lsn, val)?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -2613,9 +2613,9 @@ impl Timeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn)?;
|
||||
layer.put_tombstone(key_range, lsn)?;
|
||||
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
let layer = self.get_layer_for_write(lsn).await?;
|
||||
layer.put_tombstone(key_range, lsn).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -2635,7 +2635,7 @@ impl Timeline {
|
||||
} else {
|
||||
Some(self.write_lock.lock().await)
|
||||
};
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_rc = Arc::clone(open_layer);
|
||||
// Does this layer need freezing?
|
||||
@@ -2673,7 +2673,7 @@ impl Timeline {
|
||||
let flush_counter = *layer_flush_start_rx.borrow();
|
||||
let result = loop {
|
||||
let layer_to_flush = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
layers.frozen_layers.front().cloned()
|
||||
// drop 'layers' lock to allow concurrent reads and writes
|
||||
};
|
||||
@@ -2765,7 +2765,7 @@ impl Timeline {
|
||||
.await?
|
||||
} else {
|
||||
// normal case, write out a L0 delta layer file.
|
||||
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer)?;
|
||||
let (delta_path, metadata) = self.create_delta_layer(&frozen_layer).await?;
|
||||
HashMap::from([(delta_path, metadata)])
|
||||
};
|
||||
|
||||
@@ -2774,7 +2774,7 @@ impl Timeline {
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now.
|
||||
{
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let l = layers.frozen_layers.pop_front();
|
||||
|
||||
// Only one thread may call this function at a time (for this
|
||||
@@ -2868,7 +2868,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Write out the given frozen in-memory layer as a new L0 delta file
|
||||
fn create_delta_layer(
|
||||
async fn create_delta_layer(
|
||||
&self,
|
||||
frozen_layer: &InMemoryLayer,
|
||||
) -> anyhow::Result<(LayerFileName, LayerFileMetadata)> {
|
||||
@@ -2892,7 +2892,7 @@ impl Timeline {
|
||||
|
||||
// Add it to the layer map
|
||||
let l = Arc::new(new_delta);
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut batch_updates = layers.batch_update();
|
||||
l.access_stats().record_residence_event(
|
||||
&batch_updates,
|
||||
@@ -2944,10 +2944,14 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// Is it time to create a new image layer for the given partition?
|
||||
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
|
||||
async fn time_for_new_image_layer(
|
||||
&self,
|
||||
partition: &KeySpace,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<bool> {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
|
||||
let mut max_deltas = 0;
|
||||
{
|
||||
@@ -3042,7 +3046,7 @@ impl Timeline {
|
||||
for partition in partitioning.parts.iter() {
|
||||
let img_range = start..partition.ranges.last().unwrap().end;
|
||||
start = img_range.end;
|
||||
if force || self.time_for_new_image_layer(partition, lsn)? {
|
||||
if force || self.time_for_new_image_layer(partition, lsn).await? {
|
||||
let mut image_layer_writer = ImageLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
@@ -3120,7 +3124,7 @@ impl Timeline {
|
||||
|
||||
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
|
||||
for l in image_layers {
|
||||
@@ -3187,7 +3191,7 @@ impl Timeline {
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
|
||||
// Only compact if enough layers have accumulated.
|
||||
@@ -3545,7 +3549,7 @@ impl Timeline {
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
}
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
|
||||
for l in new_layers {
|
||||
@@ -3805,7 +3809,7 @@ impl Timeline {
|
||||
// 4. newer on-disk image layers cover the layer's whole key range
|
||||
//
|
||||
// TODO holding a write lock is too agressive and avoidable
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write().await;
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
result.layers_total += 1;
|
||||
|
||||
@@ -4101,7 +4105,7 @@ impl Timeline {
|
||||
|
||||
// Download complete. Replace the RemoteLayer with the corresponding
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let mut layers = self_clone.layers.write().unwrap();
|
||||
let mut layers = self_clone.layers.write().await;
|
||||
let mut updates = layers.batch_update();
|
||||
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
{
|
||||
@@ -4259,7 +4263,7 @@ impl Timeline {
|
||||
) {
|
||||
let mut downloads = Vec::new();
|
||||
{
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
layers
|
||||
.iter_historic_layers()
|
||||
.filter_map(|l| l.downcast_remote_layer())
|
||||
@@ -4361,8 +4365,8 @@ impl LocalLayerInfoForDiskUsageEviction {
|
||||
}
|
||||
|
||||
impl Timeline {
|
||||
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let layers = self.layers.read().unwrap();
|
||||
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let layers = self.layers.read().await;
|
||||
|
||||
let mut max_layer_size: Option<u64> = None;
|
||||
let mut resident_layers = Vec::new();
|
||||
@@ -4450,8 +4454,8 @@ impl<'a> TimelineWriter<'a> {
|
||||
///
|
||||
/// This will implicitly extend the relation, if the page is beyond the
|
||||
/// current end-of-file.
|
||||
pub fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
|
||||
self.tl.put_value(key, lsn, value)
|
||||
pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
|
||||
self.tl.put_value(key, lsn, value).await
|
||||
}
|
||||
|
||||
pub fn put_locked(
|
||||
@@ -4465,8 +4469,8 @@ impl<'a> TimelineWriter<'a> {
|
||||
.put_value_locked(key, lsn, value, pre_locked_layer_map)
|
||||
}
|
||||
|
||||
pub fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
self.tl.put_tombstone(key_range, lsn)
|
||||
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
|
||||
self.tl.put_tombstone(key_range, lsn).await
|
||||
}
|
||||
|
||||
/// Track the end of the latest digested WAL record.
|
||||
|
||||
@@ -185,7 +185,7 @@ impl Timeline {
|
||||
// We don't want to hold the layer map lock during eviction.
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read().await;
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
if hist_layer.is_remote_layer() {
|
||||
|
||||
Reference in New Issue
Block a user