mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 17:40:37 +00:00
Use RwLock instad of Mutex for layer map lock.
For more concurrency
This commit is contained in:
@@ -193,7 +193,7 @@ impl Repository for LayeredRepository {
|
||||
Arc::clone(&self.walredo_mgr),
|
||||
self.upload_layers,
|
||||
);
|
||||
timeline.layers.lock().unwrap().next_open_layer_at = Some(initdb_lsn);
|
||||
timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
|
||||
|
||||
let timeline = Arc::new(timeline);
|
||||
let r = timelines.insert(
|
||||
@@ -725,7 +725,7 @@ pub struct LayeredTimeline {
|
||||
tenantid: ZTenantId,
|
||||
timelineid: ZTimelineId,
|
||||
|
||||
layers: Mutex<LayerMap>,
|
||||
layers: RwLock<LayerMap>,
|
||||
|
||||
last_freeze_at: AtomicLsn,
|
||||
|
||||
@@ -997,7 +997,7 @@ impl LayeredTimeline {
|
||||
conf,
|
||||
timelineid,
|
||||
tenantid,
|
||||
layers: Mutex::new(LayerMap::default()),
|
||||
layers: RwLock::new(LayerMap::default()),
|
||||
|
||||
walredo_mgr,
|
||||
|
||||
@@ -1040,7 +1040,7 @@ impl LayeredTimeline {
|
||||
/// Returns all timeline-related files that were found and loaded.
|
||||
///
|
||||
fn load_layer_map(&self, disk_consistent_lsn: Lsn) -> anyhow::Result<()> {
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let mut num_layers = 0;
|
||||
|
||||
// Scan timeline directory and create ImageFileName and DeltaFilename
|
||||
@@ -1194,7 +1194,7 @@ impl LayeredTimeline {
|
||||
continue;
|
||||
}
|
||||
|
||||
let layers = timeline.layers.lock().unwrap();
|
||||
let layers = timeline.layers.read().unwrap();
|
||||
|
||||
// Check the open and frozen in-memory layers first
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
@@ -1276,7 +1276,7 @@ impl LayeredTimeline {
|
||||
/// Get a handle to the latest layer for appending.
|
||||
///
|
||||
fn get_layer_for_write(&self, lsn: Lsn) -> anyhow::Result<Arc<InMemoryLayer>> {
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
|
||||
ensure!(lsn.is_aligned());
|
||||
|
||||
@@ -1347,7 +1347,7 @@ impl LayeredTimeline {
|
||||
} else {
|
||||
Some(self.write_lock.lock().unwrap())
|
||||
};
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
if let Some(open_layer) = &layers.open_layer {
|
||||
let open_layer_rc = Arc::clone(open_layer);
|
||||
// Does this layer need freezing?
|
||||
@@ -1412,7 +1412,7 @@ impl LayeredTimeline {
|
||||
let timer = self.flush_time_histo.start_timer();
|
||||
|
||||
loop {
|
||||
let layers = self.layers.lock().unwrap();
|
||||
let layers = self.layers.read().unwrap();
|
||||
if let Some(frozen_layer) = layers.frozen_layers.front() {
|
||||
let frozen_layer = Arc::clone(frozen_layer);
|
||||
drop(layers); // to allow concurrent reads and writes
|
||||
@@ -1456,7 +1456,7 @@ impl LayeredTimeline {
|
||||
|
||||
// Finally, replace the frozen in-memory layer with the new on-disk layers
|
||||
{
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
let l = layers.frozen_layers.pop_front();
|
||||
|
||||
// Only one thread may call this function at a time (for this
|
||||
@@ -1612,7 +1612,7 @@ impl LayeredTimeline {
|
||||
lsn: Lsn,
|
||||
threshold: usize,
|
||||
) -> Result<bool> {
|
||||
let layers = self.layers.lock().unwrap();
|
||||
let layers = self.layers.read().unwrap();
|
||||
|
||||
for part_range in &partition.ranges {
|
||||
let image_coverage = layers.image_coverage(part_range, lsn)?;
|
||||
@@ -1670,7 +1670,7 @@ impl LayeredTimeline {
|
||||
|
||||
// FIXME: Do we need to do something to upload it to remote storage here?
|
||||
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
layers.insert_historic(Arc::new(image_layer));
|
||||
drop(layers);
|
||||
|
||||
@@ -1678,7 +1678,7 @@ impl LayeredTimeline {
|
||||
}
|
||||
|
||||
fn compact_level0(&self, target_file_size: u64) -> Result<()> {
|
||||
let layers = self.layers.lock().unwrap();
|
||||
let layers = self.layers.read().unwrap();
|
||||
|
||||
let level0_deltas = layers.get_level0_deltas()?;
|
||||
|
||||
@@ -1768,7 +1768,7 @@ impl LayeredTimeline {
|
||||
layer_paths.pop().unwrap();
|
||||
}
|
||||
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
for l in new_layers {
|
||||
layers.insert_historic(Arc::new(l));
|
||||
}
|
||||
@@ -1850,7 +1850,7 @@ impl LayeredTimeline {
|
||||
// 2. it doesn't need to be retained for 'retain_lsns';
|
||||
// 3. newer on-disk image layers cover the layer's whole key range
|
||||
//
|
||||
let mut layers = self.layers.lock().unwrap();
|
||||
let mut layers = self.layers.write().unwrap();
|
||||
'outer: for l in layers.iter_historic_layers() {
|
||||
// This layer is in the process of being flushed to disk.
|
||||
// It will be swapped out of the layer map, replaced with
|
||||
|
||||
Reference in New Issue
Block a user