From c19b4a65f96062e1aeb521e17fde204b27ca2158 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Thu, 18 Aug 2022 16:20:52 +0300 Subject: [PATCH] Remove Repository trait, rename LayeredRepository struct into Repository --- pageserver/src/http/routes.rs | 1 - pageserver/src/layered_repository.rs | 69 +++++++++------ pageserver/src/page_service.rs | 1 - pageserver/src/pgdatadir_mapping.rs | 4 +- pageserver/src/repository.rs | 85 ++----------------- pageserver/src/tenant_mgr.rs | 17 ++-- pageserver/src/tenant_tasks.rs | 1 - pageserver/src/timelines.rs | 15 ++-- .../src/walreceiver/connection_manager.rs | 5 +- .../src/walreceiver/walreceiver_connection.rs | 2 +- 10 files changed, 71 insertions(+), 129 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 8d300e554a..da21f6883a 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -12,7 +12,6 @@ use super::models::{ TimelineCreateRequest, }; use crate::layered_repository::{metadata::TimelineMetadata, Timeline}; -use crate::repository::Repository; use crate::repository::{LocalTimelineState, RepositoryTimeline}; use crate::storage_sync; use crate::storage_sync::index::{RemoteIndex, RemoteTimeline}; diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index c0f4aece54..a5877c8482 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -31,7 +31,7 @@ use crate::config::PageServerConf; use crate::storage_sync::index::RemoteIndex; use crate::tenant_config::{TenantConf, TenantConfOpt}; -use crate::repository::{GcResult, Repository, RepositoryTimeline}; +use crate::repository::{GcResult, RepositoryTimeline}; use crate::thread_mgr; use crate::walredo::WalRedoManager; use crate::CheckpointConfig; @@ -78,7 +78,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines"; /// /// Repository consists of multiple timelines. Keep them in a hash table. /// -pub struct LayeredRepository { +pub struct Repository { // Global pageserver config parameters pub conf: &'static PageServerConf, @@ -119,15 +119,19 @@ pub struct LayeredRepository { upload_layers: bool, } -/// Public interface -impl Repository for LayeredRepository { - fn get_timeline(&self, timelineid: ZTimelineId) -> Option> { +/// A repository corresponds to one .neon directory. One repository holds multiple +/// timelines, forked off from the same initial call to 'initdb'. +impl Repository { + /// Get Timeline handle for given zenith timeline ID. + /// This function is idempotent. It doesn't change internal state in any way. + pub fn get_timeline(&self, timelineid: ZTimelineId) -> Option> { let timelines = self.timelines.lock().unwrap(); self.get_timeline_internal(timelineid, &timelines) .map(RepositoryTimeline::from) } - fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result> { + /// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded. + pub fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result> { let mut timelines = self.timelines.lock().unwrap(); match self.get_timeline_load_internal(timelineid, &mut timelines)? { Some(local_loaded_timeline) => Ok(local_loaded_timeline), @@ -138,7 +142,9 @@ impl Repository for LayeredRepository { } } - fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)> { + /// Lists timelines the repository contains. + /// Up to repository's implementation to omit certain timelines that ar not considered ready for use. + pub fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)> { self.timelines .lock() .unwrap() @@ -152,7 +158,9 @@ impl Repository for LayeredRepository { .collect() } - fn create_empty_timeline( + /// Create a new, empty timeline. The caller is responsible for loading data into it + /// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it. + pub fn create_empty_timeline( &self, timeline_id: ZTimelineId, initdb_lsn: Lsn, @@ -194,7 +202,7 @@ impl Repository for LayeredRepository { } /// Branch a timeline - fn branch_timeline( + pub fn branch_timeline( &self, src: ZTimelineId, dst: ZTimelineId, @@ -284,10 +292,16 @@ impl Repository for LayeredRepository { Ok(()) } - /// Public entry point to GC. All the logic is in the private - /// gc_iteration_internal function, this public facade just wraps it for - /// metrics collection. - fn gc_iteration( + /// perform one garbage collection iteration, removing old data files from disk. + /// this function is periodically called by gc thread. + /// also it can be explicitly requested through page server api 'do_gc' command. + /// + /// 'timelineid' specifies the timeline to GC, or None for all. + /// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval). + /// `checkpoint_before_gc` parameter is used to force compaction of storage before GC + /// to make tests more deterministic. + /// TODO Do we still need it or we can call checkpoint explicitly in tests where needed? + pub fn gc_iteration( &self, target_timeline_id: Option, horizon: u64, @@ -305,7 +319,11 @@ impl Repository for LayeredRepository { }) } - fn compaction_iteration(&self) -> Result<()> { + /// Perform one compaction iteration. + /// This function is periodically called by compactor thread. + /// Also it can be explicitly requested per timeline through page server + /// api's 'compact' command. + pub fn compaction_iteration(&self) -> Result<()> { // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // compactions. We don't want to block everything else while the @@ -333,12 +351,11 @@ impl Repository for LayeredRepository { Ok(()) } - /// /// Flush all in-memory data to disk. /// - /// Used at shutdown. + /// Used at graceful shutdown. /// - fn checkpoint(&self) -> Result<()> { + pub fn checkpoint(&self) -> Result<()> { // Scan through the hashmap and collect a list of all the timelines, // while holding the lock. Then drop the lock and actually perform the // checkpoints. We don't want to block everything else while the @@ -368,7 +385,8 @@ impl Repository for LayeredRepository { Ok(()) } - fn delete_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()> { + /// Removes timeline-related in-memory data + pub fn delete_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()> { // in order to be retriable detach needs to be idempotent // (or at least to a point that each time the detach is called it can make progress) let mut timelines = self.timelines.lock().unwrap(); @@ -405,7 +423,9 @@ impl Repository for LayeredRepository { Ok(()) } - fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()> { + /// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization. + /// See [`crate::remote_storage`] for more details about the synchronization. + pub fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()> { debug!("attach timeline_id: {}", timeline_id,); match self.timelines.lock().unwrap().entry(timeline_id) { Entry::Occupied(_) => bail!("We completed a download for a timeline that already exists in repository. This is a bug."), @@ -419,13 +439,14 @@ impl Repository for LayeredRepository { Ok(()) } - fn get_remote_index(&self) -> &RemoteIndex { + /// Allows to retrieve remote timeline index from the tenant. Used in walreceiver to grab remote consistent lsn. + pub fn get_remote_index(&self) -> &RemoteIndex { &self.remote_index } } /// Private functions -impl LayeredRepository { +impl Repository { pub fn get_checkpoint_distance(&self) -> u64 { let tenant_conf = self.tenant_conf.read().unwrap(); tenant_conf @@ -515,7 +536,7 @@ impl LayeredRepository { tenant_conf.update(&new_tenant_conf); - LayeredRepository::persist_tenant_config(self.conf, self.tenant_id, *tenant_conf)?; + Repository::persist_tenant_config(self.conf, self.tenant_id, *tenant_conf)?; Ok(()) } @@ -613,8 +634,8 @@ impl LayeredRepository { tenant_id: ZTenantId, remote_index: RemoteIndex, upload_layers: bool, - ) -> LayeredRepository { - LayeredRepository { + ) -> Repository { + Repository { tenant_id, file_lock: RwLock::new(()), conf, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index f5f1e4d7bd..e6114c0fc5 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -34,7 +34,6 @@ use crate::layered_repository::Timeline; use crate::pgdatadir_mapping::LsnForTimestamp; use crate::profiling::profpoint_start; use crate::reltag::RelTag; -use crate::repository::Repository; use crate::tenant_mgr; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index d10e48393c..beaac292ec 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -1390,8 +1390,8 @@ fn is_slru_block_key(key: Key) -> bool { // #[cfg(test)] -pub fn create_test_timeline( - repo: R, +pub fn create_test_timeline( + repo: crate::layered_repository::Repository, timeline_id: utils::zid::ZTimelineId, ) -> Result> { let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?; diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 5cdc27a846..d0e1ed24b6 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,6 +1,4 @@ use crate::layered_repository::metadata::TimelineMetadata; -use crate::layered_repository::Timeline; -use crate::storage_sync::index::RemoteIndex; use crate::walrecord::ZenithWalRecord; use anyhow::{bail, Result}; use byteorder::{ByteOrder, BE}; @@ -10,7 +8,7 @@ use std::fmt; use std::ops::{AddAssign, Range}; use std::sync::Arc; use std::time::Duration; -use utils::{lsn::Lsn, zid::ZTimelineId}; +use utils::lsn::Lsn; #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)] /// Key used in the Repository kv-store. @@ -178,76 +176,6 @@ impl Value { } } -/// -/// A repository corresponds to one .neon directory. One repository holds multiple -/// timelines, forked off from the same initial call to 'initdb'. -pub trait Repository: Send + Sync { - /// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization. - /// See [`crate::remote_storage`] for more details about the synchronization. - fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>; - - /// Get Timeline handle for given zenith timeline ID. - /// This function is idempotent. It doesn't change internal state in any way. - fn get_timeline(&self, timelineid: ZTimelineId) -> Option>; - - /// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded. - fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result>; - - /// Lists timelines the repository contains. - /// Up to repository's implementation to omit certain timelines that ar not considered ready for use. - fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline)>; - - /// Create a new, empty timeline. The caller is responsible for loading data into it - /// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it. - fn create_empty_timeline( - &self, - timeline_id: ZTimelineId, - initdb_lsn: Lsn, - ) -> Result>; - - /// Branch a timeline - fn branch_timeline( - &self, - src: ZTimelineId, - dst: ZTimelineId, - start_lsn: Option, - ) -> Result<()>; - - /// Flush all data to disk. - /// - /// this is used at graceful shutdown. - fn checkpoint(&self) -> Result<()>; - - /// perform one garbage collection iteration, removing old data files from disk. - /// this function is periodically called by gc thread. - /// also it can be explicitly requested through page server api 'do_gc' command. - /// - /// 'timelineid' specifies the timeline to GC, or None for all. - /// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval). - /// `checkpoint_before_gc` parameter is used to force compaction of storage before GC - /// to make tests more deterministic. - /// TODO Do we still need it or we can call checkpoint explicitly in tests where needed? - fn gc_iteration( - &self, - timelineid: Option, - horizon: u64, - pitr: Duration, - checkpoint_before_gc: bool, - ) -> Result; - - /// Perform one compaction iteration. - /// This function is periodically called by compactor thread. - /// Also it can be explicitly requested per timeline through page server - /// api's 'compact' command. - fn compaction_iteration(&self) -> Result<()>; - - /// removes timeline-related in-memory data - fn delete_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()>; - - /// Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn. - fn get_remote_index(&self) -> &RemoteIndex; -} - /// A timeline, that belongs to the current repository. pub enum RepositoryTimeline { /// Timeline, with its files present locally in pageserver's working directory. @@ -332,16 +260,17 @@ pub mod repo_harness { use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::{fs, path::PathBuf}; + use crate::storage_sync::index::RemoteIndex; use crate::{ config::PageServerConf, - layered_repository::LayeredRepository, + layered_repository::Repository, walredo::{WalRedoError, WalRedoManager}, }; use super::*; use crate::tenant_config::{TenantConf, TenantConfOpt}; use hex_literal::hex; - use utils::zid::ZTenantId; + use utils::zid::{ZTenantId, ZTimelineId}; pub const TIMELINE_ID: ZTimelineId = ZTimelineId::from_array(hex!("11223344556677881122334455667788")); @@ -427,14 +356,14 @@ pub mod repo_harness { }) } - pub fn load(&self) -> LayeredRepository { + pub fn load(&self) -> Repository { self.try_load().expect("failed to load test repo") } - pub fn try_load(&self) -> Result { + pub fn try_load(&self) -> Result { let walredo_mgr = Arc::new(TestRedoManager); - let repo = LayeredRepository::new( + let repo = Repository::new( self.conf, TenantConfOpt::from(self.tenant_conf), walredo_mgr, diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 36c3e569a6..5afa38c926 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -3,8 +3,7 @@ use crate::config::PageServerConf; use crate::http::models::TenantInfo; -use crate::layered_repository::{load_metadata, LayeredRepository, Timeline}; -use crate::repository::Repository; +use crate::layered_repository::{load_metadata, Repository, Timeline}; use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex}; use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData}; use crate::tenant_config::TenantConfOpt; @@ -94,7 +93,7 @@ mod tenants_state { struct Tenant { state: TenantState, /// Contains in-memory state, including the timeline that might not yet flushed on disk or loaded form disk. - repo: Arc, + repo: Arc, /// Timelines, located locally in the pageserver's datadir. /// Timelines can entirely be removed entirely by the `detach` operation only. /// @@ -365,7 +364,7 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow: Ok(()) } -pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result> { +pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result> { let m = tenants_state::read_tenants(); let tenant = m .get(&tenant_id) @@ -484,7 +483,7 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any } fn load_local_timeline( - repo: &LayeredRepository, + repo: &Repository, timeline_id: ZTimelineId, ) -> anyhow::Result> { let inmem_timeline = repo.get_timeline_load(timeline_id).with_context(|| { @@ -588,7 +587,7 @@ fn init_local_repository( } fn attach_downloaded_tenant( - repo: &LayeredRepository, + repo: &Repository, downloaded_timelines: HashSet, ) -> anyhow::Result<()> { let mut registration_queue = Vec::with_capacity(downloaded_timelines.len()); @@ -630,14 +629,14 @@ fn load_local_repo( conf: &'static PageServerConf, tenant_id: ZTenantId, remote_index: &RemoteIndex, -) -> anyhow::Result> { +) -> anyhow::Result> { let mut m = tenants_state::write_tenants(); let tenant = m.entry(tenant_id).or_insert_with(|| { // Set up a WAL redo manager, for applying WAL records. let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); // Set up an object repository, for actual data storage. - let repo: Arc = Arc::new(LayeredRepository::new( + let repo: Arc = Arc::new(Repository::new( conf, TenantConfOpt::default(), Arc::new(walredo_mgr), @@ -653,7 +652,7 @@ fn load_local_repo( }); // Restore tenant config - let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?; + let tenant_conf = Repository::load_tenant_config(conf, tenant_id)?; tenant.repo.update_tenant_config(tenant_conf)?; Ok(Arc::clone(&tenant.repo)) diff --git a/pageserver/src/tenant_tasks.rs b/pageserver/src/tenant_tasks.rs index e51744d3cc..ca239ae254 100644 --- a/pageserver/src/tenant_tasks.rs +++ b/pageserver/src/tenant_tasks.rs @@ -5,7 +5,6 @@ use std::collections::HashMap; use std::ops::ControlFlow; use std::time::Duration; -use crate::repository::Repository; use crate::tenant_mgr::TenantState; use crate::thread_mgr::ThreadKind; use crate::{tenant_mgr, thread_mgr}; diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index 6a55dd286e..4f760751db 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -22,11 +22,10 @@ use crate::import_datadir; use crate::tenant_mgr; use crate::CheckpointConfig; use crate::{ - config::PageServerConf, repository::Repository, storage_sync::index::RemoteIndex, - tenant_config::TenantConfOpt, + config::PageServerConf, storage_sync::index::RemoteIndex, tenant_config::TenantConfOpt, }; use crate::{ - layered_repository::{LayeredRepository, Timeline}, + layered_repository::{Repository, Timeline}, walredo::WalRedoManager, }; @@ -42,7 +41,7 @@ pub fn create_repo( tenant_id: ZTenantId, wal_redo_manager: Arc, remote_index: RemoteIndex, -) -> Result> { +) -> Result> { let repo_dir = conf.tenant_path(&tenant_id); ensure!( !repo_dir.exists(), @@ -57,9 +56,9 @@ pub fn create_repo( info!("created directory structure in {}", repo_dir.display()); // Save tenant's config - LayeredRepository::persist_tenant_config(conf, tenant_id, tenant_conf)?; + Repository::persist_tenant_config(conf, tenant_id, tenant_conf)?; - Ok(Arc::new(LayeredRepository::new( + Ok(Arc::new(Repository::new( conf, tenant_conf, wal_redo_manager, @@ -104,11 +103,11 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> { // - run initdb to init temporary instance and get bootstrap data // - after initialization complete, remove the temp dir. // -fn bootstrap_timeline( +fn bootstrap_timeline( conf: &'static PageServerConf, tenantid: ZTenantId, tli: ZTimelineId, - repo: &R, + repo: &Repository, ) -> Result<()> { let initdb_path = conf .tenant_path(&tenantid) diff --git a/pageserver/src/walreceiver/connection_manager.rs b/pageserver/src/walreceiver/connection_manager.rs index 2fc44cb26a..912073a731 100644 --- a/pageserver/src/walreceiver/connection_manager.rs +++ b/pageserver/src/walreceiver/connection_manager.rs @@ -735,10 +735,7 @@ fn wal_stream_connection_string( #[cfg(test)] mod tests { - use crate::repository::{ - repo_harness::{RepoHarness, TIMELINE_ID}, - Repository, - }; + use crate::repository::repo_harness::{RepoHarness, TIMELINE_ID}; use super::*; diff --git a/pageserver/src/walreceiver/walreceiver_connection.rs b/pageserver/src/walreceiver/walreceiver_connection.rs index 283cc76e66..b5f266614e 100644 --- a/pageserver/src/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/walreceiver/walreceiver_connection.rs @@ -20,7 +20,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument}; use super::TaskEvent; use crate::{ - layered_repository::WalReceiverInfo, repository::Repository, tenant_mgr, walingest::WalIngest, + layered_repository::WalReceiverInfo, tenant_mgr, walingest::WalIngest, walrecord::DecodedWALRecord, }; use postgres_ffi::v14::waldecoder::WalStreamDecoder;