mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-27 01:50:38 +00:00
Refactor partitioning
This commit is contained in:
committed by
Anastasia Lubennikova
parent
92031d376a
commit
232fe14297
@@ -34,7 +34,7 @@ use std::time::Instant;
|
||||
|
||||
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
|
||||
use crate::config::PageServerConf;
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace};
|
||||
use crate::keyspace::KeySpace;
|
||||
use crate::page_cache;
|
||||
use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex};
|
||||
use crate::repository::{
|
||||
@@ -792,8 +792,6 @@ pub struct LayeredTimeline {
|
||||
// garbage collecting data that is still needed by the child timelines.
|
||||
gc_info: RwLock<GcInfo>,
|
||||
|
||||
partitioning: RwLock<Option<(KeyPartitioning, Lsn)>>,
|
||||
|
||||
// It may change across major versions so for simplicity
|
||||
// keep it after running initdb for a timeline.
|
||||
// It is needed in checks when we want to error on some operations
|
||||
@@ -943,14 +941,6 @@ impl Timeline for LayeredTimeline {
|
||||
self.disk_consistent_lsn.load()
|
||||
}
|
||||
|
||||
fn hint_partitioning(&self, partitioning: KeyPartitioning, lsn: Lsn) -> Result<()> {
|
||||
self.partitioning
|
||||
.write()
|
||||
.unwrap()
|
||||
.replace((partitioning, lsn));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn writer<'a>(&'a self) -> Box<dyn TimelineWriter + 'a> {
|
||||
Box::new(LayeredTimelineWriter {
|
||||
tl: self,
|
||||
@@ -1037,7 +1027,6 @@ impl LayeredTimeline {
|
||||
retain_lsns: Vec::new(),
|
||||
cutoff: Lsn(0),
|
||||
}),
|
||||
partitioning: RwLock::new(None),
|
||||
|
||||
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
|
||||
initdb_lsn: metadata.initdb_lsn(),
|
||||
@@ -1592,23 +1581,11 @@ impl LayeredTimeline {
|
||||
// Define partitioning schema if needed
|
||||
if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid)
|
||||
{
|
||||
pgdir.repartition(self.get_last_record_lsn())?;
|
||||
}
|
||||
|
||||
// 1. The partitioning was already done by the code in
|
||||
// pgdatadir_mapping.rs. We just use it here.
|
||||
let partitioning_guard = self.partitioning.read().unwrap();
|
||||
if let Some((partitioning, lsn)) = partitioning_guard.as_ref() {
|
||||
let (partitioning, lsn) = pgdir.repartition(self.get_last_record_lsn())?;
|
||||
let timer = self.create_images_time_histo.start_timer();
|
||||
// Make a copy of the partitioning, so that we can release
|
||||
// the lock. Otherwise we could block the WAL receiver.
|
||||
let lsn = *lsn;
|
||||
let parts = partitioning.parts.clone();
|
||||
drop(partitioning_guard);
|
||||
|
||||
// 2. Create new image layers for partitions that have been modified
|
||||
// "enough".
|
||||
for part in parts.iter() {
|
||||
for part in partitioning.parts.iter() {
|
||||
if self.time_for_new_image_layer(part, lsn, 3)? {
|
||||
self.create_image_layer(part, lsn)?;
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
//! walingest.rs handles a few things like implicit relation creation and extension.
|
||||
//! Clarify that)
|
||||
//!
|
||||
use crate::keyspace::{KeySpace, KeySpaceAccum, TARGET_FILE_SIZE_BYTES};
|
||||
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceAccum, TARGET_FILE_SIZE_BYTES};
|
||||
use crate::reltag::{RelTag, SlruKind};
|
||||
use crate::repository::*;
|
||||
use crate::repository::{Repository, Timeline};
|
||||
@@ -18,10 +18,9 @@ use serde::{Deserialize, Serialize};
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::ops::Range;
|
||||
use std::sync::atomic::{AtomicIsize, Ordering};
|
||||
use std::sync::{Arc, RwLockReadGuard};
|
||||
use std::sync::{Arc, RwLock, RwLockReadGuard};
|
||||
use tracing::{debug, error, trace, warn};
|
||||
use zenith_utils::bin_ser::BeSer;
|
||||
use zenith_utils::lsn::AtomicLsn;
|
||||
use zenith_utils::lsn::Lsn;
|
||||
|
||||
/// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type.
|
||||
@@ -38,7 +37,7 @@ where
|
||||
pub tline: Arc<R::Timeline>,
|
||||
|
||||
/// When did we last calculate the partitioning?
|
||||
last_partitioning: AtomicLsn,
|
||||
partitioning: RwLock<(KeyPartitioning, Lsn)>,
|
||||
|
||||
/// Configuration: how often should the partitioning be recalculated.
|
||||
repartition_threshold: u64,
|
||||
@@ -51,7 +50,7 @@ impl<R: Repository> DatadirTimeline<R> {
|
||||
pub fn new(tline: Arc<R::Timeline>, repartition_threshold: u64) -> Self {
|
||||
DatadirTimeline {
|
||||
tline,
|
||||
last_partitioning: AtomicLsn::new(0),
|
||||
partitioning: RwLock::new((KeyPartitioning::new(), Lsn(0))),
|
||||
current_logical_size: AtomicIsize::new(0),
|
||||
repartition_threshold,
|
||||
}
|
||||
@@ -389,15 +388,19 @@ impl<R: Repository> DatadirTimeline<R> {
|
||||
Ok(result.to_keyspace())
|
||||
}
|
||||
|
||||
pub fn repartition(&self, lsn: Lsn) -> Result<()> {
|
||||
let last_partitioning = self.last_partitioning.load();
|
||||
if last_partitioning == Lsn(0) || lsn.0 - last_partitioning.0 > self.repartition_threshold {
|
||||
pub fn repartition(&self, lsn: Lsn) -> Result<(KeyPartitioning, Lsn)> {
|
||||
let partitioning_guard = self.partitioning.read().unwrap();
|
||||
if partitioning_guard.1 == Lsn(0)
|
||||
|| lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold
|
||||
{
|
||||
let keyspace = self.collect_keyspace(lsn)?;
|
||||
drop(partitioning_guard);
|
||||
let mut partitioning_guard = self.partitioning.write().unwrap();
|
||||
let partitioning = keyspace.partition(TARGET_FILE_SIZE_BYTES);
|
||||
self.tline.hint_partitioning(partitioning, lsn)?;
|
||||
self.last_partitioning.store(lsn);
|
||||
*partitioning_guard = (partitioning, lsn);
|
||||
return Ok((partitioning_guard.0.clone(), lsn));
|
||||
}
|
||||
Ok(())
|
||||
Ok((partitioning_guard.0.clone(), partitioning_guard.1))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
use crate::keyspace::KeyPartitioning;
|
||||
use crate::layered_repository::metadata::TimelineMetadata;
|
||||
use crate::remote_storage::RemoteIndex;
|
||||
use crate::walrecord::ZenithWalRecord;
|
||||
@@ -372,19 +371,6 @@ pub trait Timeline: Send + Sync {
|
||||
/// know anything about them here in the repository.
|
||||
fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()>;
|
||||
|
||||
///
|
||||
/// Tell the implementation how the keyspace should be partitioned.
|
||||
///
|
||||
/// FIXME: This is quite a hack. The code in pgdatadir_mapping.rs knows
|
||||
/// which keys exist and what is the logical grouping of them. That's why
|
||||
/// the code there (and in keyspace.rs) decides the partitioning, not the
|
||||
/// layered_repository.rs implementation. That's a layering violation:
|
||||
/// the Repository implementation ought to be responsible for the physical
|
||||
/// layout, but currently it's more convenient to do it in pgdatadir_mapping.rs
|
||||
/// rather than in layered_repository.rs.
|
||||
///
|
||||
fn hint_partitioning(&self, partitioning: KeyPartitioning, lsn: Lsn) -> Result<()>;
|
||||
|
||||
///
|
||||
/// Check that it is valid to request operations with that lsn.
|
||||
fn check_lsn_is_in_scope(
|
||||
|
||||
Reference in New Issue
Block a user