From 5719f13cb2b09f7d8394942f00799294954627dd Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Tue, 5 Oct 2021 10:15:56 +0300 Subject: [PATCH] Rework the relish thread model (#689) --- pageserver/src/bin/pageserver.rs | 14 ++- pageserver/src/branches.rs | 1 + pageserver/src/layered_repository.rs | 46 +++---- pageserver/src/relish_storage.rs | 35 +++++- .../src/relish_storage/storage_uploader.rs | 116 ------------------ .../src/relish_storage/synced_storage.rs | 52 ++++++++ pageserver/src/repository.rs | 1 + pageserver/src/tenant_mgr.rs | 81 ++++++++---- 8 files changed, 179 insertions(+), 167 deletions(-) delete mode 100644 pageserver/src/relish_storage/storage_uploader.rs create mode 100644 pageserver/src/relish_storage/synced_storage.rs diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 8cefe24519..21bf0d4ba1 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -24,8 +24,8 @@ use pageserver::{ DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR, DEFAULT_RELISH_STORAGE_MAX_CONCURRENT_SYNC_LIMITS, }, - http, page_service, tenant_mgr, PageServerConf, RelishStorageConfig, RelishStorageKind, - S3Config, LOG_FILE_NAME, + http, page_service, relish_storage, tenant_mgr, PageServerConf, RelishStorageConfig, + RelishStorageKind, S3Config, LOG_FILE_NAME, }; use zenith_utils::http::endpoint; @@ -483,12 +483,16 @@ fn start_pageserver(conf: &'static PageServerConf) -> Result<()> { } } + // keep join handles for spawned threads + // don't spawn threads before daemonizing + let mut join_handles = Vec::new(); + + if let Some(handle) = relish_storage::run_storage_sync_thread(conf)? { + join_handles.push(handle); + } // Initialize tenant manager. tenant_mgr::init(conf); - // keep join handles for spawned threads - let mut join_handles = vec![]; - // initialize authentication for incoming connections let auth = match &conf.auth_type { AuthType::Trust | AuthType::MD5 => None, diff --git a/pageserver/src/branches.rs b/pageserver/src/branches.rs index 3bd30bb26a..f17a081b13 100644 --- a/pageserver/src/branches.rs +++ b/pageserver/src/branches.rs @@ -151,6 +151,7 @@ pub fn create_repo( conf, wal_redo_manager, tenantid, + false, )); // Load data into pageserver diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index eeb9ada757..fe66dd7b30 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -34,7 +34,7 @@ use std::{fs, thread}; use crate::layered_repository::inmemory_layer::FreezeLayers; use crate::relish::*; -use crate::relish_storage::storage_uploader::QueueBasedRelishUploader; +use crate::relish_storage::schedule_timeline_upload; use crate::repository::{GcResult, Repository, Timeline, WALRecord}; use crate::walreceiver::IS_WAL_RECEIVER; use crate::walredo::WalRedoManager; @@ -119,7 +119,9 @@ pub struct LayeredRepository { timelines: Mutex>>, walredo_mgr: Arc, - relish_uploader: Option>, + /// Makes evey repo's timelines to backup their files to remote storage, + /// when they get frozen. + upload_relishes: bool, } /// Public interface @@ -151,8 +153,8 @@ impl Repository for LayeredRepository { timelineid, self.tenantid, Arc::clone(&self.walredo_mgr), - self.relish_uploader.as_ref().map(Arc::clone), 0, + false, )?; let timeline_rc = Arc::new(timeline); @@ -244,8 +246,8 @@ impl LayeredRepository { timelineid, self.tenantid, Arc::clone(&self.walredo_mgr), - self.relish_uploader.as_ref().map(Arc::clone), - 0, // init with 0 and update after layers are loaded + 0, // init with 0 and update after layers are loaded, + self.upload_relishes, )?; // List the layers on disk, and load them into the layer map @@ -278,15 +280,14 @@ impl LayeredRepository { conf: &'static PageServerConf, walredo_mgr: Arc, tenantid: ZTenantId, + upload_relishes: bool, ) -> LayeredRepository { LayeredRepository { tenantid, conf, timelines: Mutex::new(HashMap::new()), walredo_mgr, - relish_uploader: conf.relish_storage_config.as_ref().map(|config| { - Arc::new(QueueBasedRelishUploader::new(config, &conf.workdir).unwrap()) - }), + upload_relishes, } } @@ -588,8 +589,6 @@ pub struct LayeredTimeline { // WAL redo manager walredo_mgr: Arc, - relish_uploader: Option>, - // What page versions do we hold in the repository? If we get a // request > last_record_lsn, we need to wait until we receive all // the WAL up to the request. The SeqWait provides functions for @@ -637,6 +636,9 @@ pub struct LayeredTimeline { // TODO: it is possible to combine these two fields into single one using custom metric which uses SeqCst // ordering for its operations, but involves private modules, and macro trickery current_logical_size_gauge: IntGauge, + + /// If `true`, will backup its timeline files to remote storage after freezing. + upload_relishes: bool, } /// Public interface functions @@ -1020,8 +1022,8 @@ impl LayeredTimeline { timelineid: ZTimelineId, tenantid: ZTenantId, walredo_mgr: Arc, - relish_uploader: Option>, current_logical_size: usize, + upload_relishes: bool, ) -> Result { let current_logical_size_gauge = LOGICAL_TIMELINE_SIZE .get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()]) @@ -1033,7 +1035,6 @@ impl LayeredTimeline { layers: Mutex::new(LayerMap::default()), walredo_mgr, - relish_uploader, // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'. last_record_lsn: SeqWait::new(RecordLsn { @@ -1046,6 +1047,7 @@ impl LayeredTimeline { ancestor_lsn: metadata.ancestor_lsn, current_logical_size: AtomicUsize::new(current_logical_size), current_logical_size_gauge, + upload_relishes, }; Ok(timeline) } @@ -1448,12 +1450,6 @@ impl LayeredTimeline { created_historics = true; } - if let Some(relish_uploader) = &self.relish_uploader { - for label_path in new_historics.iter().filter_map(|layer| layer.path()) { - relish_uploader.schedule_upload(self.timelineid, label_path); - } - } - // Finally, replace the frozen in-memory layer with the new on-disk layers layers.remove_historic(frozen.clone()); @@ -1515,15 +1511,23 @@ impl LayeredTimeline { ancestor_timeline: ancestor_timelineid, ancestor_lsn: self.ancestor_lsn, }; - let metadata_path = LayeredRepository::save_metadata( + let _metadata_path = LayeredRepository::save_metadata( self.conf, self.timelineid, self.tenantid, &metadata, false, )?; - if let Some(relish_uploader) = &self.relish_uploader { - relish_uploader.schedule_upload(self.timelineid, metadata_path); + if self.upload_relishes { + schedule_timeline_upload(()) + // schedule_timeline_upload(LocalTimeline { + // tenant_id: self.tenantid, + // timeline_id: self.timelineid, + // metadata_path, + // image_layers: image_layer_uploads, + // delta_layers: delta_layer_uploads, + // disk_consistent_lsn, + // }); } // Also update the in-memory copy diff --git a/pageserver/src/relish_storage.rs b/pageserver/src/relish_storage.rs index 11e73711a3..a687abe489 100644 --- a/pageserver/src/relish_storage.rs +++ b/pageserver/src/relish_storage.rs @@ -8,14 +8,43 @@ mod local_fs; mod rust_s3; -/// A queue and the background machinery behind it to upload -/// local page server layer files to external storage. -pub mod storage_uploader; +/// A queue-based storage with the background machinery behind it to synchronize +/// local page server layer files with external storage. +mod synced_storage; use std::path::Path; +use std::thread; use anyhow::Context; +use self::local_fs::LocalFs; +pub use self::synced_storage::schedule_timeline_upload; +use crate::relish_storage::rust_s3::RustS3; +use crate::{PageServerConf, RelishStorageKind}; + +pub fn run_storage_sync_thread( + config: &'static PageServerConf, +) -> anyhow::Result>>> { + match &config.relish_storage_config { + Some(relish_storage_config) => { + let max_concurrent_sync = relish_storage_config.max_concurrent_sync; + match &relish_storage_config.storage { + RelishStorageKind::LocalFs(root) => synced_storage::run_storage_sync_thread( + config, + LocalFs::new(root.clone())?, + max_concurrent_sync, + ), + RelishStorageKind::AwsS3(s3_config) => synced_storage::run_storage_sync_thread( + config, + RustS3::new(s3_config)?, + max_concurrent_sync, + ), + } + } + None => Ok(None), + } +} + /// Storage (potentially remote) API to manage its state. #[async_trait::async_trait] pub trait RelishStorage: Send + Sync { diff --git a/pageserver/src/relish_storage/storage_uploader.rs b/pageserver/src/relish_storage/storage_uploader.rs deleted file mode 100644 index b8bb2cd72e..0000000000 --- a/pageserver/src/relish_storage/storage_uploader.rs +++ /dev/null @@ -1,116 +0,0 @@ -use std::{ - collections::VecDeque, - path::{Path, PathBuf}, - sync::{Arc, Mutex}, - thread, -}; - -use zenith_utils::zid::ZTimelineId; - -use crate::{relish_storage::RelishStorage, RelishStorageConfig, RelishStorageKind}; - -use super::{local_fs::LocalFs, rust_s3::RustS3}; - -pub struct QueueBasedRelishUploader { - upload_queue: Arc>>, -} - -impl QueueBasedRelishUploader { - pub fn new( - config: &RelishStorageConfig, - page_server_workdir: &'static Path, - ) -> anyhow::Result { - let upload_queue = Arc::new(Mutex::new(VecDeque::new())); - let _handle = match &config.storage { - RelishStorageKind::LocalFs(root) => { - let relish_storage = LocalFs::new(root.clone())?; - create_upload_thread( - Arc::clone(&upload_queue), - relish_storage, - page_server_workdir, - )? - } - RelishStorageKind::AwsS3(s3_config) => { - let relish_storage = RustS3::new(s3_config)?; - create_upload_thread( - Arc::clone(&upload_queue), - relish_storage, - page_server_workdir, - )? - } - }; - - Ok(Self { upload_queue }) - } - - pub fn schedule_upload(&self, timeline_id: ZTimelineId, relish_path: PathBuf) { - self.upload_queue - .lock() - .unwrap() - .push_back((timeline_id, relish_path)) - } -} - -fn create_upload_thread>( - upload_queue: Arc>>, - relish_storage: S, - page_server_workdir: &'static Path, -) -> std::io::Result> { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - thread::Builder::new() - .name("Queue based relish uploader".to_string()) - .spawn(move || loop { - runtime.block_on(async { - upload_loop_step(&upload_queue, &relish_storage, page_server_workdir).await; - }) - }) -} - -async fn upload_loop_step>( - upload_queue: &Mutex>, - relish_storage: &S, - page_server_workdir: &Path, -) { - let mut queue_accessor = upload_queue.lock().unwrap(); - log::debug!("current upload queue length: {}", queue_accessor.len()); - let next_upload = queue_accessor.pop_front(); - drop(queue_accessor); - - let (relish_timeline_id, relish_local_path) = match next_upload { - Some(data) => data, - None => { - // Don't spin and allow others to use the queue. - // In future, could be improved to be more clever about delays depending on relish upload stats - thread::sleep(std::time::Duration::from_secs(1)); - return; - } - }; - - if let Err(e) = upload_relish(relish_storage, page_server_workdir, &relish_local_path).await { - log::error!( - "Failed to upload relish '{}' for timeline {}, reason: {:#}", - relish_local_path.display(), - relish_timeline_id, - e - ); - upload_queue - .lock() - .unwrap() - .push_back((relish_timeline_id, relish_local_path)) - } else { - log::debug!("Relish successfully uploaded"); - } -} - -async fn upload_relish>( - relish_storage: &S, - page_server_workdir: &Path, - relish_local_path: &Path, -) -> anyhow::Result<()> { - let destination = S::derive_destination(page_server_workdir, relish_local_path)?; - relish_storage - .upload_relish(relish_local_path, &destination) - .await -} diff --git a/pageserver/src/relish_storage/synced_storage.rs b/pageserver/src/relish_storage/synced_storage.rs new file mode 100644 index 0000000000..f51e976a83 --- /dev/null +++ b/pageserver/src/relish_storage/synced_storage.rs @@ -0,0 +1,52 @@ +use std::time::Duration; +use std::{collections::BinaryHeap, sync::Mutex, thread}; + +use crate::{relish_storage::RelishStorage, PageServerConf}; + +lazy_static::lazy_static! { + static ref UPLOAD_QUEUE: Mutex> = Mutex::new(BinaryHeap::new()); +} + +pub fn schedule_timeline_upload(_local_timeline: ()) { + // UPLOAD_QUEUE + // .lock() + // .unwrap() + // .push(SyncTask::Upload(local_timeline)) +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +enum SyncTask {} + +pub fn run_storage_sync_thread< + P: std::fmt::Debug, + S: 'static + RelishStorage, +>( + config: &'static PageServerConf, + relish_storage: S, + max_concurrent_sync: usize, +) -> anyhow::Result>>> { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + let handle = thread::Builder::new() + .name("Queue based relish storage sync".to_string()) + .spawn(move || loop { + let mut queue_accessor = UPLOAD_QUEUE.lock().unwrap(); + log::debug!("Upload queue length: {}", queue_accessor.len()); + let next_task = queue_accessor.pop(); + drop(queue_accessor); + match next_task { + Some(task) => runtime.block_on(async { + // suppress warnings + let _ = (config, task, &relish_storage, max_concurrent_sync); + todo!("omitted for brevity") + }), + None => { + thread::sleep(Duration::from_secs(1)); + continue; + } + } + })?; + Ok(Some(handle)) +} diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 6bf15f9489..e4412aef6f 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -289,6 +289,7 @@ mod tests { self.conf, walredo_mgr, self.tenant_id, + false, )) } diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs index 76c5511050..b5b7252324 100644 --- a/pageserver/src/tenant_mgr.rs +++ b/pageserver/src/tenant_mgr.rs @@ -9,46 +9,84 @@ use crate::PageServerConf; use anyhow::{anyhow, bail, Context, Result}; use lazy_static::lazy_static; use log::info; +use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fs; use std::str::FromStr; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, MutexGuard}; use zenith_utils::zid::{ZTenantId, ZTimelineId}; lazy_static! { - pub static ref REPOSITORY: Mutex>> = + static ref REPOSITORY: Mutex>> = Mutex::new(HashMap::new()); } -pub fn init(conf: &'static PageServerConf) { - let mut m = REPOSITORY.lock().unwrap(); +fn access_repository() -> MutexGuard<'static, HashMap>> { + REPOSITORY.lock().unwrap() +} +pub fn init(conf: &'static PageServerConf) { + let mut m = access_repository(); for dir_entry in fs::read_dir(conf.tenants_path()).unwrap() { let tenantid = ZTenantId::from_str(dir_entry.unwrap().file_name().to_str().unwrap()).unwrap(); - - // Set up a WAL redo manager, for applying WAL records. - let walredo_mgr = PostgresRedoManager::new(conf, tenantid); - - // Set up an object repository, for actual data storage. - let repo = Arc::new(LayeredRepository::new( - conf, - Arc::new(walredo_mgr), - tenantid, - )); - LayeredRepository::launch_checkpointer_thread(conf, repo.clone()); - LayeredRepository::launch_gc_thread(conf, repo.clone()); - + let repo = init_repo(conf, tenantid); info!("initialized storage for tenant: {}", &tenantid); m.insert(tenantid, repo); } } +fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) -> Arc { + // Set up a WAL redo manager, for applying WAL records. + let walredo_mgr = PostgresRedoManager::new(conf, tenant_id); + + // Set up an object repository, for actual data storage. + let repo = Arc::new(LayeredRepository::new( + conf, + Arc::new(walredo_mgr), + tenant_id, + true, + )); + LayeredRepository::launch_checkpointer_thread(conf, repo.clone()); + LayeredRepository::launch_gc_thread(conf, repo.clone()); + repo +} + +// TODO kb Currently unused function, will later be used when the relish storage downloads a new layer. +// Relevant PR: https://github.com/zenithdb/zenith/pull/686 +pub fn register_relish_download( + conf: &'static PageServerConf, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, +) { + log::info!( + "Registering new download, tenant id {}, timeline id: {}", + tenant_id, + timeline_id + ); + match access_repository().entry(tenant_id) { + Entry::Occupied(o) => init_timeline(o.get().as_ref(), timeline_id), + Entry::Vacant(v) => { + log::info!("New repo initialized"); + let new_repo = init_repo(conf, tenant_id); + init_timeline(new_repo.as_ref(), timeline_id); + v.insert(new_repo); + } + } +} + +fn init_timeline(repo: &dyn Repository, timeline_id: ZTimelineId) { + match repo.get_timeline(timeline_id) { + Ok(_timeline) => log::info!("Successfully initialized timeline {}", timeline_id), + Err(e) => log::error!("Failed to init timeline {}, reason: {:#}", timeline_id, e), + } +} + pub fn create_repository_for_tenant( conf: &'static PageServerConf, tenantid: ZTenantId, ) -> Result<()> { - let mut m = REPOSITORY.lock().unwrap(); + let mut m = access_repository(); // First check that the tenant doesn't exist already if m.get(&tenantid).is_some() { @@ -63,13 +101,12 @@ pub fn create_repository_for_tenant( } pub fn insert_repository_for_tenant(tenantid: ZTenantId, repo: Arc) { - let o = &mut REPOSITORY.lock().unwrap(); - o.insert(tenantid, repo); + access_repository().insert(tenantid, repo); } pub fn get_repository_for_tenant(tenantid: ZTenantId) -> Result> { - let o = &REPOSITORY.lock().unwrap(); - o.get(&tenantid) + access_repository() + .get(&tenantid) .map(Arc::clone) .ok_or_else(|| anyhow!("repository not found for tenant name {}", tenantid)) }