mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-13 19:20:36 +00:00
switch Timeline::layers to parking_lot::RwLock
This commit is contained in:
@@ -560,7 +560,6 @@ impl Tenant {
|
||||
|| timeline
|
||||
.layers
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter_historic_layers()
|
||||
.next()
|
||||
.is_some(),
|
||||
@@ -2502,8 +2501,7 @@ impl Tenant {
|
||||
) {
|
||||
Ok(new_timeline) => {
|
||||
if init_layers {
|
||||
new_timeline.layers.write().unwrap().next_open_layer_at =
|
||||
Some(new_timeline.initdb_lsn);
|
||||
new_timeline.layers.write().next_open_layer_at = Some(new_timeline.initdb_lsn);
|
||||
}
|
||||
debug!(
|
||||
"Successfully created initial files for timeline {tenant_id}/{new_timeline_id}"
|
||||
|
||||
@@ -121,7 +121,7 @@ pub struct Timeline {
|
||||
|
||||
pub pg_version: u32,
|
||||
|
||||
pub(super) layers: RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
pub(super) layers: parking_lot::RwLock<LayerMap<dyn PersistentLayer>>,
|
||||
|
||||
last_freeze_at: AtomicLsn,
|
||||
// Atomic would be more appropriate here.
|
||||
@@ -547,7 +547,7 @@ impl Timeline {
|
||||
/// This method makes no distinction between local and remote layers.
|
||||
/// Hence, the result **does not represent local filesystem usage**.
|
||||
pub fn layer_size_sum(&self) -> u64 {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
let layer_map = self.layers.read();
|
||||
let mut size = 0;
|
||||
for l in layer_map.iter_historic_layers() {
|
||||
size += l.file_size();
|
||||
@@ -847,7 +847,7 @@ impl Timeline {
|
||||
/// safekeepers to regard pageserver as caught up and suspend activity.
|
||||
pub fn check_checkpoint_distance(self: &Arc<Timeline>) -> anyhow::Result<()> {
|
||||
let last_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read();
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_size = open_layer.size()?;
|
||||
drop(layers);
|
||||
@@ -927,7 +927,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
pub fn layer_map_info(&self, reset: LayerAccessStatsReset) -> LayerMapInfo {
|
||||
let layer_map = self.layers.read().unwrap();
|
||||
let layer_map = self.layers.read();
|
||||
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
|
||||
if let Some(open_layer) = &layer_map.open_layer {
|
||||
in_memory_layers.push(open_layer.info());
|
||||
@@ -1047,7 +1047,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
// start the batch update
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut layer_map = self.layers.write();
|
||||
let mut batch_updates = layer_map.batch_update();
|
||||
|
||||
let mut results = Vec::with_capacity(layers_to_evict.len());
|
||||
@@ -1311,7 +1311,7 @@ impl Timeline {
|
||||
timeline_id,
|
||||
tenant_id,
|
||||
pg_version,
|
||||
layers: RwLock::new(LayerMap::default()),
|
||||
layers: parking_lot::RwLock::new(LayerMap::default()),
|
||||
|
||||
walredo_mgr,
|
||||
walreceiver,
|
||||
@@ -1452,7 +1452,7 @@ impl Timeline {
|
||||
/// Returns all timeline-related files that were found and loaded.
|
||||
///
|
||||
pub(super) fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write();
|
||||
let mut updates = layers.batch_update();
|
||||
let mut num_layers = 0;
|
||||
|
||||
@@ -1581,7 +1581,7 @@ impl Timeline {
|
||||
|
||||
// We're holding a layer map lock for a while but this
|
||||
// method is only called during init so it's fine.
|
||||
let mut layer_map = self.layers.write().unwrap();
|
||||
let mut layer_map = self.layers.write();
|
||||
let mut updates = layer_map.batch_update();
|
||||
for remote_layer_name in &index_part.timeline_layers {
|
||||
let local_layer = local_only_layers.remove(remote_layer_name);
|
||||
@@ -1734,7 +1734,6 @@ impl Timeline {
|
||||
let local_layers = self
|
||||
.layers
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter_historic_layers()
|
||||
.map(|l| (l.filename(), l))
|
||||
.collect::<HashMap<_, _>>();
|
||||
@@ -2060,7 +2059,7 @@ impl Timeline {
|
||||
}
|
||||
|
||||
fn find_layer(&self, layer_file_name: &str) -> Option<Arc<dyn PersistentLayer>> {
|
||||
for historic_layer in self.layers.read().unwrap().iter_historic_layers() {
|
||||
for historic_layer in self.layers.read().iter_historic_layers() {
|
||||
let historic_layer_name = historic_layer.filename().file_name();
|
||||
if layer_file_name == historic_layer_name {
|
||||
return Some(historic_layer);
|
||||
@@ -2227,7 +2226,7 @@ impl Timeline {
|
||||
#[allow(clippy::never_loop)] // see comment at bottom of this loop
|
||||
'layer_map_search: loop {
|
||||
let remote_layer = {
|
||||
let layers = timeline.layers.read().unwrap();
|
||||
let layers = timeline.layers.read();
|
||||
|
||||
// Check the open and frozen in-memory layers first, in order from newest
|
||||
// to oldest.
|
||||
@@ -2407,7 +2406,7 @@ impl Timeline {
|
||||
/// Get a handle to the latest layer for appending.
|
||||
///
|
||||
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write();
|
||||
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
@@ -2480,7 +2479,7 @@ impl Timeline {
|
||||
} else {
|
||||
Some(self.write_lock.lock().unwrap())
|
||||
};
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write();
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_rc = Arc::clone(open_layer);
|
||||
// Does this layer need freezing?
|
||||
@@ -2518,7 +2517,7 @@ impl Timeline {
|
||||
let flush_counter = *layer_flush_start_rx.borrow();
|
||||
let result = loop {
|
||||
let layer_to_flush = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read();
|
||||
layers.frozen_layers.front().cloned()
|
||||
// drop 'layers' lock to allow concurrent reads and writes
|
||||
};
|
||||
@@ -2619,7 +2618,7 @@ impl Timeline {
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
// in-memory layer from the map now.
|
||||
{
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write();
|
||||
let l = layers.frozen_layers.pop_front();
|
||||
|
||||
// Only one thread may call this function at a time (for this
|
||||
@@ -2737,7 +2736,7 @@ impl Timeline {
|
||||
|
||||
// Add it to the layer map
|
||||
let l = Arc::new(new_delta);
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write();
|
||||
let mut batch_updates = layers.batch_update();
|
||||
l.access_stats().record_residence_event(
|
||||
&batch_updates,
|
||||
@@ -2792,7 +2791,7 @@ impl Timeline {
|
||||
fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> anyhow::Result<bool> {
|
||||
let threshold = self.get_image_creation_threshold();
|
||||
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read();
|
||||
|
||||
let mut max_deltas = 0;
|
||||
|
||||
@@ -2935,7 +2934,7 @@ impl Timeline {
|
||||
|
||||
let mut layer_paths_to_upload = HashMap::with_capacity(image_layers.len());
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write();
|
||||
let mut updates = layers.batch_update();
|
||||
let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id);
|
||||
for l in image_layers {
|
||||
@@ -3002,7 +3001,7 @@ impl Timeline {
|
||||
target_file_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<CompactLevel0Phase1Result, CompactionError> {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read();
|
||||
let mut level0_deltas = layers.get_level0_deltas()?;
|
||||
drop(layers);
|
||||
|
||||
@@ -3125,7 +3124,7 @@ impl Timeline {
|
||||
// Determine N largest holes where N is number of compacted layers.
|
||||
let max_holes = deltas_to_compact.len();
|
||||
let last_record_lsn = self.get_last_record_lsn();
|
||||
let layers = self.layers.read().unwrap(); // Is'n it better to hold original layers lock till here?
|
||||
let layers = self.layers.read(); // Is'n it better to hold original layers lock till here?
|
||||
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
|
||||
let min_hole_coverage_size = 3; // TODO: something more flexible?
|
||||
|
||||
@@ -3362,7 +3361,7 @@ impl Timeline {
|
||||
.context("wait for layer upload ops to complete")?;
|
||||
}
|
||||
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write();
|
||||
let mut updates = layers.batch_update();
|
||||
let mut new_layer_paths = HashMap::with_capacity(new_layers.len());
|
||||
for l in new_layers {
|
||||
@@ -3621,7 +3620,7 @@ impl Timeline {
|
||||
// 4. newer on-disk image layers cover the layer's whole key range
|
||||
//
|
||||
// TODO holding a write lock is too agressive and avoidable
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut layers = self.layers.write();
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
result.layers_total += 1;
|
||||
|
||||
@@ -3904,7 +3903,7 @@ impl Timeline {
|
||||
|
||||
// Download complete. Replace the RemoteLayer with the corresponding
|
||||
// Delta- or ImageLayer in the layer map.
|
||||
let mut layers = self_clone.layers.write().unwrap();
|
||||
let mut layers = self_clone.layers.write();
|
||||
let mut updates = layers.batch_update();
|
||||
let new_layer = remote_layer.create_downloaded_layer(&updates, self_clone.conf, *size);
|
||||
{
|
||||
@@ -4062,7 +4061,7 @@ impl Timeline {
|
||||
) {
|
||||
let mut downloads = Vec::new();
|
||||
{
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read();
|
||||
layers
|
||||
.iter_historic_layers()
|
||||
.filter_map(|l| l.downcast_remote_layer())
|
||||
@@ -4165,7 +4164,7 @@ impl LocalLayerInfoForDiskUsageEviction {
|
||||
|
||||
impl Timeline {
|
||||
pub(crate) fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read();
|
||||
|
||||
let mut max_layer_size: Option<u64> = None;
|
||||
let mut resident_layers = Vec::new();
|
||||
|
||||
@@ -178,7 +178,7 @@ impl Timeline {
|
||||
// We don't want to hold the layer map lock during eviction.
|
||||
// So, we just need to deal with this.
|
||||
let candidates: Vec<Arc<dyn PersistentLayer>> = {
|
||||
let layers = self.layers.read().unwrap();
|
||||
let layers = self.layers.read();
|
||||
let mut candidates = Vec::new();
|
||||
for hist_layer in layers.iter_historic_layers() {
|
||||
if hist_layer.is_remote_layer() {
|
||||
|
||||
Reference in New Issue
Block a user