Remove Repository trait, rename LayeredRepository struct into Repository

This commit is contained in:
Kirill Bulatov
2022-08-18 16:20:52 +03:00
committed by Kirill Bulatov
parent 8043612334
commit c19b4a65f9
10 changed files with 71 additions and 129 deletions

View File

@@ -12,7 +12,6 @@ use super::models::{
TimelineCreateRequest,
};
use crate::layered_repository::{metadata::TimelineMetadata, Timeline};
use crate::repository::Repository;
use crate::repository::{LocalTimelineState, RepositoryTimeline};
use crate::storage_sync;
use crate::storage_sync::index::{RemoteIndex, RemoteTimeline};

View File

@@ -31,7 +31,7 @@ use crate::config::PageServerConf;
use crate::storage_sync::index::RemoteIndex;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use crate::repository::{GcResult, Repository, RepositoryTimeline};
use crate::repository::{GcResult, RepositoryTimeline};
use crate::thread_mgr;
use crate::walredo::WalRedoManager;
use crate::CheckpointConfig;
@@ -78,7 +78,7 @@ pub const TIMELINES_SEGMENT_NAME: &str = "timelines";
///
/// Repository consists of multiple timelines. Keep them in a hash table.
///
pub struct LayeredRepository {
pub struct Repository {
// Global pageserver config parameters
pub conf: &'static PageServerConf,
@@ -119,15 +119,19 @@ pub struct LayeredRepository {
upload_layers: bool,
}
/// Public interface
impl Repository for LayeredRepository {
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Timeline>> {
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
impl Repository {
/// Get Timeline handle for given zenith timeline ID.
/// This function is idempotent. It doesn't change internal state in any way.
pub fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Timeline>> {
let timelines = self.timelines.lock().unwrap();
self.get_timeline_internal(timelineid, &timelines)
.map(RepositoryTimeline::from)
}
fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result<Arc<Timeline>> {
/// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded.
pub fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result<Arc<Timeline>> {
let mut timelines = self.timelines.lock().unwrap();
match self.get_timeline_load_internal(timelineid, &mut timelines)? {
Some(local_loaded_timeline) => Ok(local_loaded_timeline),
@@ -138,7 +142,9 @@ impl Repository for LayeredRepository {
}
}
fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline<Timeline>)> {
/// Lists timelines the repository contains.
/// Up to repository's implementation to omit certain timelines that are not considered ready for use.
pub fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline<Timeline>)> {
self.timelines
.lock()
.unwrap()
@@ -152,7 +158,9 @@ impl Repository for LayeredRepository {
.collect()
}
fn create_empty_timeline(
/// Create a new, empty timeline. The caller is responsible for loading data into it.
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
pub fn create_empty_timeline(
&self,
timeline_id: ZTimelineId,
initdb_lsn: Lsn,
@@ -194,7 +202,7 @@ impl Repository for LayeredRepository {
}
/// Branch a timeline
fn branch_timeline(
pub fn branch_timeline(
&self,
src: ZTimelineId,
dst: ZTimelineId,
@@ -284,10 +292,16 @@ impl Repository for LayeredRepository {
Ok(())
}
/// Public entry point to GC. All the logic is in the private
/// gc_iteration_internal function, this public facade just wraps it for
/// metrics collection.
fn gc_iteration(
/// Perform one garbage collection iteration, removing old data files from disk.
/// This function is periodically called by gc thread.
/// Also it can be explicitly requested through page server api 'do_gc' command.
///
/// `target_timeline_id` specifies the timeline to GC, or None for all.
/// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval).
/// `checkpoint_before_gc` parameter is used to force compaction of storage before GC
/// to make tests more deterministic.
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
pub fn gc_iteration(
&self,
target_timeline_id: Option<ZTimelineId>,
horizon: u64,
@@ -305,7 +319,11 @@ impl Repository for LayeredRepository {
})
}
fn compaction_iteration(&self) -> Result<()> {
/// Perform one compaction iteration.
/// This function is periodically called by compactor thread.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub fn compaction_iteration(&self) -> Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// compactions. We don't want to block everything else while the
@@ -333,12 +351,11 @@ impl Repository for LayeredRepository {
Ok(())
}
///
/// Flush all in-memory data to disk.
///
/// Used at shutdown.
/// Used at graceful shutdown.
///
fn checkpoint(&self) -> Result<()> {
pub fn checkpoint(&self) -> Result<()> {
// Scan through the hashmap and collect a list of all the timelines,
// while holding the lock. Then drop the lock and actually perform the
// checkpoints. We don't want to block everything else while the
@@ -368,7 +385,8 @@ impl Repository for LayeredRepository {
Ok(())
}
fn delete_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()> {
/// Removes timeline-related in-memory data
pub fn delete_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()> {
// in order to be retriable detach needs to be idempotent
// (or at least to a point that each time the detach is called it can make progress)
let mut timelines = self.timelines.lock().unwrap();
@@ -405,7 +423,9 @@ impl Repository for LayeredRepository {
Ok(())
}
fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()> {
/// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization.
/// See [`crate::remote_storage`] for more details about the synchronization.
pub fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()> {
debug!("attach timeline_id: {}", timeline_id,);
match self.timelines.lock().unwrap().entry(timeline_id) {
Entry::Occupied(_) => bail!("We completed a download for a timeline that already exists in repository. This is a bug."),
@@ -419,13 +439,14 @@ impl Repository for LayeredRepository {
Ok(())
}
fn get_remote_index(&self) -> &RemoteIndex {
/// Returns the remote timeline index of the tenant. Used in walreceiver to grab remote consistent lsn.
pub fn get_remote_index(&self) -> &RemoteIndex {
&self.remote_index
}
}
/// Private functions
impl LayeredRepository {
impl Repository {
pub fn get_checkpoint_distance(&self) -> u64 {
let tenant_conf = self.tenant_conf.read().unwrap();
tenant_conf
@@ -515,7 +536,7 @@ impl LayeredRepository {
tenant_conf.update(&new_tenant_conf);
LayeredRepository::persist_tenant_config(self.conf, self.tenant_id, *tenant_conf)?;
Repository::persist_tenant_config(self.conf, self.tenant_id, *tenant_conf)?;
Ok(())
}
@@ -613,8 +634,8 @@ impl LayeredRepository {
tenant_id: ZTenantId,
remote_index: RemoteIndex,
upload_layers: bool,
) -> LayeredRepository {
LayeredRepository {
) -> Repository {
Repository {
tenant_id,
file_lock: RwLock::new(()),
conf,

View File

@@ -34,7 +34,6 @@ use crate::layered_repository::Timeline;
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::profiling::profpoint_start;
use crate::reltag::RelTag;
use crate::repository::Repository;
use crate::tenant_mgr;
use crate::thread_mgr;
use crate::thread_mgr::ThreadKind;

View File

@@ -1390,8 +1390,8 @@ fn is_slru_block_key(key: Key) -> bool {
//
#[cfg(test)]
pub fn create_test_timeline<R: Repository>(
repo: R,
pub fn create_test_timeline(
repo: crate::layered_repository::Repository,
timeline_id: utils::zid::ZTimelineId,
) -> Result<std::sync::Arc<Timeline>> {
let tline = repo.create_empty_timeline(timeline_id, Lsn(8))?;

View File

@@ -1,6 +1,4 @@
use crate::layered_repository::metadata::TimelineMetadata;
use crate::layered_repository::Timeline;
use crate::storage_sync::index::RemoteIndex;
use crate::walrecord::ZenithWalRecord;
use anyhow::{bail, Result};
use byteorder::{ByteOrder, BE};
@@ -10,7 +8,7 @@ use std::fmt;
use std::ops::{AddAssign, Range};
use std::sync::Arc;
use std::time::Duration;
use utils::{lsn::Lsn, zid::ZTimelineId};
use utils::lsn::Lsn;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
/// Key used in the Repository kv-store.
@@ -178,76 +176,6 @@ impl Value {
}
}
///
/// A repository corresponds to one .neon directory. One repository holds multiple
/// timelines, forked off from the same initial call to 'initdb'.
pub trait Repository: Send + Sync {
/// Updates timeline based on the `TimelineSyncStatusUpdate`, received from the remote storage synchronization.
/// See [`crate::remote_storage`] for more details about the synchronization.
fn attach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>;
/// Get Timeline handle for given zenith timeline ID.
/// This function is idempotent. It doesn't change internal state in any way.
fn get_timeline(&self, timelineid: ZTimelineId) -> Option<RepositoryTimeline<Timeline>>;
/// Get Timeline handle for locally available timeline. Load it into memory if it is not loaded.
fn get_timeline_load(&self, timelineid: ZTimelineId) -> Result<Arc<Timeline>>;
/// Lists timelines the repository contains.
/// Up to repository's implementation to omit certain timelines that ar not considered ready for use.
fn list_timelines(&self) -> Vec<(ZTimelineId, RepositoryTimeline<Timeline>)>;
/// Create a new, empty timeline. The caller is responsible for loading data into it
/// Initdb lsn is provided for timeline impl to be able to perform checks for some operations against it.
fn create_empty_timeline(
&self,
timeline_id: ZTimelineId,
initdb_lsn: Lsn,
) -> Result<Arc<Timeline>>;
/// Branch a timeline
fn branch_timeline(
&self,
src: ZTimelineId,
dst: ZTimelineId,
start_lsn: Option<Lsn>,
) -> Result<()>;
/// Flush all data to disk.
///
/// this is used at graceful shutdown.
fn checkpoint(&self) -> Result<()>;
/// perform one garbage collection iteration, removing old data files from disk.
/// this function is periodically called by gc thread.
/// also it can be explicitly requested through page server api 'do_gc' command.
///
/// 'timelineid' specifies the timeline to GC, or None for all.
/// `horizon` specifies delta from last lsn to preserve all object versions (pitr interval).
/// `checkpoint_before_gc` parameter is used to force compaction of storage before GC
/// to make tests more deterministic.
/// TODO Do we still need it or we can call checkpoint explicitly in tests where needed?
fn gc_iteration(
&self,
timelineid: Option<ZTimelineId>,
horizon: u64,
pitr: Duration,
checkpoint_before_gc: bool,
) -> Result<GcResult>;
/// Perform one compaction iteration.
/// This function is periodically called by compactor thread.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
fn compaction_iteration(&self) -> Result<()>;
/// removes timeline-related in-memory data
fn delete_timeline(&self, timeline_id: ZTimelineId) -> anyhow::Result<()>;
/// Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn.
fn get_remote_index(&self) -> &RemoteIndex;
}
/// A timeline, that belongs to the current repository.
pub enum RepositoryTimeline<T> {
/// Timeline, with its files present locally in pageserver's working directory.
@@ -332,16 +260,17 @@ pub mod repo_harness {
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::{fs, path::PathBuf};
use crate::storage_sync::index::RemoteIndex;
use crate::{
config::PageServerConf,
layered_repository::LayeredRepository,
layered_repository::Repository,
walredo::{WalRedoError, WalRedoManager},
};
use super::*;
use crate::tenant_config::{TenantConf, TenantConfOpt};
use hex_literal::hex;
use utils::zid::ZTenantId;
use utils::zid::{ZTenantId, ZTimelineId};
pub const TIMELINE_ID: ZTimelineId =
ZTimelineId::from_array(hex!("11223344556677881122334455667788"));
@@ -427,14 +356,14 @@ pub mod repo_harness {
})
}
pub fn load(&self) -> LayeredRepository {
pub fn load(&self) -> Repository {
self.try_load().expect("failed to load test repo")
}
pub fn try_load(&self) -> Result<LayeredRepository> {
pub fn try_load(&self) -> Result<Repository> {
let walredo_mgr = Arc::new(TestRedoManager);
let repo = LayeredRepository::new(
let repo = Repository::new(
self.conf,
TenantConfOpt::from(self.tenant_conf),
walredo_mgr,

View File

@@ -3,8 +3,7 @@
use crate::config::PageServerConf;
use crate::http::models::TenantInfo;
use crate::layered_repository::{load_metadata, LayeredRepository, Timeline};
use crate::repository::Repository;
use crate::layered_repository::{load_metadata, Repository, Timeline};
use crate::storage_sync::index::{RemoteIndex, RemoteTimelineIndex};
use crate::storage_sync::{self, LocalTimelineInitStatus, SyncStartupData};
use crate::tenant_config::TenantConfOpt;
@@ -94,7 +93,7 @@ mod tenants_state {
struct Tenant {
state: TenantState,
/// Contains in-memory state, including the timeline that might not yet be flushed to disk or loaded from disk.
repo: Arc<LayeredRepository>,
repo: Arc<Repository>,
/// Timelines, located locally in the pageserver's datadir.
/// Timelines can be removed entirely by the `detach` operation only.
///
@@ -365,7 +364,7 @@ pub fn set_tenant_state(tenant_id: ZTenantId, new_state: TenantState) -> anyhow:
Ok(())
}
pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<LayeredRepository>> {
pub fn get_repository_for_tenant(tenant_id: ZTenantId) -> anyhow::Result<Arc<Repository>> {
let m = tenants_state::read_tenants();
let tenant = m
.get(&tenant_id)
@@ -484,7 +483,7 @@ pub fn detach_tenant(conf: &'static PageServerConf, tenant_id: ZTenantId) -> any
}
fn load_local_timeline(
repo: &LayeredRepository,
repo: &Repository,
timeline_id: ZTimelineId,
) -> anyhow::Result<Arc<Timeline>> {
let inmem_timeline = repo.get_timeline_load(timeline_id).with_context(|| {
@@ -588,7 +587,7 @@ fn init_local_repository(
}
fn attach_downloaded_tenant(
repo: &LayeredRepository,
repo: &Repository,
downloaded_timelines: HashSet<ZTimelineId>,
) -> anyhow::Result<()> {
let mut registration_queue = Vec::with_capacity(downloaded_timelines.len());
@@ -630,14 +629,14 @@ fn load_local_repo(
conf: &'static PageServerConf,
tenant_id: ZTenantId,
remote_index: &RemoteIndex,
) -> anyhow::Result<Arc<LayeredRepository>> {
) -> anyhow::Result<Arc<Repository>> {
let mut m = tenants_state::write_tenants();
let tenant = m.entry(tenant_id).or_insert_with(|| {
// Set up a WAL redo manager, for applying WAL records.
let walredo_mgr = PostgresRedoManager::new(conf, tenant_id);
// Set up an object repository, for actual data storage.
let repo: Arc<LayeredRepository> = Arc::new(LayeredRepository::new(
let repo: Arc<Repository> = Arc::new(Repository::new(
conf,
TenantConfOpt::default(),
Arc::new(walredo_mgr),
@@ -653,7 +652,7 @@ fn load_local_repo(
});
// Restore tenant config
let tenant_conf = LayeredRepository::load_tenant_config(conf, tenant_id)?;
let tenant_conf = Repository::load_tenant_config(conf, tenant_id)?;
tenant.repo.update_tenant_config(tenant_conf)?;
Ok(Arc::clone(&tenant.repo))

View File

@@ -5,7 +5,6 @@ use std::collections::HashMap;
use std::ops::ControlFlow;
use std::time::Duration;
use crate::repository::Repository;
use crate::tenant_mgr::TenantState;
use crate::thread_mgr::ThreadKind;
use crate::{tenant_mgr, thread_mgr};

View File

@@ -22,11 +22,10 @@ use crate::import_datadir;
use crate::tenant_mgr;
use crate::CheckpointConfig;
use crate::{
config::PageServerConf, repository::Repository, storage_sync::index::RemoteIndex,
tenant_config::TenantConfOpt,
config::PageServerConf, storage_sync::index::RemoteIndex, tenant_config::TenantConfOpt,
};
use crate::{
layered_repository::{LayeredRepository, Timeline},
layered_repository::{Repository, Timeline},
walredo::WalRedoManager,
};
@@ -42,7 +41,7 @@ pub fn create_repo(
tenant_id: ZTenantId,
wal_redo_manager: Arc<dyn WalRedoManager + Send + Sync>,
remote_index: RemoteIndex,
) -> Result<Arc<LayeredRepository>> {
) -> Result<Arc<Repository>> {
let repo_dir = conf.tenant_path(&tenant_id);
ensure!(
!repo_dir.exists(),
@@ -57,9 +56,9 @@ pub fn create_repo(
info!("created directory structure in {}", repo_dir.display());
// Save tenant's config
LayeredRepository::persist_tenant_config(conf, tenant_id, tenant_conf)?;
Repository::persist_tenant_config(conf, tenant_id, tenant_conf)?;
Ok(Arc::new(LayeredRepository::new(
Ok(Arc::new(Repository::new(
conf,
tenant_conf,
wal_redo_manager,
@@ -104,11 +103,11 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
// - run initdb to init temporary instance and get bootstrap data
// - after initialization complete, remove the temp dir.
//
fn bootstrap_timeline<R: Repository>(
fn bootstrap_timeline(
conf: &'static PageServerConf,
tenantid: ZTenantId,
tli: ZTimelineId,
repo: &R,
repo: &Repository,
) -> Result<()> {
let initdb_path = conf
.tenant_path(&tenantid)

View File

@@ -735,10 +735,7 @@ fn wal_stream_connection_string(
#[cfg(test)]
mod tests {
use crate::repository::{
repo_harness::{RepoHarness, TIMELINE_ID},
Repository,
};
use crate::repository::repo_harness::{RepoHarness, TIMELINE_ID};
use super::*;

View File

@@ -20,7 +20,7 @@ use tracing::{debug, error, info, info_span, trace, warn, Instrument};
use super::TaskEvent;
use crate::{
layered_repository::WalReceiverInfo, repository::Repository, tenant_mgr, walingest::WalIngest,
layered_repository::WalReceiverInfo, tenant_mgr, walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_ffi::v14::waldecoder::WalStreamDecoder;