From 232fe14297c6f12b6ad83b723ab6dcba09febc5e Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 31 Mar 2022 20:23:56 +0300 Subject: [PATCH] Refactor partitioning --- pageserver/src/layered_repository.rs | 29 +++------------------------- pageserver/src/pgdatadir_mapping.rs | 25 +++++++++++++----------- pageserver/src/repository.rs | 14 -------------- 3 files changed, 17 insertions(+), 51 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index eb4f49ddd1..5ab6097960 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -34,7 +34,7 @@ use std::time::Instant; use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}; use crate::config::PageServerConf; -use crate::keyspace::{KeyPartitioning, KeySpace}; +use crate::keyspace::KeySpace; use crate::page_cache; use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex}; use crate::repository::{ @@ -792,8 +792,6 @@ pub struct LayeredTimeline { // garbage collecting data that is still needed by the child timelines. gc_info: RwLock, - partitioning: RwLock>, - // It may change across major versions so for simplicity // keep it after running initdb for a timeline. // It is needed in checks when we want to error on some operations @@ -943,14 +941,6 @@ impl Timeline for LayeredTimeline { self.disk_consistent_lsn.load() } - fn hint_partitioning(&self, partitioning: KeyPartitioning, lsn: Lsn) -> Result<()> { - self.partitioning - .write() - .unwrap() - .replace((partitioning, lsn)); - Ok(()) - } - fn writer<'a>(&'a self) -> Box { Box::new(LayeredTimelineWriter { tl: self, @@ -1037,7 +1027,6 @@ impl LayeredTimeline { retain_lsns: Vec::new(), cutoff: Lsn(0), }), - partitioning: RwLock::new(None), latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()), initdb_lsn: metadata.initdb_lsn(), @@ -1592,23 +1581,11 @@ impl LayeredTimeline { // Define partitioning schema if needed if let Ok(pgdir) = tenant_mgr::get_timeline_for_tenant_load(self.tenantid, self.timelineid) { - pgdir.repartition(self.get_last_record_lsn())?; - } - - // 1. The partitioning was already done by the code in - // pgdatadir_mapping.rs. We just use it here. - let partitioning_guard = self.partitioning.read().unwrap(); - if let Some((partitioning, lsn)) = partitioning_guard.as_ref() { + let (partitioning, lsn) = pgdir.repartition(self.get_last_record_lsn())?; let timer = self.create_images_time_histo.start_timer(); - // Make a copy of the partitioning, so that we can release - // the lock. Otherwise we could block the WAL receiver. - let lsn = *lsn; - let parts = partitioning.parts.clone(); - drop(partitioning_guard); - // 2. Create new image layers for partitions that have been modified // "enough". - for part in parts.iter() { + for part in partitioning.parts.iter() { if self.time_for_new_image_layer(part, lsn, 3)? { self.create_image_layer(part, lsn)?; } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 75ace4ecee..fbd1b56180 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -6,7 +6,7 @@ //! walingest.rs handles a few things like implicit relation creation and extension. //! Clarify that) //! -use crate::keyspace::{KeySpace, KeySpaceAccum, TARGET_FILE_SIZE_BYTES}; +use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceAccum, TARGET_FILE_SIZE_BYTES}; use crate::reltag::{RelTag, SlruKind}; use crate::repository::*; use crate::repository::{Repository, Timeline}; @@ -18,10 +18,9 @@ use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::ops::Range; use std::sync::atomic::{AtomicIsize, Ordering}; -use std::sync::{Arc, RwLockReadGuard}; +use std::sync::{Arc, RwLock, RwLockReadGuard}; use tracing::{debug, error, trace, warn}; use zenith_utils::bin_ser::BeSer; -use zenith_utils::lsn::AtomicLsn; use zenith_utils::lsn::Lsn; /// Block number within a relation or SLRU. This matches PostgreSQL's BlockNumber type. @@ -38,7 +37,7 @@ where pub tline: Arc, /// When did we last calculate the partitioning? - last_partitioning: AtomicLsn, + partitioning: RwLock<(KeyPartitioning, Lsn)>, /// Configuration: how often should the partitioning be recalculated. repartition_threshold: u64, @@ -51,7 +50,7 @@ impl DatadirTimeline { pub fn new(tline: Arc, repartition_threshold: u64) -> Self { DatadirTimeline { tline, - last_partitioning: AtomicLsn::new(0), + partitioning: RwLock::new((KeyPartitioning::new(), Lsn(0))), current_logical_size: AtomicIsize::new(0), repartition_threshold, } @@ -389,15 +388,19 @@ impl DatadirTimeline { Ok(result.to_keyspace()) } - pub fn repartition(&self, lsn: Lsn) -> Result<()> { - let last_partitioning = self.last_partitioning.load(); - if last_partitioning == Lsn(0) || lsn.0 - last_partitioning.0 > self.repartition_threshold { + pub fn repartition(&self, lsn: Lsn) -> Result<(KeyPartitioning, Lsn)> { + let partitioning_guard = self.partitioning.read().unwrap(); + if partitioning_guard.1 == Lsn(0) + || lsn.0 - partitioning_guard.1 .0 > self.repartition_threshold + { let keyspace = self.collect_keyspace(lsn)?; + drop(partitioning_guard); + let mut partitioning_guard = self.partitioning.write().unwrap(); let partitioning = keyspace.partition(TARGET_FILE_SIZE_BYTES); - self.tline.hint_partitioning(partitioning, lsn)?; - self.last_partitioning.store(lsn); + *partitioning_guard = (partitioning, lsn); + return Ok((partitioning_guard.0.clone(), lsn)); } - Ok(()) + Ok((partitioning_guard.0.clone(), partitioning_guard.1)) } } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index b960e037be..7e998b0ebe 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,4 +1,3 @@ -use crate::keyspace::KeyPartitioning; use crate::layered_repository::metadata::TimelineMetadata; use crate::remote_storage::RemoteIndex; use crate::walrecord::ZenithWalRecord; @@ -372,19 +371,6 @@ pub trait Timeline: Send + Sync { /// know anything about them here in the repository. fn checkpoint(&self, cconf: CheckpointConfig) -> Result<()>; - /// - /// Tell the implementation how the keyspace should be partitioned. - /// - /// FIXME: This is quite a hack. The code in pgdatadir_mapping.rs knows - /// which keys exist and what is the logical grouping of them. That's why - /// the code there (and in keyspace.rs) decides the partitioning, not the - /// layered_repository.rs implementation. That's a layering violation: - /// the Repository implementation ought to be responsible for the physical - /// layout, but currently it's more convenient to do it in pgdatadir_mapping.rs - /// rather than in layered_repository.rs. - /// - fn hint_partitioning(&self, partitioning: KeyPartitioning, lsn: Lsn) -> Result<()>; - /// /// Check that it is valid to request operations with that lsn. fn check_lsn_is_in_scope(