From 55de0b88f5b02fe4a77d7b78640b51ca9f236baa Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 25 Mar 2022 23:53:37 +0200 Subject: [PATCH] Hide remote timeline index access details --- pageserver/src/http/routes.rs | 30 ++++++---- pageserver/src/layered_repository.rs | 10 ++-- pageserver/src/remote_storage.rs | 9 ++- pageserver/src/remote_storage/storage_sync.rs | 58 ++++++++++--------- .../remote_storage/storage_sync/download.rs | 30 +++++----- .../src/remote_storage/storage_sync/index.rs | 34 +++++++++-- .../src/remote_storage/storage_sync/upload.rs | 49 +++++++--------- pageserver/src/repository.rs | 6 +- pageserver/src/tenant_mgr.rs | 10 ++-- pageserver/src/timelines.rs | 25 ++------ 10 files changed, 134 insertions(+), 127 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 3ca8b6334a..13e79f8f55 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -3,7 +3,6 @@ use std::sync::Arc; use anyhow::Result; use hyper::StatusCode; use hyper::{Body, Request, Response, Uri}; -use tokio::sync::RwLock; use tracing::*; use zenith_utils::auth::JwtAuth; use zenith_utils::http::endpoint::attach_openapi_ui; @@ -22,17 +21,14 @@ use zenith_utils::zid::{ZTenantTimelineId, ZTimelineId}; use super::models::{ StatusResponse, TenantCreateRequest, TenantCreateResponse, TimelineCreateRequest, }; -use crate::remote_storage::{schedule_timeline_download, RemoteTimelineIndex}; -use crate::timelines::{ - extract_remote_timeline_info, LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo, -}; +use crate::remote_storage::{schedule_timeline_download, RemoteIndex}; +use crate::timelines::{LocalTimelineInfo, RemoteTimelineInfo, TimelineInfo}; use crate::{config::PageServerConf, tenant_mgr, timelines, ZTenantId}; -#[derive(Debug)] struct State { conf: &'static PageServerConf, auth: Option>, - remote_index: Arc>, + remote_index: RemoteIndex, allowlist_routes: Vec, } @@ -40,7 +36,7 @@ impl State { fn new( conf: &'static PageServerConf, auth: Option>, - remote_index: Arc>, + remote_index: RemoteIndex, ) -> Self { let allowlist_routes = ["/v1/status", "/v1/doc", "/swagger.yml"] .iter() @@ -113,14 +109,24 @@ async fn timeline_list_handler(request: Request) -> Result, .await .map_err(ApiError::from_err)??; - let remote_index = get_state(&request).remote_index.read().await; let mut response_data = Vec::with_capacity(local_timeline_infos.len()); for (timeline_id, local_timeline_info) in local_timeline_infos { response_data.push(TimelineInfo { tenant_id, timeline_id, local: Some(local_timeline_info), - remote: extract_remote_timeline_info(tenant_id, timeline_id, &remote_index), + remote: get_state(&request) + .remote_index + .read() + .await + .timeline_entry(&ZTenantTimelineId { + tenant_id, + timeline_id, + }) + .map(|remote_entry| RemoteTimelineInfo { + remote_consistent_lsn: remote_entry.disk_consistent_lsn(), + awaits_download: remote_entry.get_awaits_download(), + }), }) } @@ -277,7 +283,7 @@ async fn tenant_create_handler(mut request: Request) -> Result) -> Result, ApiError> { pub fn make_router( conf: &'static PageServerConf, auth: Option>, - remote_index: Arc>, + remote_index: RemoteIndex, ) -> RouterBuilder { let spec = include_bytes!("openapi_spec.yml"); let mut router = attach_openapi_ui(endpoint::make_router(), spec, "/swagger.yml", "/v1/doc"); diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index ac0afcb275..bf5f52b18d 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -35,7 +35,7 @@ use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME}; use crate::config::PageServerConf; use crate::page_cache; use crate::relish::*; -use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteTimelineIndex}; +use crate::remote_storage::{schedule_timeline_checkpoint_upload, RemoteIndex}; use crate::repository::{ BlockNumber, GcResult, Repository, RepositoryTimeline, Timeline, TimelineSyncStatusUpdate, TimelineWriter, ZenithWalRecord, @@ -132,7 +132,7 @@ pub struct LayeredRepository { // provides access to timeline data sitting in the remote storage // supposed to be used for retrieval of remote consistent lsn in walreceiver - remote_index: Arc>, + remote_index: RemoteIndex, /// Makes every timeline to backup their files to remote storage. upload_relishes: bool, @@ -355,8 +355,8 @@ impl Repository for LayeredRepository { Ok(()) } - fn get_remote_index(&self) -> &tokio::sync::RwLock { - self.remote_index.as_ref() + fn get_remote_index(&self) -> &RemoteIndex { + &self.remote_index } } @@ -511,7 +511,7 @@ impl LayeredRepository { conf: &'static PageServerConf, walredo_mgr: Arc, tenantid: ZTenantId, - remote_index: Arc>, + remote_index: RemoteIndex, upload_relishes: bool, ) -> LayeredRepository { LayeredRepository { diff --git a/pageserver/src/remote_storage.rs b/pageserver/src/remote_storage.rs index 6eb7bd910b..bdd6086b94 100644 --- a/pageserver/src/remote_storage.rs +++ b/pageserver/src/remote_storage.rs @@ -89,15 +89,14 @@ use std::{ collections::HashMap, ffi, fs, path::{Path, PathBuf}, - sync::Arc, }; use anyhow::{bail, Context}; -use tokio::{io, sync::RwLock}; +use tokio::io; use tracing::{debug, error, info}; use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; -pub use self::storage_sync::index::{RemoteTimelineIndex, TimelineIndexEntry}; +pub use self::storage_sync::index::{RemoteIndex, TimelineIndexEntry}; pub use self::storage_sync::{schedule_timeline_checkpoint_upload, schedule_timeline_download}; use self::{local_fs::LocalFs, rust_s3::S3}; use crate::layered_repository::ephemeral_file::is_ephemeral_file; @@ -120,7 +119,7 @@ type LocalTimelineInitStatuses = HashMap>, + pub remote_index: RemoteIndex, pub local_timeline_init_statuses: LocalTimelineInitStatuses, } @@ -172,7 +171,7 @@ pub fn start_local_timeline_sync( } Ok(SyncStartupData { local_timeline_init_statuses, - remote_index: Arc::new(RwLock::new(RemoteTimelineIndex::empty())), + remote_index: RemoteIndex::empty(), }) } } diff --git a/pageserver/src/remote_storage/storage_sync.rs b/pageserver/src/remote_storage/storage_sync.rs index b01b152e0a..9fe2ab2847 100644 --- a/pageserver/src/remote_storage/storage_sync.rs +++ b/pageserver/src/remote_storage/storage_sync.rs @@ -25,6 +25,7 @@ //! * all never local state gets scheduled for upload, such timelines are "local" and fully operational //! * the rest of the remote timelines are reported to pageserver, but not downloaded before they are actually accessed in pageserver, //! it may schedule the download on such occasions. +//! Then, the index is shared across pageserver under [`RemoteIndex`] guard to ensure proper synchronization. //! //! The synchronization unit is an archive: a set of timeline files (or relishes) and a special metadata file, all compressed into a blob. //! Currently, there's no way to process an archive partially, if the archive processing fails, it has to be started from zero next time again. @@ -80,10 +81,7 @@ use futures::stream::{FuturesUnordered, StreamExt}; use lazy_static::lazy_static; use tokio::{ runtime::Runtime, - sync::{ - mpsc::{self, UnboundedReceiver}, - RwLock, - }, + sync::mpsc::{self, UnboundedReceiver}, time::{Duration, Instant}, }; use tracing::*; @@ -92,8 +90,8 @@ use self::{ compression::ArchiveHeader, download::{download_timeline, DownloadedTimeline}, index::{ - ArchiveDescription, ArchiveId, RemoteTimeline, RemoteTimelineIndex, TimelineIndexEntry, - TimelineIndexEntryInner, + ArchiveDescription, ArchiveId, RemoteIndex, RemoteTimeline, RemoteTimelineIndex, + TimelineIndexEntry, TimelineIndexEntryInner, }, upload::upload_timeline_checkpoint, }; @@ -392,13 +390,14 @@ pub(super) fn spawn_storage_sync_thread< None } }); - let mut remote_index = - RemoteTimelineIndex::try_parse_descriptions_from_paths(conf, download_paths); + let remote_index = RemoteIndex::try_parse_descriptions_from_paths(conf, download_paths); - let local_timeline_init_statuses = - schedule_first_sync_tasks(&mut remote_index, local_timeline_files); - let remote_index = Arc::new(RwLock::new(remote_index)); - let remote_index_cloned = Arc::clone(&remote_index); + let local_timeline_init_statuses = schedule_first_sync_tasks( + &mut runtime.block_on(remote_index.write()), + local_timeline_files, + ); + + let loop_index = remote_index.clone(); thread_mgr::spawn( ThreadKind::StorageSync, None, @@ -410,7 +409,7 @@ pub(super) fn spawn_storage_sync_thread< runtime, conf, receiver, - remote_index_cloned, + loop_index, storage, max_concurrent_sync, max_sync_errors, @@ -438,14 +437,14 @@ fn storage_sync_loop< runtime: Runtime, conf: &'static PageServerConf, mut receiver: UnboundedReceiver, - index: Arc>, + index: RemoteIndex, storage: S, max_concurrent_sync: NonZeroUsize, max_sync_errors: NonZeroU32, ) { - let remote_assets = Arc::new((storage, Arc::clone(&index))); + let remote_assets = Arc::new((storage, index.clone())); loop { - let index = Arc::clone(&index); + let index = index.clone(); let loop_step = runtime.block_on(async { tokio::select! { new_timeline_states = loop_step( @@ -480,7 +479,7 @@ async fn loop_step< >( conf: &'static PageServerConf, receiver: &mut UnboundedReceiver, - remote_assets: Arc<(S, Arc>)>, + remote_assets: Arc<(S, RemoteIndex)>, max_concurrent_sync: NonZeroUsize, max_sync_errors: NonZeroU32, ) -> HashMap> { @@ -560,7 +559,7 @@ async fn process_task< S: RemoteStorage + Send + Sync + 'static, >( conf: &'static PageServerConf, - remote_assets: Arc<(S, Arc>)>, + remote_assets: Arc<(S, RemoteIndex)>, task: SyncTask, max_sync_errors: NonZeroU32, ) -> Option { @@ -584,7 +583,7 @@ async fn process_task< tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await; } - let remote_index = Arc::clone(&remote_assets.1); + let remote_index = &remote_assets.1; let sync_start = Instant::now(); let sync_name = task.kind.sync_name(); @@ -592,7 +591,7 @@ async fn process_task< SyncKind::Download(download_data) => { let download_result = download_timeline( conf, - remote_assets, + remote_assets.clone(), task.sync_id, download_data, task.retries + 1, @@ -772,7 +771,7 @@ async fn fetch_full_index< P: Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, >( - (storage, index): &(S, Arc>), + (storage, index): &(S, RemoteIndex), timeline_dir: &Path, id: ZTenantTimelineId, ) -> anyhow::Result { @@ -808,8 +807,9 @@ async fn fetch_full_index< } }; drop(index_read); // tokio rw lock is not upgradeable - let mut index_write = index.write().await; - index_write + index + .write() + .await .upgrade_timeline_entry(&id, full_index.clone()) .context("cannot upgrade timeline entry in remote index")?; Ok(full_index) @@ -855,7 +855,7 @@ mod test_utils { #[track_caller] pub async fn ensure_correct_timeline_upload( harness: &RepoHarness, - remote_assets: Arc<(LocalFs, Arc>)>, + remote_assets: Arc<(LocalFs, RemoteIndex)>, timeline_id: ZTimelineId, new_upload: NewCheckpoint, ) { @@ -872,7 +872,7 @@ mod test_utils { let (storage, index) = remote_assets.as_ref(); assert_index_descriptions( index, - RemoteTimelineIndex::try_parse_descriptions_from_paths( + &RemoteIndex::try_parse_descriptions_from_paths( harness.conf, remote_assets .0 @@ -914,7 +914,7 @@ mod test_utils { } pub async fn expect_timeline( - index: &Arc>, + index: &RemoteIndex, sync_id: ZTenantTimelineId, ) -> RemoteTimeline { if let Some(TimelineIndexEntryInner::Full(remote_timeline)) = index @@ -934,9 +934,11 @@ mod test_utils { #[track_caller] pub async fn assert_index_descriptions( - index: &Arc>, - expected_index_with_descriptions: RemoteTimelineIndex, + index: &RemoteIndex, + expected_index_with_descriptions: &RemoteIndex, ) { + let expected_index_with_descriptions = expected_index_with_descriptions.read().await; + let index_read = index.read().await; let actual_sync_ids = index_read.all_sync_ids().collect::>(); let expected_sync_ids = expected_index_with_descriptions diff --git a/pageserver/src/remote_storage/storage_sync/download.rs b/pageserver/src/remote_storage/storage_sync/download.rs index e5362b2973..32549c8650 100644 --- a/pageserver/src/remote_storage/storage_sync/download.rs +++ b/pageserver/src/remote_storage/storage_sync/download.rs @@ -3,7 +3,7 @@ use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc}; use anyhow::{ensure, Context}; -use tokio::{fs, sync::RwLock}; +use tokio::fs; use tracing::{debug, error, trace, warn}; use zenith_utils::zid::ZTenantId; @@ -20,8 +20,8 @@ use crate::{ }; use super::{ - index::{ArchiveId, RemoteTimeline, RemoteTimelineIndex}, - TimelineDownload, + index::{ArchiveId, RemoteTimeline}, + RemoteIndex, TimelineDownload, }; /// Timeline download result, with extra data, needed for downloading. @@ -47,7 +47,7 @@ pub(super) async fn download_timeline< S: RemoteStorage + Send + Sync + 'static, >( conf: &'static PageServerConf, - remote_assets: Arc<(S, Arc>)>, + remote_assets: Arc<(S, RemoteIndex)>, sync_id: ZTenantTimelineId, mut download: TimelineDownload, retries: u32, @@ -167,7 +167,7 @@ async fn try_download_archive< tenant_id, timeline_id, }: ZTenantTimelineId, - remote_assets: Arc<(S, Arc>)>, + remote_assets: Arc<(S, RemoteIndex)>, remote_timeline: &RemoteTimeline, archive_id: ArchiveId, files_to_skip: Arc>, @@ -255,16 +255,14 @@ mod tests { let repo_harness = RepoHarness::create("test_download_timeline")?; let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID); let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; - let index = Arc::new(RwLock::new( - RemoteTimelineIndex::try_parse_descriptions_from_paths( - repo_harness.conf, - storage - .list() - .await? - .into_iter() - .map(|storage_path| storage.local_path(&storage_path).unwrap()), - ), - )); + let index = RemoteIndex::try_parse_descriptions_from_paths( + repo_harness.conf, + storage + .list() + .await? + .into_iter() + .map(|storage_path| storage.local_path(&storage_path).unwrap()), + ); let remote_assets = Arc::new((storage, index)); let storage = &remote_assets.0; let index = &remote_assets.1; @@ -314,7 +312,7 @@ mod tests { .await; assert_index_descriptions( index, - RemoteTimelineIndex::try_parse_descriptions_from_paths( + &RemoteIndex::try_parse_descriptions_from_paths( repo_harness.conf, remote_assets .0 diff --git a/pageserver/src/remote_storage/storage_sync/index.rs b/pageserver/src/remote_storage/storage_sync/index.rs index 7d6b4881f7..d7bd1f1657 100644 --- a/pageserver/src/remote_storage/storage_sync/index.rs +++ b/pageserver/src/remote_storage/storage_sync/index.rs @@ -7,10 +7,12 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, path::{Path, PathBuf}, + sync::Arc, }; use anyhow::{bail, ensure, Context}; use serde::{Deserialize, Serialize}; +use tokio::sync::RwLock; use tracing::*; use zenith_utils::{ lsn::Lsn, @@ -55,11 +57,14 @@ pub struct RemoteTimelineIndex { timeline_entries: HashMap, } -impl RemoteTimelineIndex { +/// A wrapper to synchrnize access to the index, should be created and used before dealing with any [`RemoteTimelineIndex`]. +pub struct RemoteIndex(Arc>); + +impl RemoteIndex { pub fn empty() -> Self { - Self { + Self(Arc::new(RwLock::new(RemoteTimelineIndex { timeline_entries: HashMap::new(), - } + }))) } /// Attempts to parse file paths (not checking the file contents) and find files @@ -69,7 +74,9 @@ impl RemoteTimelineIndex { conf: &'static PageServerConf, paths: impl Iterator, ) -> Self { - let mut index = Self::empty(); + let mut index = RemoteTimelineIndex { + timeline_entries: HashMap::new(), + }; for path in paths { if let Err(e) = try_parse_index_entry(&mut index, conf, path.as_ref()) { debug!( @@ -79,9 +86,26 @@ impl RemoteTimelineIndex { ); } } - index + + Self(Arc::new(RwLock::new(index))) } + pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, RemoteTimelineIndex> { + self.0.read().await + } + + pub async fn write(&self) -> tokio::sync::RwLockWriteGuard<'_, RemoteTimelineIndex> { + self.0.write().await + } +} + +impl Clone for RemoteIndex { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} + +impl RemoteTimelineIndex { pub fn timeline_entry(&self, id: &ZTenantTimelineId) -> Option<&TimelineIndexEntry> { self.timeline_entries.get(id) } diff --git a/pageserver/src/remote_storage/storage_sync/upload.rs b/pageserver/src/remote_storage/storage_sync/upload.rs index dfc4433694..76e92c2781 100644 --- a/pageserver/src/remote_storage/storage_sync/upload.rs +++ b/pageserver/src/remote_storage/storage_sync/upload.rs @@ -2,7 +2,6 @@ use std::{borrow::Cow, collections::BTreeSet, path::PathBuf, sync::Arc}; -use tokio::sync::RwLock; use tracing::{debug, error, warn}; use crate::{ @@ -17,7 +16,7 @@ use crate::{ }, }; -use super::{compression::ArchiveHeader, index::RemoteTimelineIndex, NewCheckpoint}; +use super::{compression::ArchiveHeader, NewCheckpoint, RemoteIndex}; /// Attempts to compress and upload given checkpoint files. /// No extra checks for overlapping files is made: download takes care of that, ensuring no non-metadata local timeline files are overwritten. @@ -29,7 +28,7 @@ pub(super) async fn upload_timeline_checkpoint< S: RemoteStorage + Send + Sync + 'static, >( config: &'static PageServerConf, - remote_assets: Arc<(S, Arc>)>, + remote_assets: Arc<(S, RemoteIndex)>, sync_id: ZTenantTimelineId, new_checkpoint: NewCheckpoint, retries: u32, @@ -156,7 +155,7 @@ async fn try_upload_checkpoint< S: RemoteStorage + Send + Sync + 'static, >( config: &'static PageServerConf, - remote_assets: Arc<(S, Arc>)>, + remote_assets: Arc<(S, RemoteIndex)>, sync_id: ZTenantTimelineId, new_checkpoint: &NewCheckpoint, files_to_skip: BTreeSet, @@ -238,16 +237,14 @@ mod tests { let repo_harness = RepoHarness::create("reupload_timeline")?; let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID); let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; - let index = Arc::new(RwLock::new( - RemoteTimelineIndex::try_parse_descriptions_from_paths( - repo_harness.conf, - storage - .list() - .await? - .into_iter() - .map(|storage_path| storage.local_path(&storage_path).unwrap()), - ), - )); + let index = RemoteIndex::try_parse_descriptions_from_paths( + repo_harness.conf, + storage + .list() + .await? + .into_iter() + .map(|storage_path| storage.local_path(&storage_path).unwrap()), + ); let remote_assets = Arc::new((storage, index)); let index = &remote_assets.1; @@ -436,16 +433,14 @@ mod tests { let repo_harness = RepoHarness::create("reupload_timeline_rejected")?; let sync_id = ZTenantTimelineId::new(repo_harness.tenant_id, TIMELINE_ID); let storage = LocalFs::new(tempdir()?.path().to_owned(), &repo_harness.conf.workdir)?; - let index = Arc::new(RwLock::new( - RemoteTimelineIndex::try_parse_descriptions_from_paths( - repo_harness.conf, - storage - .list() - .await? - .into_iter() - .map(|storage_path| storage.local_path(&storage_path).unwrap()), - ), - )); + let index = RemoteIndex::try_parse_descriptions_from_paths( + repo_harness.conf, + storage + .list() + .await? + .into_iter() + .map(|storage_path| storage.local_path(&storage_path).unwrap()), + ); let remote_assets = Arc::new((storage, index)); let storage = &remote_assets.0; let index = &remote_assets.1; @@ -464,7 +459,7 @@ mod tests { first_checkpoint, ) .await; - let after_first_uploads = RemoteTimelineIndex::try_parse_descriptions_from_paths( + let after_first_uploads = RemoteIndex::try_parse_descriptions_from_paths( repo_harness.conf, remote_assets .0 @@ -495,7 +490,7 @@ mod tests { 0, ) .await; - assert_index_descriptions(index, after_first_uploads.clone()).await; + assert_index_descriptions(index, &after_first_uploads).await; let checkpoint_with_uploaded_lsn = create_local_timeline( &repo_harness, @@ -511,7 +506,7 @@ mod tests { 0, ) .await; - assert_index_descriptions(index, after_first_uploads.clone()).await; + assert_index_descriptions(index, &after_first_uploads).await; Ok(()) } diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 074bdf4d01..36273e6d6c 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -1,6 +1,6 @@ use crate::layered_repository::metadata::TimelineMetadata; use crate::relish::*; -use crate::remote_storage::RemoteTimelineIndex; +use crate::remote_storage::RemoteIndex; use crate::walrecord::MultiXactMember; use crate::CheckpointConfig; use anyhow::Result; @@ -91,7 +91,7 @@ pub trait Repository: Send + Sync { fn detach_timeline(&self, timeline_id: ZTimelineId) -> Result<()>; // Allows to retrieve remote timeline index from the repo. Used in walreceiver to grab remote consistent lsn. - fn get_remote_index(&self) -> &tokio::sync::RwLock; + fn get_remote_index(&self) -> &RemoteIndex; } /// A timeline, that belongs to the current repository. @@ -407,7 +407,7 @@ pub mod repo_harness { self.conf, walredo_mgr, self.tenant_id, - Arc::new(tokio::sync::RwLock::new(RemoteTimelineIndex::empty())), + RemoteIndex::empty(), false, )); // populate repo with locally available timelines diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 0bc18231c9..e7cc4ecbaf 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -3,7 +3,7 @@ use crate::config::PageServerConf; use crate::layered_repository::LayeredRepository; -use crate::remote_storage::RemoteTimelineIndex; +use crate::remote_storage::RemoteIndex; use crate::repository::{Repository, Timeline, TimelineSyncStatusUpdate}; use crate::thread_mgr; use crate::thread_mgr::ThreadKind; @@ -66,7 +66,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap> { pub fn load_local_repo( conf: &'static PageServerConf, tenant_id: ZTenantId, - remote_index: &Arc>, + remote_index: &RemoteIndex, ) -> Arc { let mut m = access_tenants(); let tenant = m.entry(tenant_id).or_insert_with(|| { @@ -78,7 +78,7 @@ pub fn load_local_repo( conf, Arc::new(walredo_mgr), tenant_id, - Arc::clone(remote_index), + remote_index.clone(), conf.remote_storage_config.is_some(), )); Tenant { @@ -92,7 +92,7 @@ pub fn load_local_repo( /// Updates tenants' repositories, changing their timelines state in memory. pub fn apply_timeline_sync_status_updates( conf: &'static PageServerConf, - remote_index: Arc>, + remote_index: RemoteIndex, sync_status_updates: HashMap>, ) { if sync_status_updates.is_empty() { @@ -172,7 +172,7 @@ pub fn shutdown_all_tenants() { pub fn create_tenant_repository( conf: &'static PageServerConf, tenantid: ZTenantId, - remote_index: Arc>, + remote_index: RemoteIndex, ) -> Result> { match access_tenants().entry(tenantid) { Entry::Occupied(_) => { diff --git a/pageserver/src/timelines.rs b/pageserver/src/timelines.rs index 8c018ce70f..53c4124701 100644 --- a/pageserver/src/timelines.rs +++ b/pageserver/src/timelines.rs @@ -15,13 +15,13 @@ use std::{ use tracing::*; use zenith_utils::lsn::Lsn; -use zenith_utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}; +use zenith_utils::zid::{ZTenantId, ZTimelineId}; use zenith_utils::{crashsafe_dir, logging}; use crate::{ config::PageServerConf, layered_repository::metadata::TimelineMetadata, - remote_storage::RemoteTimelineIndex, + remote_storage::RemoteIndex, repository::{LocalTimelineState, Repository}, }; use crate::{import_datadir, LOG_FILE_NAME}; @@ -127,22 +127,6 @@ pub struct TimelineInfo { pub remote: Option, } -pub fn extract_remote_timeline_info( - tenant_id: ZTenantId, - timeline_id: ZTimelineId, - remote_index: &RemoteTimelineIndex, -) -> Option { - remote_index - .timeline_entry(&ZTenantTimelineId { - tenant_id, - timeline_id, - }) - .map(|remote_entry| RemoteTimelineInfo { - remote_consistent_lsn: remote_entry.disk_consistent_lsn(), - awaits_download: remote_entry.get_awaits_download(), - }) -} - #[derive(Debug, Clone, Copy)] pub struct PointInTime { pub timeline_id: ZTimelineId, @@ -179,7 +163,7 @@ pub fn init_pageserver( pub enum CreateRepo { Real { wal_redo_manager: Arc, - remote_index: Arc>, + remote_index: RemoteIndex, }, Dummy, } @@ -207,8 +191,7 @@ pub fn create_repo( // anymore, but I think that could still happen. let wal_redo_manager = Arc::new(crate::walredo::DummyRedoManager {}); - let remote_index = Arc::new(tokio::sync::RwLock::new(RemoteTimelineIndex::empty())); - (wal_redo_manager as _, remote_index) + (wal_redo_manager as _, RemoteIndex::empty()) } };