From 5ad82418a9ed9780ae77e5bd04ff9cfb02501a56 Mon Sep 17 00:00:00 2001
From: Konstantin Knizhnik
Date: Wed, 24 Nov 2021 18:04:48 +0300
Subject: [PATCH] Add upload thread

---
 Cargo.lock                                |   2 -
 pageserver/Cargo.toml                     |   2 +-
 pageserver/src/bin/pageserver.rs          |  37 ++++++
 pageserver/src/buffered_repository.rs     | 109 +++++++++++++++++-
 .../src/layered_repository/delta_layer.rs |   6 +-
 .../src/layered_repository/image_layer.rs |   6 +-
 pageserver/src/lib.rs                     |   7 ++
 pageserver/src/tenant_mgr.rs              |   5 +
 8 files changed, 164 insertions(+), 10 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e6e2041c0d..f842c55a12 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2572,8 +2572,6 @@ checksum = "b07db065a5cf61a7e4ba64f29e67db906fb1787316516c4e6e5ff0fea1efcd8a"
 [[package]]
 name = "yakv"
 version = "0.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8ae1ec7c67193f20a10b113492c57f097c34958d5bdc3c974a65361559aa21d9"
 dependencies = [
  "anyhow",
  "fs2",
diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml
index 52c58587f6..0e469c85ee 100644
--- a/pageserver/Cargo.toml
+++ b/pageserver/Cargo.toml
@@ -38,7 +38,7 @@ const_format = "0.2.21"
 tracing = "0.1.27"
 signal-hook = {version = "0.3.10", features = ["extended-siginfo"] }
 #yakv = { path = "../../yakv" }
-yakv = "0.2.1"
+yakv = "0.2.2"
 lz4_flex = "0.9.0"
 postgres_ffi = { path = "../postgres_ffi" }

diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index be99a2d8bb..995d7dca3d 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -42,6 +42,8 @@ struct CfgFileParams {
     listen_http_addr: Option<String>,
     checkpoint_distance: Option<String>,
     checkpoint_period: Option<String>,
+    upload_distance: Option<String>,
+    upload_period: Option<String>,
     reconstruct_threshold: Option<String>,
     gc_horizon: Option<String>,
     gc_period: Option<String>,
@@ -104,6 +106,8 @@ impl CfgFileParams {
             listen_http_addr: get_arg("listen-http"),
             checkpoint_distance: get_arg("checkpoint_distance"),
             checkpoint_period: get_arg("checkpoint_period"),
+            upload_distance: get_arg("upload_distance"),
+            upload_period: get_arg("upload_period"),
             reconstruct_threshold: get_arg("reconstruct_threshold"),
             gc_horizon: get_arg("gc_horizon"),
             gc_period: get_arg("gc_period"),
@@ -123,6 +127,8 @@ impl CfgFileParams {
             listen_http_addr: self.listen_http_addr.or(other.listen_http_addr),
             checkpoint_distance: self.checkpoint_distance.or(other.checkpoint_distance),
             checkpoint_period: self.checkpoint_period.or(other.checkpoint_period),
+            upload_distance: self.upload_distance.or(other.upload_distance),
+            upload_period: self.upload_period.or(other.upload_period),
             reconstruct_threshold: self.reconstruct_threshold.or(other.reconstruct_threshold),
             gc_horizon: self.gc_horizon.or(other.gc_horizon),
             gc_period: self.gc_period.or(other.gc_period),
@@ -161,6 +167,15 @@ impl CfgFileParams {
             None => DEFAULT_CHECKPOINT_PERIOD,
         };
+        let upload_distance: u64 = match self.upload_distance.as_ref() {
+            Some(upload_distance_str) => upload_distance_str.parse()?,
+            None => DEFAULT_UPLOAD_DISTANCE,
+        };
+        let upload_period = match self.upload_period.as_ref() {
+            Some(upload_period_str) => humantime::parse_duration(upload_period_str)?,
+            None => DEFAULT_UPLOAD_PERIOD,
+        };
+
         let reconstruct_threshold: u64 = match self.reconstruct_threshold.as_ref() {
             Some(reconstruct_threshold_str) => reconstruct_threshold_str.parse()?,
             None => DEFAULT_RECONSTRUCT_THRESHOLD,
         };
@@ -244,6 +259,8 @@ impl CfgFileParams {
             listen_http_addr,
             checkpoint_distance,
             checkpoint_period,
+            upload_distance,
+            upload_period,
             reconstruct_threshold,
             gc_horizon,
             gc_period,
@@ -305,6 +322,18 @@ fn main() -> Result<()> {
                 .takes_value(true)
                 .help("Interval between checkpoint iterations"),
         )
+        .arg(
+            Arg::with_name("upload_distance")
+                .long("upload_distance")
+                .takes_value(true)
+                .help("Distance from last uploaded LSN to trigger a new upload"),
+        )
+        .arg(
+            Arg::with_name("upload_period")
+                .long("upload_period")
+                .takes_value(true)
+                .help("Interval between upload iterations"),
+        )
         .arg(
             Arg::with_name("reconstruct_threshold")
                 .long("reconstruct_threshold")
@@ -615,6 +644,8 @@ mod tests {
             listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
             checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
             checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
+            upload_distance: Some("upload_distance_VALUE".to_string()),
+            upload_period: Some("upload_period_VALUE".to_string()),
             reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
             gc_horizon: Some("gc_horizon_VALUE".to_string()),
             gc_period: Some("gc_period_VALUE".to_string()),
@@ -639,6 +670,8 @@
 listen_http_addr = 'listen_http_addr_VALUE'
 checkpoint_distance = 'checkpoint_distance_VALUE'
 checkpoint_period = 'checkpoint_period_VALUE'
+upload_distance = 'upload_distance_VALUE'
+upload_period = 'upload_period_VALUE'
 reconstruct_threshold = 'reconstruct_threshold_VALUE'
 gc_horizon = 'gc_horizon_VALUE'
 gc_period = 'gc_period_VALUE'
@@ -674,6 +707,8 @@ local_path = 'relish_storage_local_VALUE'
             listen_http_addr: Some("listen_http_addr_VALUE".to_string()),
             checkpoint_distance: Some("checkpoint_distance_VALUE".to_string()),
             checkpoint_period: Some("checkpoint_period_VALUE".to_string()),
+            upload_distance: Some("upload_distance_VALUE".to_string()),
+            upload_period: Some("upload_period_VALUE".to_string()),
             reconstruct_threshold: Some("reconstruct_threshold_VALUE".to_string()),
             gc_horizon: Some("gc_horizon_VALUE".to_string()),
             gc_period: Some("gc_period_VALUE".to_string()),
@@ -701,6 +736,8 @@ local_path = 'relish_storage_local_VALUE'
 listen_http_addr = 'listen_http_addr_VALUE'
 checkpoint_distance = 'checkpoint_distance_VALUE'
 checkpoint_period = 'checkpoint_period_VALUE'
+upload_distance = 'upload_distance_VALUE'
+upload_period = 'upload_period_VALUE'
 reconstruct_threshold = 'reconstruct_threshold_VALUE'
 gc_horizon = 'gc_horizon_VALUE'
 gc_period = 'gc_period_VALUE'
diff --git a/pageserver/src/buffered_repository.rs b/pageserver/src/buffered_repository.rs
index 91be882bf0..54b049b148 100644
--- a/pageserver/src/buffered_repository.rs
+++ b/pageserver/src/buffered_repository.rs
@@ -427,6 +427,53 @@ impl BufferedRepository {
         Ok(())
     }

+    ///
+    /// Launch the S3 upload thread in given repository.
+    ///
+    pub fn launch_upload_thread(
+        conf: &'static PageServerConf,
+        rc: Arc<BufferedRepository>,
+    ) -> JoinHandle<()> {
+        std::thread::Builder::new()
+            .name("Upload thread".into())
+            .spawn(move || {
+                // FIXME: relaunch it? Panic is not good.
+                rc.upload_loop(conf).expect("Upload thread died");
+            })
+            .unwrap()
+    }
+
+    ///
+    /// Upload thread's main loop
+    ///
+    fn upload_loop(&self, conf: &'static PageServerConf) -> Result<()> {
+        while !tenant_mgr::shutdown_requested() {
+            std::thread::sleep(conf.upload_period);
+            info!("upload thread for tenant {} waking up", self.tenantid);
+
+            {
+                let timelines: Vec<(ZTimelineId, Arc<BufferedTimeline>)> = self
+                    .timelines
+                    .lock()
+                    .unwrap()
+                    .iter()
+                    .map(|pair| (*pair.0, pair.1.clone()))
+                    .collect();
+                for (timelineid, timeline) in timelines.iter() {
+                    let _entered =
+                        info_span!("upload", timeline = %timelineid, tenant = %self.tenantid)
+                            .entered();
+
+                    STORAGE_TIME
+                        .with_label_values(&["upload_timed"])
+                        .observe_closure_duration(|| timeline.upload_internal())?
+                }
+            }
+        }
+        trace!("Upload thread shut down");
+        Ok(())
+    }
+
     ///
     /// Launch the GC thread in given repository.
     ///
@@ -1001,9 +1048,11 @@ impl Timeline for BufferedTimeline {
             blknum: 0,
             lsn: Lsn(0),
         });
+        let nosync = true;

         // currently proceed block number
         let mut from_blknum = 0;
+        let mut last_lsn = Lsn(0);
         let mut page_versions: Vec<(u32, Lsn, PageVersion)> = Vec::new();
         'pages: loop {
             let iter = store.data.range(&from.ser()?..);
@@ -1011,7 +1060,7 @@
                 let pair = entry?;
                 if let StoreKey::Data(dk) = StoreKey::des(&pair.0)? {
                     let same_seg = from_rel == dk.rel
-                        && dk.blknum / RELISH_SEG_SIZE < from_blknum / RELISH_SEG_SIZE;
+                        && dk.blknum / RELISH_SEG_SIZE == from_blknum / RELISH_SEG_SIZE;
                     if !same_seg && from_rel != zero_rel {
                         let is_dropped = dropped.contains(&from_rel);
                         let segtag = SegmentTag::from_blknum(from_rel, from_blknum);
@@ -1026,8 +1075,10 @@
                             is_dropped,
                             page_versions.iter().map(|t| (t.0, t.1, &t.2)),
                             relsizes[&from_rel].clone(),
+                            nosync,
                         )?;
                         page_versions.clear();
+                        last_lsn = Lsn(0);
                     }
                     if !is_dropped {
                         let mut images: Vec<Bytes> =
@@ -1048,6 +1099,7 @@
                             segtag,
                             end_lsn,
                             images,
+                            nosync,
                         )?;
                     }
                 }
@@ -1060,7 +1112,7 @@
                         blknum: from_blknum,
                         lsn: start_lsn,
                     });
-                } else if dk.lsn >= start_lsn {
+                } else if dk.lsn >= end_lsn {
                     from_blknum += 1;
                     from = StoreKey::Data(DataKey {
                         rel: from_rel,
@@ -1068,7 +1120,10 @@
                         lsn: start_lsn,
                     });
                 } else {
-                    page_versions.push((dk.blknum, dk.lsn, PageVersion::des(&pair.1)?));
+                    if dk.lsn != last_lsn {
+                        last_lsn = dk.lsn;
+                        page_versions.push((dk.blknum, dk.lsn, PageVersion::des(&pair.1)?));
+                    }
                     continue;
                 }
                 continue 'pages;
@@ -1092,6 +1147,7 @@
                 is_dropped,
                 page_versions.iter().map(|t| (t.0, t.1, &t.2)),
                 relsizes[&from_rel].clone(),
+                nosync,
             )?;
         }
         if !is_dropped {
@@ -1112,6 +1168,7 @@
                 segtag,
                 end_lsn,
                 images,
+                nosync,
             )?;
         }
     }
@@ -1369,6 +1426,52 @@ impl BufferedTimeline {
         Ok(result)
     }

+    ///
+    /// Upload materialized page versions to S3
+    ///
+    fn upload_internal(&self) -> Result<()> {
+        /*
+        // TODO: remember LSN of previous backup
+        let start_lsn = Lsn(0);
+        let end_lsn = self.get_last_record_lsn();
+        if start_lsn + self.conf.upload_distance < end_lsn {
+            self.export_timeline(start_lsn, end_lsn)?;
+        }
+        Ok(())
+        */
+        self.make_snapshot()
+    }
+
+    fn make_snapshot(&self) -> Result<()> {
+        let store = self.store.read().unwrap();
+        let now = Instant::now();
+        if let Some(meta_hash) = &store.meta {
+            let lsn = self.get_last_record_lsn();
+            for (rel, snap) in meta_hash.iter() {
+                let rel_size = snap.size;
+                for segno in 0..(rel_size + RELISH_SEG_SIZE - 1) / RELISH_SEG_SIZE {
+                    let first_blknum = segno * RELISH_SEG_SIZE;
+                    let last_blknum = u32::min(first_blknum + RELISH_SEG_SIZE, rel_size);
+                    let images: Result<Vec<Bytes>> = (first_blknum..last_blknum)
+                        .map(|blknum| self.get_page_at_lsn(*rel, blknum, lsn))
+                        .collect();
+                    let segtag = SegmentTag::from_blknum(*rel, first_blknum);
+                    ImageLayer::create(
+                        self.conf,
+                        self.timelineid,
+                        self.tenantid,
+                        segtag,
+                        lsn,
+                        images?,
+                        true,
+                    )?;
+                }
+            }
+        }
+        info!("Make snapshot in {:?}", now.elapsed());
+        Ok(())
+    }
+
     ///
     /// Matrialize last page versions
     ///
diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs
index e4a0c2fccf..f1732bc938 100644
--- a/pageserver/src/layered_repository/delta_layer.rs
+++ b/pageserver/src/layered_repository/delta_layer.rs
@@ -379,6 +379,7 @@ impl DeltaLayer {
         dropped: bool,
         page_versions: impl Iterator<Item = (u32, Lsn, &PageVersion)>,
         relsizes: VecMap<Lsn, u32>,
+        nosync: bool,
     ) -> Result<DeltaLayer> {
         if seg.rel.is_blocky() {
             assert!(!relsizes.is_empty());
@@ -451,8 +452,9 @@ impl DeltaLayer {

         // This flushes the underlying 'buf_writer'.
         let writer = book.close()?;
-        writer.get_ref().sync_all()?;
-
+        if !nosync {
+            writer.get_ref().sync_all()?;
+        }
         trace!("saved {}", &path.display());

         drop(inner);
diff --git a/pageserver/src/layered_repository/image_layer.rs b/pageserver/src/layered_repository/image_layer.rs
index 0af37b1c00..3ea8bb3568 100644
--- a/pageserver/src/layered_repository/image_layer.rs
+++ b/pageserver/src/layered_repository/image_layer.rs
@@ -258,6 +258,7 @@ impl ImageLayer {
         seg: SegmentTag,
         lsn: Lsn,
         base_images: Vec<Bytes>,
+        nosync: bool,
     ) -> Result<ImageLayer> {
         let image_type = if seg.rel.is_blocky() {
             let num_blocks: u32 = base_images.len().try_into()?;
@@ -317,8 +318,9 @@ impl ImageLayer {

         // This flushes the underlying 'buf_writer'.
         let writer = book.close()?;
-        writer.get_ref().sync_all()?;
-
+        if !nosync {
+            writer.get_ref().sync_all()?;
+        }
         trace!("saved {}", path.display());

         drop(inner);
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 518095c8f9..4a89bc24b4 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -36,6 +36,9 @@ pub mod defaults {
     pub const DEFAULT_CHECKPOINT_DISTANCE: u64 = 256 * 1024 * 1024;
     pub const DEFAULT_CHECKPOINT_PERIOD: Duration = Duration::from_secs(10);

+    pub const DEFAULT_UPLOAD_DISTANCE: u64 = 1024 * 1024 * 1024;
+    pub const DEFAULT_UPLOAD_PERIOD: Duration = Duration::from_secs(250);
+
     pub const DEFAULT_RECONSTRUCT_THRESHOLD: u64 = 0;

     pub const DEFAULT_GC_HORIZON: u64 = 1024;
@@ -66,6 +69,8 @@ pub struct PageServerConf {
     // page server crashes.
     pub checkpoint_distance: u64,
     pub checkpoint_period: Duration,
+    pub upload_period: Duration,
+    pub upload_distance: u64,
     pub reconstruct_threshold: u64,

     pub gc_horizon: u64,
@@ -152,6 +157,8 @@ impl PageServerConf {
             daemonize: false,
             checkpoint_distance: defaults::DEFAULT_CHECKPOINT_DISTANCE,
             checkpoint_period: Duration::from_secs(10),
+            upload_distance: defaults::DEFAULT_UPLOAD_DISTANCE,
+            upload_period: defaults::DEFAULT_UPLOAD_PERIOD,
             reconstruct_threshold: defaults::DEFAULT_RECONSTRUCT_THRESHOLD,
             gc_horizon: defaults::DEFAULT_GC_HORIZON,
             gc_period: Duration::from_secs(10),
diff --git a/pageserver/src/tenant_mgr.rs b/pageserver/src/tenant_mgr.rs
index 5d4694e91a..36b3c2f8f3 100644
--- a/pageserver/src/tenant_mgr.rs
+++ b/pageserver/src/tenant_mgr.rs
@@ -62,6 +62,7 @@ fn access_tenants() -> MutexGuard<'static, HashMap<ZTenantId, Tenant>> {

 struct TenantHandleEntry {
     checkpointer_handle: Option<JoinHandle<()>>,
+    uploader_handle: Option<JoinHandle<()>>,
     gc_handle: Option<JoinHandle<()>>,
 }

@@ -107,11 +108,13 @@ fn init_repo(conf: &'static PageServerConf, tenant_id: ZTenantId) {
     let checkpointer_handle = BufferedRepository::launch_checkpointer_thread(conf, repo.clone());

     let gc_handle = BufferedRepository::launch_gc_thread(conf, repo.clone());
+    let uploader_handle = BufferedRepository::launch_upload_thread(conf, repo.clone());

     let mut handles = TENANT_HANDLES.lock().unwrap();
     let h = TenantHandleEntry {
         checkpointer_handle: Some(checkpointer_handle),
         gc_handle: Some(gc_handle),
+        uploader_handle: Some(uploader_handle),
     };
     handles.insert(tenant_id, h);

@@ -170,6 +173,8 @@ pub fn stop_tenant_threads(tenantid: ZTenantId) {
     if let Some(h) = handles.get_mut(&tenantid) {
         h.checkpointer_handle.take().map(JoinHandle::join);
         debug!("checkpointer for tenant {} has stopped", tenantid);
+        h.uploader_handle.take().map(JoinHandle::join);
+        debug!("uploader for tenant {} has stopped", tenantid);
         h.gc_handle.take().map(JoinHandle::join);
         debug!("gc for tenant {} has stopped", tenantid);
     }
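
Note (not part of the patch): as committed, upload_internal() ignores upload_distance and unconditionally calls make_snapshot(); the commented-out TODO block shows how the setting is intended to gate uploads once a "last uploaded LSN" is remembered. Below is a minimal standalone sketch of that check, with LSNs represented as plain u64 byte positions and should_upload as a hypothetical helper name, not code from the patch.

// Hypothetical helper: mirrors the commented-out check in upload_internal(),
// using plain u64 byte positions in place of the Lsn type.
fn should_upload(last_uploaded_lsn: u64, last_record_lsn: u64, upload_distance: u64) -> bool {
    // Trigger an upload only after at least `upload_distance` bytes of WAL
    // have been written since the previous upload.
    last_uploaded_lsn + upload_distance < last_record_lsn
}

fn main() {
    let upload_distance = 1024 * 1024 * 1024; // DEFAULT_UPLOAD_DISTANCE (1 GiB)
    assert!(!should_upload(0, 512 * 1024 * 1024, upload_distance));
    assert!(should_upload(0, 2 * 1024 * 1024 * 1024, upload_distance));
}

In practice the new settings can be supplied either through the --upload_distance / --upload_period command-line arguments added above or through the matching keys in the pageserver config file, as exercised by the test TOML in this patch.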