From e6e883eb12503a3a013074c03f06d8a047f44c6c Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Wed, 11 May 2022 15:23:17 +0300 Subject: [PATCH 01/23] Do not set LSN for new FPI page (#1657) * Do not set LSN for new FPI page refer #1656 * Add page_is_new, page_get_lsn, page_set_lsn functions * Fix page_is_new implementation * Add comment from XLogReadBufferForRedoExtended --- libs/postgres_ffi/src/lib.rs | 19 +++++++++++++++++++ pageserver/src/walingest.rs | 11 +++++++++-- 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 923fbe4d5a..28d9a13dbf 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -8,6 +8,7 @@ #![allow(deref_nullptr)] use serde::{Deserialize, Serialize}; +use utils::lsn::Lsn; include!(concat!(env!("OUT_DIR"), "/bindings.rs")); @@ -37,3 +38,21 @@ pub const fn transaction_id_precedes(id1: TransactionId, id2: TransactionId) -> let diff = id1.wrapping_sub(id2) as i32; diff < 0 } + +// Check if page is not yet initialized (port of Postgres PageIsInit() macro) +pub fn page_is_new(pg: &[u8]) -> bool { + pg[14] == 0 && pg[15] == 0 // pg_upper == 0 +} + +// ExtractLSN from page header +pub fn page_get_lsn(pg: &[u8]) -> Lsn { + Lsn( + ((u32::from_le_bytes(pg[0..4].try_into().unwrap()) as u64) << 32) + | u32::from_le_bytes(pg[4..8].try_into().unwrap()) as u64, + ) +} + +pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) { + pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); + pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); +} diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index fbdb328d2c..5223125ce6 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -24,6 +24,7 @@ use anyhow::Context; use postgres_ffi::nonrelfile_utils::clogpage_precedes; use postgres_ffi::nonrelfile_utils::slru_may_delete_clogsegment; +use postgres_ffi::{page_is_new, page_set_lsn}; use anyhow::Result; use bytes::{Buf, Bytes, BytesMut}; @@ -304,8 +305,14 @@ impl<'a, R: Repository> WalIngest<'a, R> { image.resize(image.len() + blk.hole_length as usize, 0u8); image.unsplit(tail); } - image[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); - image[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); + // + // Match the logic of XLogReadBufferForRedoExtended: + // The page may be uninitialized. If so, we can't set the LSN because + // that would corrupt the page. 
+ // + if !page_is_new(&image) { + page_set_lsn(&mut image, lsn) + } assert_eq!(image.len(), pg_constants::BLCKSZ as usize); self.put_rel_page_image(modification, rel, blk.blkno, image.freeze())?; } else { From 5bd879f6418903a62b47758441a90153f9979237 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Wed, 11 May 2022 15:20:48 +0300 Subject: [PATCH 02/23] Proxy: update protocol after cluster->project rename --- proxy/src/auth_backend/console.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy/src/auth_backend/console.rs b/proxy/src/auth_backend/console.rs index 55a0889af4..41a822701f 100644 --- a/proxy/src/auth_backend/console.rs +++ b/proxy/src/auth_backend/console.rs @@ -117,7 +117,7 @@ async fn get_auth_info( let mut url = reqwest::Url::parse(&format!("{auth_endpoint}/proxy_get_role_secret"))?; url.query_pairs_mut() - .append_pair("cluster", cluster) + .append_pair("project", cluster) .append_pair("role", user); // TODO: use a proper logger @@ -141,7 +141,7 @@ async fn wake_compute( cluster: &str, ) -> Result<(String, u16), ConsoleAuthError> { let mut url = reqwest::Url::parse(&format!("{auth_endpoint}/proxy_wake_compute"))?; - url.query_pairs_mut().append_pair("cluster", cluster); + url.query_pairs_mut().append_pair("project", cluster); // TODO: use a proper logger println!("cplane request: {}", url); From b338b5dffef46264e3d35887d9698432d2a7cc40 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 11 May 2022 19:39:12 +0400 Subject: [PATCH 03/23] Make callmemaybe less agressive until we fix it/migrate to bigger machines. --- safekeeper/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index c848de9e71..03236d4e65 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -31,7 +31,7 @@ pub mod defaults { pub const DEFAULT_HTTP_LISTEN_PORT: u16 = 7676; pub const DEFAULT_HTTP_LISTEN_ADDR: &str = formatcp!("127.0.0.1:{DEFAULT_HTTP_LISTEN_PORT}"); - pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(1); + pub const DEFAULT_RECALL_PERIOD: Duration = Duration::from_secs(10); } #[derive(Debug, Clone)] From 20361395bb038659e476fb1566eb8ddff92612c6 Mon Sep 17 00:00:00 2001 From: Anton Shyrabokau <97127717+antons-antons@users.noreply.github.com> Date: Wed, 11 May 2022 11:36:53 -0700 Subject: [PATCH 04/23] Add zenith-us-stage-sk-5 to circleci inventory (#1665) Co-authored-by: Debian --- .circleci/ansible/staging.hosts | 1 + 1 file changed, 1 insertion(+) diff --git a/.circleci/ansible/staging.hosts b/.circleci/ansible/staging.hosts index 3ea815b907..b2bacb89ca 100644 --- a/.circleci/ansible/staging.hosts +++ b/.circleci/ansible/staging.hosts @@ -6,6 +6,7 @@ zenith-us-stage-ps-2 console_region_id=27 zenith-us-stage-sk-1 console_region_id=27 zenith-us-stage-sk-2 console_region_id=27 zenith-us-stage-sk-4 console_region_id=27 +zenith-us-stage-sk-5 console_region_id=27 [storage:children] pageservers From c8640910353a8c226f516d70e337d2eb137dfc88 Mon Sep 17 00:00:00 2001 From: Dhammika Pathirana Date: Wed, 11 May 2022 16:13:26 -0700 Subject: [PATCH 05/23] Fix err msg typo Signed-off-by: Dhammika Pathirana --- pageserver/src/layered_repository.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 01c2b961eb..6a614e184f 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -1512,7 +1512,7 @@ impl LayeredTimeline { .ensure_loaded() .with_context(|| { 
format!( - "Ancestor timeline is not is not loaded. Timeline id: {} Ancestor id {:?}", + "Ancestor timeline is not loaded. Timeline id: {} Ancestor id {:?}", self.timeline_id, self.get_ancestor_timeline_id(), ) From 2bde77fced256600295a0a1c09c6335aed679dac Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Thu, 12 May 2022 07:56:02 +0300 Subject: [PATCH 06/23] =?UTF-8?q?Do=20not=20apply=20records=20with=20LSN?= =?UTF-8?q?=20smaller=20than=20LSN=20of=20cached=20image=20in=20del?= =?UTF-8?q?=E2=80=A6=20(#1672)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Do not apply records with LSN smaller than LSN of cached image in delta layer * Do not apply records with LSN smaller than LSN of cached image in delta layer --- pageserver/src/layered_repository/delta_layer.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index e78b05695c..638df6f42a 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -254,6 +254,9 @@ impl Layer for DeltaLayer { return false; } let entry_lsn = DeltaKey::extract_lsn_from_buf(key); + if entry_lsn < lsn_range.start { + return false; + } offsets.push((entry_lsn, blob_ref.pos())); !blob_ref.will_init() From 5da4f3a4df88ac2b28565eea1604bbc8272a845e Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 12 May 2022 10:31:04 +0300 Subject: [PATCH 07/23] Refactor DeltaLayer::dump() function Put most of the code in a closure that returns Result, so that we can use the ?-operator for error handling. That's simpler. --- .../src/layered_repository/delta_layer.rs | 59 +++++++++---------- 1 file changed, 27 insertions(+), 32 deletions(-) diff --git a/pageserver/src/layered_repository/delta_layer.rs b/pageserver/src/layered_repository/delta_layer.rs index 638df6f42a..1c48f3def5 100644 --- a/pageserver/src/layered_repository/delta_layer.rs +++ b/pageserver/src/layered_repository/delta_layer.rs @@ -38,10 +38,6 @@ use crate::walrecord; use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION}; use anyhow::{bail, ensure, Context, Result}; use serde::{Deserialize, Serialize}; -use tracing::*; -// avoid binding to Write (conflicts with std::io::Write) -// while being able to use std::fmt::Write's methods -use std::fmt::Write as _; use std::fs; use std::io::{BufWriter, Write}; use std::io::{Seek, SeekFrom}; @@ -49,6 +45,7 @@ use std::ops::Range; use std::os::unix::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use tracing::*; use utils::{ bin_ser::BeSer, @@ -365,6 +362,28 @@ impl Layer for DeltaLayer { tree_reader.dump()?; let mut cursor = file.block_cursor(); + + // A subroutine to dump a single blob + let mut dump_blob = |blob_ref: BlobRef| -> anyhow::Result { + let buf = cursor.read_blob(blob_ref.pos())?; + let val = Value::des(&buf)?; + let desc = match val { + Value::Image(img) => { + format!(" img {} bytes", img.len()) + } + Value::WalRecord(rec) => { + let wal_desc = walrecord::describe_wal_record(&rec)?; + format!( + " rec {} bytes will_init: {} {}", + buf.len(), + rec.will_init(), + wal_desc + ) + } + }; + Ok(desc) + }; + tree_reader.visit( &[0u8; DELTA_KEY_SIZE], VisitDirection::Forwards, @@ -373,34 +392,10 @@ impl Layer for DeltaLayer { let key = DeltaKey::extract_key_from_buf(delta_key); let lsn = DeltaKey::extract_lsn_from_buf(delta_key); - let mut desc = String::new(); - match 
cursor.read_blob(blob_ref.pos()) { - Ok(buf) => { - let val = Value::des(&buf); - match val { - Ok(Value::Image(img)) => { - write!(&mut desc, " img {} bytes", img.len()).unwrap(); - } - Ok(Value::WalRecord(rec)) => { - let wal_desc = walrecord::describe_wal_record(&rec).unwrap(); - write!( - &mut desc, - " rec {} bytes will_init: {} {}", - buf.len(), - rec.will_init(), - wal_desc - ) - .unwrap(); - } - Err(err) => { - write!(&mut desc, " DESERIALIZATION ERROR: {}", err).unwrap(); - } - } - } - Err(err) => { - write!(&mut desc, " READ ERROR: {}", err).unwrap(); - } - } + let desc = match dump_blob(blob_ref) { + Ok(desc) => desc, + Err(err) => format!("ERROR: {}", err), + }; println!(" key {} at {}: {}", key, lsn, desc); true }, From b426775aa0dc3caa5287a91593c976f45fed0314 Mon Sep 17 00:00:00 2001 From: Alexey Kondratov Date: Thu, 12 May 2022 12:07:09 +0300 Subject: [PATCH 08/23] Use compute-tools from the new neondatabase Docker Hub repo --- vendor/postgres | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/postgres b/vendor/postgres index 9a9459a7f9..0ea7598329 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 9a9459a7f9cbcaa0e35ff1f2f34c419238fdec7e +Subproject commit 0ea7598329a83b818293137cc18bf7d42bf2fe68 From b10ae195b78835ba895d90ccc1573a0a018d8a28 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Thu, 12 May 2022 12:40:55 +0300 Subject: [PATCH 09/23] Set vendor/postgres back to the main branch I accidentally merged postgres PR that was referencing non-main branch. --- vendor/postgres | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendor/postgres b/vendor/postgres index 0ea7598329..d62ec22eff 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 0ea7598329a83b818293137cc18bf7d42bf2fe68 +Subproject commit d62ec22effeca7b5794ab2c15a3fd9ee5a4a5b99 From 4538f1e1b839556aab12e5aa7d1c38646253ec97 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Thu, 12 May 2022 14:18:35 +0300 Subject: [PATCH 10/23] Correctly operate etcd safekeeper timeline data --- libs/etcd_broker/src/lib.rs | 21 +++++++++++++------ safekeeper/src/broker.rs | 2 +- safekeeper/src/timeline.rs | 41 ++----------------------------------- 3 files changed, 18 insertions(+), 46 deletions(-) diff --git a/libs/etcd_broker/src/lib.rs b/libs/etcd_broker/src/lib.rs index 01cc0cf162..1b27f99ccf 100644 --- a/libs/etcd_broker/src/lib.rs +++ b/libs/etcd_broker/src/lib.rs @@ -51,7 +51,7 @@ pub struct SkTimelineInfo { #[serde(default)] pub peer_horizon_lsn: Option, #[serde(default)] - pub wal_stream_connection_string: Option, + pub safekeeper_connection_string: Option, } #[derive(Debug, thiserror::Error)] @@ -217,16 +217,22 @@ pub async fn subscribe_to_safekeeper_timeline_updates( break; } - let mut timeline_updates: HashMap> = - HashMap::new(); + let mut timeline_updates: HashMap> = HashMap::new(); + // Keep track that the timeline data updates from etcd arrive in the right order. + // https://etcd.io/docs/v3.5/learning/api_guarantees/#isolation-level-and-consistency-of-replicas + // > etcd does not ensure linearizability for watch operations. Users are expected to verify the revision of watch responses to ensure correct ordering. 
+ let mut timeline_etcd_versions: HashMap = HashMap::new(); + let events = resp.events(); debug!("Processing {} events", events.len()); for event in events { if EventType::Put == event.event_type() { - if let Some(kv) = event.kv() { - match parse_etcd_key_value(subscription_kind, ®ex, kv) { + if let Some(new_etcd_kv) = event.kv() { + let new_kv_version = new_etcd_kv.version(); + + match parse_etcd_key_value(subscription_kind, ®ex, new_etcd_kv) { Ok(Some((zttid, timeline))) => { match timeline_updates .entry(zttid) @@ -234,12 +240,15 @@ pub async fn subscribe_to_safekeeper_timeline_updates( .entry(timeline.safekeeper_id) { hash_map::Entry::Occupied(mut o) => { - if o.get().flush_lsn < timeline.info.flush_lsn { + let old_etcd_kv_version = timeline_etcd_versions.get(&zttid).copied().unwrap_or(i64::MIN); + if old_etcd_kv_version < new_kv_version { o.insert(timeline.info); + timeline_etcd_versions.insert(zttid,new_kv_version); } } hash_map::Entry::Vacant(v) => { v.insert(timeline.info); + timeline_etcd_versions.insert(zttid,new_kv_version); } } } diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index c9ae1a8d98..d9c60c9db0 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -60,7 +60,7 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { // lock is held. for zttid in GlobalTimelines::get_active_timelines() { if let Ok(tli) = GlobalTimelines::get(&conf, zttid, false) { - let sk_info = tli.get_public_info()?; + let sk_info = tli.get_public_info(&conf)?; let put_opts = PutOptions::new().with_lease(lease.id()); client .put( diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 8b1072a54b..a12f628e06 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -89,7 +89,6 @@ struct SharedState { active: bool, num_computes: u32, pageserver_connstr: Option, - listen_pg_addr: String, last_removed_segno: XLogSegNo, } @@ -112,7 +111,6 @@ impl SharedState { active: false, num_computes: 0, pageserver_connstr: None, - listen_pg_addr: conf.listen_pg_addr.clone(), last_removed_segno: 0, }) } @@ -132,7 +130,6 @@ impl SharedState { active: false, num_computes: 0, pageserver_connstr: None, - listen_pg_addr: conf.listen_pg_addr.clone(), last_removed_segno: 0, }) } @@ -421,7 +418,7 @@ impl Timeline { } /// Prepare public safekeeper info for reporting. 
- pub fn get_public_info(&self) -> anyhow::Result { + pub fn get_public_info(&self, conf: &SafeKeeperConf) -> anyhow::Result { let shared_state = self.mutex.lock().unwrap(); Ok(SkTimelineInfo { last_log_term: Some(shared_state.sk.get_epoch()), @@ -435,18 +432,7 @@ impl Timeline { shared_state.sk.inmem.remote_consistent_lsn, )), peer_horizon_lsn: Some(shared_state.sk.inmem.peer_horizon_lsn), - wal_stream_connection_string: shared_state - .pageserver_connstr - .as_deref() - .map(|pageserver_connstr| { - wal_stream_connection_string( - self.zttid, - &shared_state.listen_pg_addr, - pageserver_connstr, - ) - }) - .transpose() - .context("Failed to get the pageserver callmemaybe connstr")?, + safekeeper_connection_string: Some(conf.listen_pg_addr.clone()), }) } @@ -504,29 +490,6 @@ impl Timeline { } } -// pageserver connstr is needed to be able to distinguish between different pageservers -// it is required to correctly manage callmemaybe subscriptions when more than one pageserver is involved -// TODO it is better to use some sort of a unique id instead of connection string, see https://github.com/zenithdb/zenith/issues/1105 -fn wal_stream_connection_string( - ZTenantTimelineId { - tenant_id, - timeline_id, - }: ZTenantTimelineId, - listen_pg_addr_str: &str, - pageserver_connstr: &str, -) -> anyhow::Result { - let me_connstr = format!("postgresql://no_user@{}/no_db", listen_pg_addr_str); - let me_conf = me_connstr - .parse::() - .with_context(|| { - format!("Failed to parse pageserver connection string '{me_connstr}' as a postgres one") - })?; - let (host, port) = utils::connstring::connection_host_port(&me_conf); - Ok(format!( - "host={host} port={port} options='-c ztimelineid={timeline_id} ztenantid={tenant_id} pageserver_connstr={pageserver_connstr}'", - )) -} - // Utilities needed by various Connection-like objects pub trait TimelineTools { fn set(&mut self, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool) -> Result<()>; From ec8861b8cc54f61d509925b67babc1af765c37ef Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Thu, 12 May 2022 19:53:07 +0300 Subject: [PATCH 11/23] Fix pageserver metrics names (#1682) Try to follow Prometheus style-guide https://prometheus.io/docs/practices/naming/ for metrics names. More specifically: - Use `pageserver_` prefix for all pagserver metrics - Specify `_seconds` unit in time metrics - Use unit as a suffix in other cases, such as `_hits`, `_bytes`, `_records` - Use `_total` suffix for accumulating counters (note that Histograms append that suffix internally) --- pageserver/src/layered_repository.rs | 14 +++++++------- pageserver/src/lib.rs | 2 +- pageserver/src/page_service.rs | 2 +- pageserver/src/storage_sync.rs | 4 ++-- pageserver/src/virtual_file.rs | 6 +++--- pageserver/src/walredo.rs | 8 ++++---- test_runner/fixtures/compare_fixtures.py | 4 ++-- 7 files changed, 20 insertions(+), 20 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 6a614e184f..b02ab00a21 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -89,7 +89,7 @@ pub use crate::layered_repository::ephemeral_file::writeback as writeback_epheme // Metrics collected on operations on the storage repository. lazy_static! { static ref STORAGE_TIME: HistogramVec = register_histogram_vec!( - "pageserver_storage_time", + "pageserver_storage_operations_seconds", "Time spent on storage operations", &["operation", "tenant_id", "timeline_id"] ) @@ -99,8 +99,8 @@ lazy_static! 
{ // Metrics collected on operations on the storage repository. lazy_static! { static ref RECONSTRUCT_TIME: HistogramVec = register_histogram_vec!( - "pageserver_getpage_reconstruct_time", - "Time spent on storage operations", + "pageserver_getpage_reconstruct_seconds", + "Time spent in reconstruct_value", &["tenant_id", "timeline_id"] ) .expect("failed to define a metric"); @@ -108,13 +108,13 @@ lazy_static! { lazy_static! { static ref MATERIALIZED_PAGE_CACHE_HIT: IntCounterVec = register_int_counter_vec!( - "materialize_page_cache_hits", + "pageserver_materialized_cache_hits_total", "Number of cache hits from materialized page cache", &["tenant_id", "timeline_id"] ) .expect("failed to define a metric"); static ref WAIT_LSN_TIME: HistogramVec = register_histogram_vec!( - "wait_lsn_time", + "pageserver_wait_lsn_seconds", "Time spent waiting for WAL to arrive", &["tenant_id", "timeline_id"] ) @@ -134,12 +134,12 @@ lazy_static! { // or in testing they estimate how much we would upload if we did. lazy_static! { static ref NUM_PERSISTENT_FILES_CREATED: IntCounter = register_int_counter!( - "pageserver_num_persistent_files_created", + "pageserver_created_persistent_files_total", "Number of files created that are meant to be uploaded to cloud storage", ) .expect("failed to define a metric"); static ref PERSISTENT_BYTES_WRITTEN: IntCounter = register_int_counter!( - "pageserver_persistent_bytes_written", + "pageserver_written_persistent_bytes_total", "Total bytes written that are meant to be uploaded to cloud storage", ) .expect("failed to define a metric"); diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 83985069ec..fdce0e5c5f 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -45,7 +45,7 @@ pub const DELTA_FILE_MAGIC: u16 = 0x5A61; lazy_static! { static ref LIVE_CONNECTIONS_COUNT: IntGaugeVec = register_int_gauge_vec!( - "pageserver_live_connections_count", + "pageserver_live_connections", "Number of live network connections", &["pageserver_connection_kind"] ) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index da3dedfc84..88273cfa57 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -326,7 +326,7 @@ const TIME_BUCKETS: &[f64] = &[ lazy_static! { static ref SMGR_QUERY_TIME: HistogramVec = register_histogram_vec!( - "pageserver_smgr_query_time", + "pageserver_smgr_query_seconds", "Time spent on smgr query handling", &["smgr_query_type", "tenant_id", "timeline_id"], TIME_BUCKETS.into() diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs index b8c6f7fdab..7755e67c8d 100644 --- a/pageserver/src/storage_sync.rs +++ b/pageserver/src/storage_sync.rs @@ -208,12 +208,12 @@ lazy_static! { ) .expect("failed to register pageserver remote storage remaining sync items int gauge"); static ref FATAL_TASK_FAILURES: IntCounter = register_int_counter!( - "pageserver_remote_storage_fatal_task_failures", + "pageserver_remote_storage_fatal_task_failures_total", "Number of critically failed tasks" ) .expect("failed to register pageserver remote storage remaining sync items int gauge"); static ref IMAGE_SYNC_TIME: HistogramVec = register_histogram_vec!( - "pageserver_remote_storage_image_sync_time", + "pageserver_remote_storage_image_sync_seconds", "Time took to synchronize (download or upload) a whole pageserver image. 
\ Grouped by `operation_kind` (upload|download) and `status` (success|failure)", &["operation_kind", "status"], diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 4ce245a74f..37d70372b5 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -34,7 +34,7 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[ lazy_static! { static ref STORAGE_IO_TIME: HistogramVec = register_histogram_vec!( - "pageserver_io_time", + "pageserver_io_operations_seconds", "Time spent in IO operations", &["operation", "tenant_id", "timeline_id"], STORAGE_IO_TIME_BUCKETS.into() @@ -43,8 +43,8 @@ lazy_static! { } lazy_static! { static ref STORAGE_IO_SIZE: IntGaugeVec = register_int_gauge_vec!( - "pageserver_io_size", - "Amount of bytes", + "pageserver_io_operations_bytes_total", + "Total amount of bytes read/written in IO operations", &["operation", "tenant_id", "timeline_id"] ) .expect("failed to define a metric"); diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 777718b311..e556c24548 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -106,16 +106,16 @@ impl crate::walredo::WalRedoManager for DummyRedoManager { // each tenant. lazy_static! { static ref WAL_REDO_TIME: Histogram = - register_histogram!("pageserver_wal_redo_time", "Time spent on WAL redo") + register_histogram!("pageserver_wal_redo_seconds", "Time spent on WAL redo") .expect("failed to define a metric"); static ref WAL_REDO_WAIT_TIME: Histogram = register_histogram!( - "pageserver_wal_redo_wait_time", + "pageserver_wal_redo_wait_seconds", "Time spent waiting for access to the WAL redo process" ) .expect("failed to define a metric"); static ref WAL_REDO_RECORD_COUNTER: IntCounter = register_int_counter!( - "pageserver_wal_records_replayed", - "Number of WAL records replayed" + "pageserver_replayed_wal_records_total", + "Number of WAL records replayed in WAL redo process" ) .unwrap(); } diff --git a/test_runner/fixtures/compare_fixtures.py b/test_runner/fixtures/compare_fixtures.py index d70f57aa52..d572901ed1 100644 --- a/test_runner/fixtures/compare_fixtures.py +++ b/test_runner/fixtures/compare_fixtures.py @@ -106,9 +106,9 @@ class ZenithCompare(PgCompare): report=MetricReport.LOWER_IS_BETTER) total_files = self.zenbenchmark.get_int_counter_value( - self.env.pageserver, "pageserver_num_persistent_files_created") + self.env.pageserver, "pageserver_created_persistent_files_total") total_bytes = self.zenbenchmark.get_int_counter_value( - self.env.pageserver, "pageserver_persistent_bytes_written") + self.env.pageserver, "pageserver_written_persistent_bytes_total") self.zenbenchmark.record("data_uploaded", total_bytes / (1024 * 1024), "MB", From 5812e26b906d8007aed1f3d407e52d0e126c6d18 Mon Sep 17 00:00:00 2001 From: Thang Pham Date: Thu, 12 May 2022 16:33:09 -0400 Subject: [PATCH 12/23] Create an initial timeline on CLI tenant creation (#1689) Resolves #1655 --- neon_local/src/main.rs | 23 +++++++++++++++++++ .../batch_others/test_ancestor_branch.py | 1 - test_runner/batch_others/test_zenith_cli.py | 12 +++++++++- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index 8b54054080..75944fe107 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -540,6 +540,29 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an "tenant {} successfully created on the pageserver", new_tenant_id ); + + // Create an initial timeline for the new tenant + let new_timeline_id 
= parse_timeline_id(create_match)?; + let timeline = pageserver + .timeline_create(new_tenant_id, new_timeline_id, None, None)? + .context(format!( + "Failed to create initial timeline for tenant {new_tenant_id}" + ))?; + let new_timeline_id = timeline.timeline_id; + let last_record_lsn = timeline + .local + .context(format!("Failed to get last record LSN: no local timeline info for timeline {new_timeline_id}"))? + .last_record_lsn; + + env.register_branch_mapping( + DEFAULT_BRANCH_NAME.to_string(), + new_tenant_id, + new_timeline_id, + )?; + + println!( + "Created an initial timeline '{new_timeline_id}' at Lsn {last_record_lsn} for tenant: {new_tenant_id}", + ); } Some(("config", create_match)) => { let tenant_id = get_tenant_id(create_match, env)?; diff --git a/test_runner/batch_others/test_ancestor_branch.py b/test_runner/batch_others/test_ancestor_branch.py index d6b073492d..982921084f 100644 --- a/test_runner/batch_others/test_ancestor_branch.py +++ b/test_runner/batch_others/test_ancestor_branch.py @@ -35,7 +35,6 @@ def test_ancestor_branch(zenith_env_builder: ZenithEnvBuilder): with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur: pscur.execute("failpoints flush-frozen=sleep(10000)") - env.zenith_cli.create_timeline(f'main', tenant_id=tenant) pg_branch0 = env.postgres.create_start('main', tenant_id=tenant) branch0_cur = pg_branch0.connect().cursor() branch0_cur.execute("SHOW zenith.zenith_timeline") diff --git a/test_runner/batch_others/test_zenith_cli.py b/test_runner/batch_others/test_zenith_cli.py index 091d9ac8ba..81567dba12 100644 --- a/test_runner/batch_others/test_zenith_cli.py +++ b/test_runner/batch_others/test_zenith_cli.py @@ -1,7 +1,7 @@ import uuid import requests -from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient +from fixtures.zenith_fixtures import DEFAULT_BRANCH_NAME, ZenithEnv, ZenithEnvBuilder, ZenithPageserverHttpClient from typing import cast @@ -83,6 +83,16 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv): assert tenant2.hex in tenants +def test_cli_tenant_create(zenith_simple_env: ZenithEnv): + env = zenith_simple_env + tenant_id = env.zenith_cli.create_tenant() + timelines = env.zenith_cli.list_timelines(tenant_id) + + # an initial timeline should be created upon tenant creation + assert len(timelines) == 1 + assert timelines[0][0] == DEFAULT_BRANCH_NAME + + def test_cli_ipv4_listeners(zenith_env_builder: ZenithEnvBuilder): # Start with single sk zenith_env_builder.num_safekeepers = 1 From ae20751724779986632a6cbc316b50c7568ff2d5 Mon Sep 17 00:00:00 2001 From: Thang Pham Date: Thu, 12 May 2022 17:27:08 -0400 Subject: [PATCH 13/23] update `ZenithCli::create_tenant` return signature (#1692) to include the initial timeline's ID in addition to the new tenant's ID. 
Context: follow-up of https://github.com/neondatabase/neon/pull/1689 --- .../batch_others/test_ancestor_branch.py | 2 +- test_runner/batch_others/test_tenant_conf.py | 2 +- .../batch_others/test_tenant_relocation.py | 2 +- test_runner/batch_others/test_tenants.py | 4 ++-- test_runner/batch_others/test_zenith_cli.py | 6 +++--- test_runner/fixtures/zenith_fixtures.py | 17 +++++++++++------ .../performance/test_bulk_tenant_create.py | 2 +- 7 files changed, 20 insertions(+), 15 deletions(-) diff --git a/test_runner/batch_others/test_ancestor_branch.py b/test_runner/batch_others/test_ancestor_branch.py index 982921084f..c07b9d6dd1 100644 --- a/test_runner/batch_others/test_ancestor_branch.py +++ b/test_runner/batch_others/test_ancestor_branch.py @@ -21,7 +21,7 @@ def test_ancestor_branch(zenith_env_builder: ZenithEnvBuilder): # Override defaults, 1M gc_horizon and 4M checkpoint_distance. # Extend compaction_period and gc_period to disable background compaction and gc. - tenant = env.zenith_cli.create_tenant( + tenant, _ = env.zenith_cli.create_tenant( conf={ 'gc_period': '10 m', 'gc_horizon': '1048576', diff --git a/test_runner/batch_others/test_tenant_conf.py b/test_runner/batch_others/test_tenant_conf.py index b85a541f10..d627d8a6ee 100644 --- a/test_runner/batch_others/test_tenant_conf.py +++ b/test_runner/batch_others/test_tenant_conf.py @@ -16,7 +16,7 @@ tenant_config={checkpoint_distance = 10000, compaction_target_size = 1048576}''' env = zenith_env_builder.init_start() """Test per tenant configuration""" - tenant = env.zenith_cli.create_tenant(conf={ + tenant, _ = env.zenith_cli.create_tenant(conf={ 'checkpoint_distance': '20000', 'gc_period': '30sec', }) diff --git a/test_runner/batch_others/test_tenant_relocation.py b/test_runner/batch_others/test_tenant_relocation.py index 7e71c0a157..20694a240c 100644 --- a/test_runner/batch_others/test_tenant_relocation.py +++ b/test_runner/batch_others/test_tenant_relocation.py @@ -107,7 +107,7 @@ def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, # create folder for remote storage mock remote_storage_mock_path = env.repo_dir / 'local_fs_remote_storage' - tenant = env.zenith_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209")) + tenant, _ = env.zenith_cli.create_tenant(UUID("74ee8b079a0e437eb0afea7d26a07209")) log.info("tenant to relocate %s", tenant) # attach does not download ancestor branches (should it?), just use root branch for now diff --git a/test_runner/batch_others/test_tenants.py b/test_runner/batch_others/test_tenants.py index 682af8de49..1b593cfee3 100644 --- a/test_runner/batch_others/test_tenants.py +++ b/test_runner/batch_others/test_tenants.py @@ -12,8 +12,8 @@ def test_tenants_normal_work(zenith_env_builder: ZenithEnvBuilder, with_safekeep env = zenith_env_builder.init_start() """Tests tenants with and without wal acceptors""" - tenant_1 = env.zenith_cli.create_tenant() - tenant_2 = env.zenith_cli.create_tenant() + tenant_1, _ = env.zenith_cli.create_tenant() + tenant_2, _ = env.zenith_cli.create_tenant() env.zenith_cli.create_timeline(f'test_tenants_normal_work_with_safekeepers{with_safekeepers}', tenant_id=tenant_1) diff --git a/test_runner/batch_others/test_zenith_cli.py b/test_runner/batch_others/test_zenith_cli.py index 81567dba12..bff17fa679 100644 --- a/test_runner/batch_others/test_zenith_cli.py +++ b/test_runner/batch_others/test_zenith_cli.py @@ -64,13 +64,13 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv): helper_compare_tenant_list(pageserver_http_client, env) # Create new tenant - 
tenant1 = env.zenith_cli.create_tenant() + tenant1, _ = env.zenith_cli.create_tenant() # check tenant1 appeared helper_compare_tenant_list(pageserver_http_client, env) # Create new tenant - tenant2 = env.zenith_cli.create_tenant() + tenant2, _ = env.zenith_cli.create_tenant() # check tenant2 appeared helper_compare_tenant_list(pageserver_http_client, env) @@ -85,7 +85,7 @@ def test_cli_tenant_list(zenith_simple_env: ZenithEnv): def test_cli_tenant_create(zenith_simple_env: ZenithEnv): env = zenith_simple_env - tenant_id = env.zenith_cli.create_tenant() + tenant_id, _ = env.zenith_cli.create_tenant() timelines = env.zenith_cli.list_timelines(tenant_id) # an initial timeline should be created upon tenant creation diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 3bb7c606d3..fe20f1abbf 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -831,20 +831,25 @@ class ZenithCli: def create_tenant(self, tenant_id: Optional[uuid.UUID] = None, - conf: Optional[Dict[str, str]] = None) -> uuid.UUID: + timeline_id: Optional[uuid.UUID] = None, + conf: Optional[Dict[str, str]] = None) -> Tuple[uuid.UUID, uuid.UUID]: """ Creates a new tenant, returns its id and its initial timeline's id. """ if tenant_id is None: tenant_id = uuid.uuid4() + if timeline_id is None: + timeline_id = uuid.uuid4() if conf is None: - res = self.raw_cli(['tenant', 'create', '--tenant-id', tenant_id.hex]) + res = self.raw_cli([ + 'tenant', 'create', '--tenant-id', tenant_id.hex, '--timeline-id', timeline_id.hex + ]) else: - res = self.raw_cli( - ['tenant', 'create', '--tenant-id', tenant_id.hex] + - sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), [])) + res = self.raw_cli([ + 'tenant', 'create', '--tenant-id', tenant_id.hex, '--timeline-id', timeline_id.hex + ] + sum(list(map(lambda kv: (['-c', kv[0] + ':' + kv[1]]), conf.items())), [])) res.check_returncode() - return tenant_id + return tenant_id, timeline_id def config_tenant(self, tenant_id: uuid.UUID, conf: Dict[str, str]): """ diff --git a/test_runner/performance/test_bulk_tenant_create.py b/test_runner/performance/test_bulk_tenant_create.py index f0729d3a07..0e16d3e749 100644 --- a/test_runner/performance/test_bulk_tenant_create.py +++ b/test_runner/performance/test_bulk_tenant_create.py @@ -30,7 +30,7 @@ def test_bulk_tenant_create( for i in range(tenants_count): start = timeit.default_timer() - tenant = env.zenith_cli.create_tenant() + tenant, _ = env.zenith_cli.create_tenant() env.zenith_cli.create_timeline( f'test_bulk_tenant_create_{tenants_count}_{i}_{use_safekeepers}', tenant_id=tenant) From 85884a1599895a9875c7f0139854aa7dae21148e Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 13 May 2022 00:42:13 +0300 Subject: [PATCH 14/23] Disable tenant relocation python test --- test_runner/batch_others/test_tenant_relocation.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test_runner/batch_others/test_tenant_relocation.py b/test_runner/batch_others/test_tenant_relocation.py index 20694a240c..279b3a0a25 100644 --- a/test_runner/batch_others/test_tenant_relocation.py +++ b/test_runner/batch_others/test_tenant_relocation.py @@ -95,6 +95,10 @@ def load(pg: Postgres, stop_event: threading.Event, load_ok_event: threading.Eve log.info('load thread stopped') +@pytest.mark.skip( + reason= + "needs to replace callmemaybe call with better idea how to migrate timelines between pageservers" +) @pytest.mark.parametrize('with_load', ['with_load', 
'without_load']) def test_tenant_relocation(zenith_env_builder: ZenithEnvBuilder, port_distributor: PortDistributor, From 0030da57a8c6deb9795d8d9789b9996a976ad9c9 Mon Sep 17 00:00:00 2001 From: Stas Kelvich Date: Fri, 13 May 2022 02:24:08 +0300 Subject: [PATCH 15/23] compute-tools: grant rw priveleges to the all created users --- compute_tools/src/spec.rs | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 27114b8202..334e0a9e05 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -136,13 +136,20 @@ pub fn handle_roles(spec: &ClusterSpec, client: &mut Client) -> Result<()> { xact.execute(query.as_str(), &[])?; } } else { - info!("role name {}", &name); + info!("role name: '{}'", &name); let mut query: String = format!("CREATE ROLE {} ", name.quote()); - info!("role create query {}", &query); + info!("role create query: '{}'", &query); info_print!(" -> create"); query.push_str(&role.to_pg_options()); xact.execute(query.as_str(), &[])?; + + let grant_query = format!( + "grant pg_read_all_data, pg_write_all_data to {}", + name.quote() + ); + xact.execute(grant_query.as_str(), &[])?; + info!("role grant query: '{}'", &grant_query); } info_print!("\n"); From 51c0f9ab2b394a31358cfd187c7fdeb34372553e Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 13 May 2022 00:56:15 +0300 Subject: [PATCH 16/23] Force git version to be up to date via decl macro --- Cargo.lock | 4 ++++ libs/utils/build.rs | 3 --- libs/utils/src/lib.rs | 20 ++++++++++++++------ neon_local/Cargo.toml | 1 + neon_local/src/main.rs | 3 ++- pageserver/Cargo.toml | 1 + pageserver/src/bin/dump_layerfile.rs | 4 +++- pageserver/src/bin/pageserver.rs | 9 +++++---- pageserver/src/bin/update_metadata.rs | 4 +++- proxy/Cargo.toml | 1 + proxy/src/main.rs | 6 ++++-- safekeeper/Cargo.toml | 1 + safekeeper/src/bin/safekeeper.rs | 6 ++++-- 13 files changed, 43 insertions(+), 20 deletions(-) delete mode 100644 libs/utils/build.rs diff --git a/Cargo.lock b/Cargo.lock index 148517a777..e1e1a0f067 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1582,6 +1582,7 @@ dependencies = [ "clap 3.0.14", "comfy-table", "control_plane", + "git-version", "pageserver", "postgres", "postgres_ffi", @@ -1773,6 +1774,7 @@ dependencies = [ "daemonize", "fail", "futures", + "git-version", "hex", "hex-literal", "humantime", @@ -2164,6 +2166,7 @@ dependencies = [ "bytes", "clap 3.0.14", "futures", + "git-version", "hashbrown", "hex", "hmac 0.12.1", @@ -2616,6 +2619,7 @@ dependencies = [ "daemonize", "etcd_broker", "fs2", + "git-version", "hex", "humantime", "hyper", diff --git a/libs/utils/build.rs b/libs/utils/build.rs deleted file mode 100644 index ee3346ae66..0000000000 --- a/libs/utils/build.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("cargo:rerun-if-env-changed=GIT_VERSION"); -} diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index de266efe64..0398ce5e15 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -76,9 +76,17 @@ pub mod signals; // so if we changed the index state git_version will pick that up and rerun the macro. // // Note that with git_version prefix is `git:` and in case of git version from env its `git-env:`. 
-use git_version::git_version; -pub const GIT_VERSION: &str = git_version!( - prefix = "git:", - fallback = concat!("git-env:", env!("GIT_VERSION")), - args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha -); +#[macro_export] +// TODO kb add identifier into the capture +macro_rules! project_git_version { + () => { + const GIT_VERSION: &str = git_version::git_version!( + prefix = "git:", + fallback = concat!( + "git-env:", + env!("GIT_VERSION", "Missing GIT_VERSION envvar") + ), + args = ["--abbrev=40", "--always", "--dirty=-modified"] // always use full sha + ); + }; +} diff --git a/neon_local/Cargo.toml b/neon_local/Cargo.toml index 78d339789f..8ebd7d5c17 100644 --- a/neon_local/Cargo.toml +++ b/neon_local/Cargo.toml @@ -9,6 +9,7 @@ anyhow = "1.0" serde_json = "1" comfy-table = "5.0.1" postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } +git-version = "0.3.5" # FIXME: 'pageserver' is needed for BranchInfo. Refactor pageserver = { path = "../pageserver" } diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index 75944fe107..2f470309ff 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -21,7 +21,7 @@ use utils::{ lsn::Lsn, postgres_backend::AuthType, zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId}, - GIT_VERSION, + project_git_version, }; use pageserver::timelines::TimelineInfo; @@ -30,6 +30,7 @@ use pageserver::timelines::TimelineInfo; const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1); const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1); const DEFAULT_BRANCH_NAME: &str = "main"; +project_git_version!(); fn default_conf() -> String { format!( diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index d4cceafc61..9cc8444531 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -52,6 +52,7 @@ nix = "0.23" once_cell = "1.8.0" crossbeam-utils = "0.8.5" fail = "0.5.0" +git-version = "0.3.5" postgres_ffi = { path = "../libs/postgres_ffi" } metrics = { path = "../libs/metrics" } diff --git a/pageserver/src/bin/dump_layerfile.rs b/pageserver/src/bin/dump_layerfile.rs index af73ef6bdb..cb08acadff 100644 --- a/pageserver/src/bin/dump_layerfile.rs +++ b/pageserver/src/bin/dump_layerfile.rs @@ -7,7 +7,9 @@ use pageserver::layered_repository::dump_layerfile_from_path; use pageserver::page_cache; use pageserver::virtual_file; use std::path::PathBuf; -use utils::GIT_VERSION; +use utils::project_git_version; + +project_git_version!(); fn main() -> Result<()> { let arg_matches = App::new("Zenith dump_layerfile utility") diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 9cb7e6f13d..73ef5c5f4d 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -20,17 +20,18 @@ use utils::{ http::endpoint, logging, postgres_backend::AuthType, + project_git_version, shutdown::exit_now, signals::{self, Signal}, tcp_listener, zid::{ZTenantId, ZTimelineId}, - GIT_VERSION, }; +project_git_version!(); + fn version() -> String { format!( - "{} profiling:{} failpoints:{}", - GIT_VERSION, + "{GIT_VERSION} profiling:{} failpoints:{}", cfg!(feature = "profiling"), fail::has_failpoints() ) @@ -217,7 +218,7 @@ fn start_pageserver(conf: &'static PageServerConf, daemonize: bool) -> Result<() // Initialize logger let log_file = logging::init(LOG_FILE_NAME, daemonize)?; - info!("version: {}", GIT_VERSION); + info!("version: {GIT_VERSION}"); // TODO: Check that it looks like a valid repository before going further diff --git 
a/pageserver/src/bin/update_metadata.rs b/pageserver/src/bin/update_metadata.rs index fae5e5c2e3..3e69ad5c66 100644 --- a/pageserver/src/bin/update_metadata.rs +++ b/pageserver/src/bin/update_metadata.rs @@ -6,7 +6,9 @@ use clap::{App, Arg}; use pageserver::layered_repository::metadata::TimelineMetadata; use std::path::PathBuf; use std::str::FromStr; -use utils::{lsn::Lsn, GIT_VERSION}; +use utils::{lsn::Lsn, project_git_version}; + +project_git_version!(); fn main() -> Result<()> { let arg_matches = App::new("Zenith update metadata utility") diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 43880d645a..4e45698e3e 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -33,6 +33,7 @@ tokio = { version = "1.17", features = ["macros"] } tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } tokio-rustls = "0.23.0" url = "2.2.2" +git-version = "0.3.5" utils = { path = "../libs/utils" } metrics = { path = "../libs/metrics" } diff --git a/proxy/src/main.rs b/proxy/src/main.rs index fc2a368b85..7d5105c88f 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -25,7 +25,9 @@ use config::ProxyConfig; use futures::FutureExt; use std::{future::Future, net::SocketAddr}; use tokio::{net::TcpListener, task::JoinError}; -use utils::GIT_VERSION; +use utils::project_git_version; + +project_git_version!(); /// Flattens `Result>` into `Result`. async fn flatten_err( @@ -124,7 +126,7 @@ async fn main() -> anyhow::Result<()> { auth_link_uri: arg_matches.value_of("uri").unwrap().parse()?, })); - println!("Version: {}", GIT_VERSION); + println!("Version: {GIT_VERSION}"); // Check that we can bind to address before further initialization println!("Starting http on {}", http_address); diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index 5e1ceee02e..417cf58cd5 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -29,6 +29,7 @@ hex = "0.4.3" const_format = "0.2.21" tokio-postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="d052ee8b86fff9897c77b0fe89ea9daba0e1fa38" } tokio-util = { version = "0.7", features = ["io"] } +git-version = "0.3.5" postgres_ffi = { path = "../libs/postgres_ffi" } metrics = { path = "../libs/metrics" } diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index d0df7093ff..06a15a90b0 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -22,11 +22,13 @@ use safekeeper::SafeKeeperConf; use safekeeper::{broker, callmemaybe}; use safekeeper::{http, s3_offload}; use utils::{ - http::endpoint, logging, shutdown::exit_now, signals, tcp_listener, zid::ZNodeId, GIT_VERSION, + http::endpoint, logging, project_git_version, shutdown::exit_now, signals, tcp_listener, + zid::ZNodeId, }; const LOCK_FILE_NAME: &str = "safekeeper.lock"; const ID_FILE_NAME: &str = "safekeeper.id"; +project_git_version!(); fn main() -> Result<()> { metrics::set_common_metrics_prefix("safekeeper"); @@ -193,7 +195,7 @@ fn main() -> Result<()> { fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bool) -> Result<()> { let log_file = logging::init("safekeeper.log", conf.daemonize)?; - info!("version: {}", GIT_VERSION); + info!("version: {GIT_VERSION}"); // Prevent running multiple safekeepers on the same directory let lock_file_path = conf.workdir.join(LOCK_FILE_NAME); From b683308791d81f005089aed35981c73d78fbb93c Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Fri, 13 May 2022 01:05:55 +0300 Subject: [PATCH 17/23] Return 
GIT_VERSION back to storage binaries --- libs/utils/src/lib.rs | 55 +++++++++++++++------------ neon_local/src/main.rs | 4 +- pageserver/src/bin/dump_layerfile.rs | 2 +- pageserver/src/bin/pageserver.rs | 2 +- pageserver/src/bin/update_metadata.rs | 2 +- proxy/src/main.rs | 2 +- safekeeper/src/bin/safekeeper.rs | 2 +- 7 files changed, 37 insertions(+), 32 deletions(-) diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 0398ce5e15..4810909712 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -54,33 +54,38 @@ pub mod nonblock; // Default signal handling pub mod signals; -// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages -// -// we have several cases: -// * building locally from git repo -// * building in CI from git repo -// * building in docker (either in CI or locally) -// -// One thing to note is that .git is not available in docker (and it is bad to include it there). -// So everything becides docker build is covered by git_version crate. -// For docker use environment variable to pass git version, which is then retrieved by buildscript (build.rs). -// It takes variable from build process env and puts it to the rustc env. And then we can retrieve it here by using env! macro. -// Git version received from environment variable used as a fallback in git_version invokation. -// And to avoid running buildscript every recompilation, we use rerun-if-env-changed option. -// So the build script will be run only when GIT_VERSION envvar has changed. -// -// Why not to use buildscript to get git commit sha directly without procmacro from different crate? -// Caching and workspaces complicates that. In case `utils` is not -// recompiled due to caching then version may become outdated. -// git_version crate handles that case by introducing a dependency on .git internals via include_bytes! macro, -// so if we changed the index state git_version will pick that up and rerun the macro. -// -// Note that with git_version prefix is `git:` and in case of git version from env its `git-env:`. +/// This is a shortcut to embed git sha into binaries and avoid copying the same build script to all packages +/// +/// we have several cases: +/// * building locally from git repo +/// * building in CI from git repo +/// * building in docker (either in CI or locally) +/// +/// One thing to note is that .git is not available in docker (and it is bad to include it there). +/// So everything becides docker build is covered by git_version crate, and docker uses a `GIT_VERSION` argument to get the value required. +/// It takes variable from build process env and puts it to the rustc env. And then we can retrieve it here by using env! macro. +/// Git version received from environment variable used as a fallback in git_version invokation. +/// And to avoid running buildscript every recompilation, we use rerun-if-env-changed option. +/// So the build script will be run only when GIT_VERSION envvar has changed. +/// +/// Why not to use buildscript to get git commit sha directly without procmacro from different crate? +/// Caching and workspaces complicates that. In case `utils` is not +/// recompiled due to caching then version may become outdated. +/// git_version crate handles that case by introducing a dependency on .git internals via include_bytes! macro, +/// so if we changed the index state git_version will pick that up and rerun the macro. 
+/// +/// Note that with git_version prefix is `git:` and in case of git version from env its `git-env:`. +/// +/// ############################################################################################# +/// TODO this macro is not the way the library is intended to be used, see https://github.com/neondatabase/neon/issues/1565 for details. +/// We use `cachepot` to reduce our current CI build times: https://github.com/neondatabase/cloud/pull/1033#issuecomment-1100935036 +/// Yet, it seems to ignore the GIT_VERSION env variable, passed to Docker build, even with build.rs that contains +/// `println!("cargo:rerun-if-env-changed=GIT_VERSION");` code for cachepot cache invalidation. +/// The problem needs further investigation and regular `const` declaration instead of a macro. #[macro_export] -// TODO kb add identifier into the capture macro_rules! project_git_version { - () => { - const GIT_VERSION: &str = git_version::git_version!( + ($const_identifier:ident) => { + const $const_identifier: &str = git_version::git_version!( prefix = "git:", fallback = concat!( "git-env:", diff --git a/neon_local/src/main.rs b/neon_local/src/main.rs index 2f470309ff..6538cdefc4 100644 --- a/neon_local/src/main.rs +++ b/neon_local/src/main.rs @@ -20,8 +20,8 @@ use utils::{ auth::{Claims, Scope}, lsn::Lsn, postgres_backend::AuthType, - zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId}, project_git_version, + zid::{ZNodeId, ZTenantId, ZTenantTimelineId, ZTimelineId}, }; use pageserver::timelines::TimelineInfo; @@ -30,7 +30,7 @@ use pageserver::timelines::TimelineInfo; const DEFAULT_SAFEKEEPER_ID: ZNodeId = ZNodeId(1); const DEFAULT_PAGESERVER_ID: ZNodeId = ZNodeId(1); const DEFAULT_BRANCH_NAME: &str = "main"; -project_git_version!(); +project_git_version!(GIT_VERSION); fn default_conf() -> String { format!( diff --git a/pageserver/src/bin/dump_layerfile.rs b/pageserver/src/bin/dump_layerfile.rs index cb08acadff..87390a1b06 100644 --- a/pageserver/src/bin/dump_layerfile.rs +++ b/pageserver/src/bin/dump_layerfile.rs @@ -9,7 +9,7 @@ use pageserver::virtual_file; use std::path::PathBuf; use utils::project_git_version; -project_git_version!(); +project_git_version!(GIT_VERSION); fn main() -> Result<()> { let arg_matches = App::new("Zenith dump_layerfile utility") diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index 73ef5c5f4d..190e38e341 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -27,7 +27,7 @@ use utils::{ zid::{ZTenantId, ZTimelineId}, }; -project_git_version!(); +project_git_version!(GIT_VERSION); fn version() -> String { format!( diff --git a/pageserver/src/bin/update_metadata.rs b/pageserver/src/bin/update_metadata.rs index 3e69ad5c66..983fdb8647 100644 --- a/pageserver/src/bin/update_metadata.rs +++ b/pageserver/src/bin/update_metadata.rs @@ -8,7 +8,7 @@ use std::path::PathBuf; use std::str::FromStr; use utils::{lsn::Lsn, project_git_version}; -project_git_version!(); +project_git_version!(GIT_VERSION); fn main() -> Result<()> { let arg_matches = App::new("Zenith update metadata utility") diff --git a/proxy/src/main.rs b/proxy/src/main.rs index 7d5105c88f..f46e19e5d6 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -27,7 +27,7 @@ use std::{future::Future, net::SocketAddr}; use tokio::{net::TcpListener, task::JoinError}; use utils::project_git_version; -project_git_version!(); +project_git_version!(GIT_VERSION); /// Flattens `Result>` into `Result`. 
async fn flatten_err( diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 06a15a90b0..65e71fcc74 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -28,7 +28,7 @@ use utils::{ const LOCK_FILE_NAME: &str = "safekeeper.lock"; const ID_FILE_NAME: &str = "safekeeper.id"; -project_git_version!(); +project_git_version!(GIT_VERSION); fn main() -> Result<()> { metrics::set_common_metrics_prefix("safekeeper"); From 22d997049c4cf5415b208a6fb397e1c3174980b8 Mon Sep 17 00:00:00 2001 From: Egor Suvorov Date: Fri, 6 May 2022 20:03:28 +0300 Subject: [PATCH 18/23] libs/utils/http/request: add ensure_no_body --- libs/utils/src/http/request.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/libs/utils/src/http/request.rs b/libs/utils/src/http/request.rs index 3bc8993c26..8e3d357397 100644 --- a/libs/utils/src/http/request.rs +++ b/libs/utils/src/http/request.rs @@ -1,7 +1,7 @@ use std::str::FromStr; use super::error::ApiError; -use hyper::{Body, Request}; +use hyper::{body::HttpBody, Body, Request}; use routerify::ext::RequestExt; pub fn get_request_param<'a>( @@ -31,3 +31,10 @@ pub fn parse_request_param( ))), } } + +pub async fn ensure_no_body(request: &mut Request) -> Result<(), ApiError> { + match request.body_mut().data().await { + Some(_) => Err(ApiError::BadRequest("Unexpected request body".into())), + None => Ok(()), + } +} From 07b85e7cfcf7d69c12e528ddde42d51444bbed27 Mon Sep 17 00:00:00 2001 From: Egor Suvorov Date: Thu, 12 May 2022 19:55:01 +0300 Subject: [PATCH 19/23] Safekeeper refactor: move callmemaybe_tx from SafekeeperPostgresBackend to Timeline --- safekeeper/src/bin/safekeeper.rs | 8 +-- safekeeper/src/handler.rs | 8 +-- safekeeper/src/receive_wal.rs | 11 +--- safekeeper/src/send_wal.rs | 6 +-- safekeeper/src/timeline.rs | 90 ++++++++++++++++++-------------- safekeeper/src/wal_service.rs | 19 ++----- 6 files changed, 66 insertions(+), 76 deletions(-) diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index 65e71fcc74..6955d2aa5c 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -17,6 +17,7 @@ use url::{ParseError, Url}; use safekeeper::control_file::{self}; use safekeeper::defaults::{DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_PG_LISTEN_ADDR}; use safekeeper::remove_wal; +use safekeeper::timeline::GlobalTimelines; use safekeeper::wal_service; use safekeeper::SafeKeeperConf; use safekeeper::{broker, callmemaybe}; @@ -251,6 +252,8 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b let signals = signals::install_shutdown_handlers()?; let mut threads = vec![]; + let (callmemaybe_tx, callmemaybe_rx) = mpsc::unbounded_channel(); + GlobalTimelines::set_callmemaybe_tx(callmemaybe_tx); let conf_ = conf.clone(); threads.push( @@ -279,13 +282,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b ); } - let (tx, rx) = mpsc::unbounded_channel(); let conf_cloned = conf.clone(); let safekeeper_thread = thread::Builder::new() .name("Safekeeper thread".into()) .spawn(|| { // thread code - let thread_result = wal_service::thread_main(conf_cloned, pg_listener, tx); + let thread_result = wal_service::thread_main(conf_cloned, pg_listener); if let Err(e) = thread_result { info!("safekeeper thread terminated: {}", e); } @@ -299,7 +301,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: b .name("callmemaybe thread".into()) .spawn(|| { // thread code - let thread_result = 
callmemaybe::thread_main(conf_cloned, rx); + let thread_result = callmemaybe::thread_main(conf_cloned, callmemaybe_rx); if let Err(e) = thread_result { error!("callmemaybe thread terminated: {}", e); } diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 7d86523b0e..9af78661f9 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -21,9 +21,6 @@ use utils::{ zid::{ZTenantId, ZTenantTimelineId, ZTimelineId}, }; -use crate::callmemaybe::CallmeEvent; -use tokio::sync::mpsc::UnboundedSender; - /// Safekeeper handler of postgres commands pub struct SafekeeperPostgresHandler { pub conf: SafeKeeperConf, @@ -33,8 +30,6 @@ pub struct SafekeeperPostgresHandler { pub ztimelineid: Option, pub timeline: Option>, pageserver_connstr: Option, - //sender to communicate with callmemaybe thread - pub tx: UnboundedSender, } /// Parsed Postgres command. @@ -140,7 +135,7 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler { } impl SafekeeperPostgresHandler { - pub fn new(conf: SafeKeeperConf, tx: UnboundedSender) -> Self { + pub fn new(conf: SafeKeeperConf) -> Self { SafekeeperPostgresHandler { conf, appname: None, @@ -148,7 +143,6 @@ impl SafekeeperPostgresHandler { ztimelineid: None, timeline: None, pageserver_connstr: None, - tx, } } diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 3ad99ab0df..0ef335c9ed 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -5,7 +5,6 @@ use anyhow::{anyhow, bail, Result}; use bytes::BytesMut; -use tokio::sync::mpsc::UnboundedSender; use tracing::*; use crate::timeline::Timeline; @@ -28,8 +27,6 @@ use utils::{ sock_split::ReadStream, }; -use crate::callmemaybe::CallmeEvent; - pub struct ReceiveWalConn<'pg> { /// Postgres connection pg_backend: &'pg mut PostgresBackend, @@ -91,10 +88,9 @@ impl<'pg> ReceiveWalConn<'pg> { // Register the connection and defer unregister. spg.timeline .get() - .on_compute_connect(self.pageserver_connstr.as_ref(), &spg.tx)?; + .on_compute_connect(self.pageserver_connstr.as_ref())?; let _guard = ComputeConnectionGuard { timeline: Arc::clone(spg.timeline.get()), - callmemaybe_tx: spg.tx.clone(), }; let mut next_msg = Some(next_msg); @@ -194,13 +190,10 @@ impl ProposerPollStream { struct ComputeConnectionGuard { timeline: Arc, - callmemaybe_tx: UnboundedSender, } impl Drop for ComputeConnectionGuard { fn drop(&mut self) { - self.timeline - .on_compute_disconnect(&self.callmemaybe_tx) - .unwrap(); + self.timeline.on_compute_disconnect().unwrap(); } } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 960f70d154..d52dd6ea57 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -264,13 +264,13 @@ impl ReplicationConn { } else { let pageserver_connstr = pageserver_connstr.expect("there should be a pageserver connection string since this is not a wal_proposer_recovery"); let zttid = spg.timeline.get().zttid; - let tx_clone = spg.tx.clone(); + let tx_clone = spg.timeline.get().callmemaybe_tx.clone(); let subscription_key = SubscriptionStateKey::new( zttid.tenant_id, zttid.timeline_id, pageserver_connstr.clone(), ); - spg.tx + tx_clone .send(CallmeEvent::Pause(subscription_key)) .unwrap_or_else(|e| { error!("failed to send Pause request to callmemaybe thread {}", e); @@ -315,7 +315,7 @@ impl ReplicationConn { } else { // TODO: also check once in a while whether we are walsender // to right pageserver. - if spg.timeline.get().check_deactivate(replica_id, &spg.tx)? 
{ + if spg.timeline.get().check_deactivate(replica_id)? { // Shut down, timeline is suspended. // TODO create proper error type for this bail!("end streaming to {:?}", spg.appname); diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index a12f628e06..c73d6af4ac 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -275,15 +275,21 @@ impl SharedState { /// Database instance (tenant) pub struct Timeline { pub zttid: ZTenantTimelineId, + pub callmemaybe_tx: UnboundedSender, mutex: Mutex, /// conditional variable used to notify wal senders cond: Condvar, } impl Timeline { - fn new(zttid: ZTenantTimelineId, shared_state: SharedState) -> Timeline { + fn new( + zttid: ZTenantTimelineId, + callmemaybe_tx: UnboundedSender, + shared_state: SharedState, + ) -> Timeline { Timeline { zttid, + callmemaybe_tx, mutex: Mutex::new(shared_state), cond: Condvar::new(), } @@ -292,34 +298,27 @@ impl Timeline { /// Register compute connection, starting timeline-related activity if it is /// not running yet. /// Can fail only if channel to a static thread got closed, which is not normal at all. - pub fn on_compute_connect( - &self, - pageserver_connstr: Option<&String>, - callmemaybe_tx: &UnboundedSender, - ) -> Result<()> { + pub fn on_compute_connect(&self, pageserver_connstr: Option<&String>) -> Result<()> { let mut shared_state = self.mutex.lock().unwrap(); shared_state.num_computes += 1; // FIXME: currently we always adopt latest pageserver connstr, but we // should have kind of generations assigned by compute to distinguish // the latest one or even pass it through consensus to reliably deliver // to all safekeepers. - shared_state.activate(&self.zttid, pageserver_connstr, callmemaybe_tx)?; + shared_state.activate(&self.zttid, pageserver_connstr, &self.callmemaybe_tx)?; Ok(()) } /// De-register compute connection, shutting down timeline activity if /// pageserver doesn't need catchup. /// Can fail only if channel to a static thread got closed, which is not normal at all. - pub fn on_compute_disconnect( - &self, - callmemaybe_tx: &UnboundedSender, - ) -> Result<()> { + pub fn on_compute_disconnect(&self) -> Result<()> { let mut shared_state = self.mutex.lock().unwrap(); shared_state.num_computes -= 1; // If there is no pageserver, can suspend right away; otherwise let // walsender do that. if shared_state.num_computes == 0 && shared_state.pageserver_connstr.is_none() { - shared_state.deactivate(&self.zttid, callmemaybe_tx)?; + shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?; } Ok(()) } @@ -327,11 +326,7 @@ impl Timeline { /// Deactivate tenant if there is no computes and pageserver is caughtup, /// assuming the pageserver status is in replica_id. /// Returns true if deactivated. - pub fn check_deactivate( - &self, - replica_id: usize, - callmemaybe_tx: &UnboundedSender, - ) -> Result { + pub fn check_deactivate(&self, replica_id: usize) -> Result { let mut shared_state = self.mutex.lock().unwrap(); if !shared_state.active { // already suspended @@ -343,7 +338,7 @@ impl Timeline { (replica_state.last_received_lsn != Lsn::MAX && // Lsn::MAX means that we don't know the latest LSN yet. 
replica_state.last_received_lsn >= shared_state.sk.inmem.commit_lsn); if deactivate { - shared_state.deactivate(&self.zttid, callmemaybe_tx)?; + shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?; return Ok(true); } } @@ -508,22 +503,35 @@ impl TimelineTools for Option> { } } +struct GlobalTimelinesState { + timelines: HashMap>, + callmemaybe_tx: Option>, +} + lazy_static! { - pub static ref TIMELINES: Mutex>> = - Mutex::new(HashMap::new()); + static ref TIMELINES_STATE: Mutex = Mutex::new(GlobalTimelinesState { + timelines: HashMap::new(), + callmemaybe_tx: None + }); } /// A zero-sized struct used to manage access to the global timelines map. pub struct GlobalTimelines; impl GlobalTimelines { + pub fn set_callmemaybe_tx(callmemaybe_tx: UnboundedSender) { + let mut state = TIMELINES_STATE.lock().unwrap(); + assert!(state.callmemaybe_tx.is_none()); + state.callmemaybe_tx = Some(callmemaybe_tx); + } + fn create_internal( - mut timelines: MutexGuard>>, + mut state: MutexGuard, conf: &SafeKeeperConf, zttid: ZTenantTimelineId, peer_ids: Vec, ) -> Result> { - match timelines.get(&zttid) { + match state.timelines.get(&zttid) { Some(_) => bail!("timeline {} already exists", zttid), None => { // TODO: check directory existence @@ -532,8 +540,12 @@ impl GlobalTimelines { let shared_state = SharedState::create(conf, &zttid, peer_ids) .context("failed to create shared state")?; - let new_tli = Arc::new(Timeline::new(zttid, shared_state)); - timelines.insert(zttid, Arc::clone(&new_tli)); + let new_tli = Arc::new(Timeline::new( + zttid, + state.callmemaybe_tx.as_ref().unwrap().clone(), + shared_state, + )); + state.timelines.insert(zttid, Arc::clone(&new_tli)); Ok(new_tli) } } @@ -544,20 +556,20 @@ impl GlobalTimelines { zttid: ZTenantTimelineId, peer_ids: Vec, ) -> Result> { - let timelines = TIMELINES.lock().unwrap(); - GlobalTimelines::create_internal(timelines, conf, zttid, peer_ids) + let state = TIMELINES_STATE.lock().unwrap(); + GlobalTimelines::create_internal(state, conf, zttid, peer_ids) } - /// Get a timeline with control file loaded from the global TIMELINES map. + /// Get a timeline with control file loaded from the global TIMELINES_STATE.timelines map. /// If control file doesn't exist and create=false, bails out. pub fn get( conf: &SafeKeeperConf, zttid: ZTenantTimelineId, create: bool, ) -> Result> { - let mut timelines = TIMELINES.lock().unwrap(); + let mut state = TIMELINES_STATE.lock().unwrap(); - match timelines.get(&zttid) { + match state.timelines.get(&zttid) { Some(result) => Ok(Arc::clone(result)), None => { let shared_state = @@ -573,20 +585,19 @@ impl GlobalTimelines { .contains("No such file or directory") && create { - return GlobalTimelines::create_internal( - timelines, - conf, - zttid, - vec![], - ); + return GlobalTimelines::create_internal(state, conf, zttid, vec![]); } else { return Err(error); } } }; - let new_tli = Arc::new(Timeline::new(zttid, shared_state)); - timelines.insert(zttid, Arc::clone(&new_tli)); + let new_tli = Arc::new(Timeline::new( + zttid, + state.callmemaybe_tx.as_ref().unwrap().clone(), + shared_state, + )); + state.timelines.insert(zttid, Arc::clone(&new_tli)); Ok(new_tli) } } @@ -594,8 +605,9 @@ impl GlobalTimelines { /// Get ZTenantTimelineIDs of all active timelines. 
pub fn get_active_timelines() -> Vec { - let timelines = TIMELINES.lock().unwrap(); - timelines + let state = TIMELINES_STATE.lock().unwrap(); + state + .timelines .iter() .filter(|&(_, tli)| tli.is_active()) .map(|(zttid, _)| *zttid) .collect() diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 468ac28526..5980160788 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -8,29 +8,22 @@ use std::net::{TcpListener, TcpStream}; use std::thread; use tracing::*; -use crate::callmemaybe::CallmeEvent; use crate::handler::SafekeeperPostgresHandler; use crate::SafeKeeperConf; -use tokio::sync::mpsc::UnboundedSender; use utils::postgres_backend::{AuthType, PostgresBackend}; /// Accept incoming TCP connections and spawn them into a background thread. -pub fn thread_main( - conf: SafeKeeperConf, - listener: TcpListener, - tx: UnboundedSender, -) -> Result<()> { +pub fn thread_main(conf: SafeKeeperConf, listener: TcpListener) -> Result<()> { loop { match listener.accept() { Ok((socket, peer_addr)) => { debug!("accepted connection from {}", peer_addr); let conf = conf.clone(); - let tx_clone = tx.clone(); let _ = thread::Builder::new() .name("WAL service thread".into()) .spawn(move || { - if let Err(err) = handle_socket(socket, conf, tx_clone) { + if let Err(err) = handle_socket(socket, conf) { error!("connection handler exited: {}", err); } }) @@ -51,16 +44,12 @@ fn get_tid() -> u64 { /// This is run by `thread_main` above, inside a background thread. /// -fn handle_socket( - socket: TcpStream, - conf: SafeKeeperConf, - tx: UnboundedSender, -) -> Result<()> { +fn handle_socket(socket: TcpStream, conf: SafeKeeperConf) -> Result<()> { let _enter = info_span!("", tid = ?get_tid()).entered(); socket.set_nodelay(true)?; - let mut conn_handler = SafekeeperPostgresHandler::new(conf, tx); + let mut conn_handler = SafekeeperPostgresHandler::new(conf); let pgbackend = PostgresBackend::new(socket, AuthType::Trust, None, false)?; // libpq replication protocol between safekeeper and replicas/pagers pgbackend.run(&mut conn_handler)?; From bf899a57d9a2b20ba812a4002c0ac3234f064d26 Mon Sep 17 00:00:00 2001 From: Egor Suvorov Date: Thu, 12 May 2022 23:40:29 +0300 Subject: [PATCH 20/23] Safekeeper: add timeline/tenant force delete HTTP endpoints (closes #895) * There is no auth in Safekeeper HTTP at all currently, so simply calling `check_permission` is not enough. * There are no checks of Safekeeper still working with the data, as "still working" is blurry now: a timeline may be "active" while there are no compute nodes and all data is propagated. * Still, callmemaybe is deactivated, and timeline is removed from the internal map. It can easily sneak back in case of race conditions and implicit creations, though.
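For illustration, a rough sketch of how the new endpoints could be called over plain HTTP DELETE. The address below assumes the safekeeper's HTTP listener is on localhost:7676, and the tenant/timeline IDs are placeholders; the response fields follow TimelineDeleteForceResult:

    # Hedged sketch; host/port and IDs are assumptions, adjust to your setup.
    import requests

    base = "http://localhost:7676/v1"
    tenant_id = "<tenant hex id>"      # placeholder
    timeline_id = "<timeline hex id>"  # placeholder

    # Force-delete one timeline; body reports dir_existed / was_active.
    r = requests.delete(f"{base}/tenant/{tenant_id}/timeline/{timeline_id}")
    r.raise_for_status()
    print(r.json())

    # Force-delete all timelines of the tenant; returns a map keyed by timeline id.
    r = requests.delete(f"{base}/tenant/{tenant_id}")
    r.raise_for_status()
    print(r.json())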
--- safekeeper/src/http/routes.rs | 48 +++++++- safekeeper/src/lib.rs | 9 +- safekeeper/src/timeline.rs | 98 ++++++++++++++- test_runner/batch_others/test_wal_acceptor.py | 113 ++++++++++++++++++ test_runner/fixtures/zenith_fixtures.py | 15 +++ 5 files changed, 277 insertions(+), 6 deletions(-) diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index e731db5617..62fbd2ff2f 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -3,19 +3,20 @@ use hyper::{Body, Request, Response, StatusCode}; use serde::Serialize; use serde::Serializer; +use std::collections::HashMap; use std::fmt::Display; use std::sync::Arc; use crate::safekeeper::Term; use crate::safekeeper::TermHistory; -use crate::timeline::GlobalTimelines; +use crate::timeline::{GlobalTimelines, TimelineDeleteForceResult}; use crate::SafeKeeperConf; use utils::{ http::{ endpoint, error::ApiError, json::{json_request, json_response}, - request::parse_request_param, + request::{ensure_no_body, parse_request_param}, RequestExt, RouterBuilder, }, lsn::Lsn, @@ -130,6 +131,44 @@ async fn timeline_create_handler(mut request: Request) -> Result, +) -> Result, ApiError> { + let zttid = ZTenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + ensure_no_body(&mut request).await?; + json_response( + StatusCode::OK, + GlobalTimelines::delete_force(get_conf(&request), &zttid).map_err(ApiError::from_err)?, + ) +} + +/// Deactivates all timelines for the tenant and removes its data directory. +/// See `timeline_delete_force_handler`. +async fn tenant_delete_force_handler( + mut request: Request, +) -> Result, ApiError> { + let tenant_id = parse_request_param(&request, "tenant_id")?; + ensure_no_body(&mut request).await?; + json_response( + StatusCode::OK, + GlobalTimelines::delete_force_all_for_tenant(get_conf(&request), &tenant_id) + .map_err(ApiError::from_err)? + .iter() + .map(|(zttid, resp)| (format!("{}", zttid.timeline_id), *resp)) + .collect::>(), + ) +} + /// Used only in tests to hand craft required data. 
async fn record_safekeeper_info(mut request: Request) -> Result, ApiError> { let zttid = ZTenantTimelineId::new( @@ -155,6 +194,11 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder timeline_status_handler, ) .post("/v1/timeline", timeline_create_handler) + .delete( + "/v1/tenant/:tenant_id/timeline/:timeline_id", + timeline_delete_force_handler, + ) + .delete("/v1/tenant/:tenant_id", tenant_delete_force_handler) // for tests .post( "/v1/record_safekeeper_info/:tenant_id/:timeline_id", diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 03236d4e65..09b2e68a49 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use std::time::Duration; use url::Url; -use utils::zid::{ZNodeId, ZTenantTimelineId}; +use utils::zid::{ZNodeId, ZTenantId, ZTenantTimelineId}; pub mod broker; pub mod callmemaybe; @@ -57,9 +57,12 @@ pub struct SafeKeeperConf { } impl SafeKeeperConf { + pub fn tenant_dir(&self, tenant_id: &ZTenantId) -> PathBuf { + self.workdir.join(tenant_id.to_string()) + } + pub fn timeline_dir(&self, zttid: &ZTenantTimelineId) -> PathBuf { - self.workdir - .join(zttid.tenant_id.to_string()) + self.tenant_dir(&zttid.tenant_id) .join(zttid.timeline_id.to_string()) } } diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index c73d6af4ac..84ad53d72d 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -7,6 +7,8 @@ use etcd_broker::SkTimelineInfo; use lazy_static::lazy_static; use postgres_ffi::xlog_utils::XLogSegNo; +use serde::Serialize; + use std::cmp::{max, min}; use std::collections::HashMap; use std::fs::{self}; @@ -19,7 +21,7 @@ use tracing::*; use utils::{ lsn::Lsn, pq_proto::ZenithFeedback, - zid::{ZNodeId, ZTenantTimelineId}, + zid::{ZNodeId, ZTenantId, ZTenantTimelineId}, }; use crate::callmemaybe::{CallmeEvent, SubscriptionStateKey}; @@ -345,6 +347,20 @@ impl Timeline { Ok(false) } + /// Deactivates the timeline, assuming it is being deleted. + /// Returns whether the timeline was already active. + /// + /// The callmemaybe thread is stopped by the deactivation message. We assume all other threads + /// will stop by themselves eventually (possibly with errors, but no panics). There should be no + /// compute threads (as we're deleting the timeline), actually. Some WAL may be left unsent, but + /// we're deleting the timeline anyway. + pub fn deactivate_for_delete(&self) -> Result { + let mut shared_state = self.mutex.lock().unwrap(); + let was_active = shared_state.active; + shared_state.deactivate(&self.zttid, &self.callmemaybe_tx)?; + Ok(was_active) + } + fn is_active(&self) -> bool { let shared_state = self.mutex.lock().unwrap(); shared_state.active @@ -515,6 +531,12 @@ lazy_static! { }); } +#[derive(Clone, Copy, Serialize)] +pub struct TimelineDeleteForceResult { + pub dir_existed: bool, + pub was_active: bool, +} + /// A zero-sized struct used to manage access to the global timelines map. 
pub struct GlobalTimelines; @@ -613,4 +635,78 @@ impl GlobalTimelines { .map(|(zttid, _)| *zttid) .collect() } + + fn delete_force_internal( + conf: &SafeKeeperConf, + zttid: &ZTenantTimelineId, + was_active: bool, + ) -> Result { + match std::fs::remove_dir_all(conf.timeline_dir(zttid)) { + Ok(_) => Ok(TimelineDeleteForceResult { + dir_existed: true, + was_active, + }), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(TimelineDeleteForceResult { + dir_existed: false, + was_active, + }), + Err(e) => Err(e.into()), + } + } + + /// Deactivates and deletes the timeline, see `Timeline::deactivate_for_delete()`, the deletes + /// the corresponding data directory. + /// We assume all timeline threads do not care about `GlobalTimelines` not containing the timeline + /// anymore, and they will eventually terminate without panics. + /// + /// There are multiple ways the timeline may be accidentally "re-created" (so we end up with two + /// `Timeline` objects in memory): + /// a) a compute node connects after this method is called, or + /// b) an HTTP GET request about the timeline is made and it's able to restore the current state, or + /// c) an HTTP POST request for timeline creation is made after the timeline is already deleted. + /// TODO: ensure all of the above never happens. + pub fn delete_force( + conf: &SafeKeeperConf, + zttid: &ZTenantTimelineId, + ) -> Result { + info!("deleting timeline {}", zttid); + let was_active = match TIMELINES_STATE.lock().unwrap().timelines.remove(zttid) { + None => false, + Some(tli) => tli.deactivate_for_delete()?, + }; + GlobalTimelines::delete_force_internal(conf, zttid, was_active) + } + + /// Deactivates and deletes all timelines for the tenant, see `delete()`. + /// Returns map of all timelines which the tenant had, `true` if a timeline was active. + pub fn delete_force_all_for_tenant( + conf: &SafeKeeperConf, + tenant_id: &ZTenantId, + ) -> Result> { + info!("deleting all timelines for tenant {}", tenant_id); + let mut state = TIMELINES_STATE.lock().unwrap(); + let mut deleted = HashMap::new(); + for (zttid, tli) in &state.timelines { + if zttid.tenant_id == *tenant_id { + deleted.insert( + *zttid, + GlobalTimelines::delete_force_internal( + conf, + zttid, + tli.deactivate_for_delete()?, + )?, + ); + } + } + // TODO: test that the exact subset of timelines is removed. + state + .timelines + .retain(|zttid, _| !deleted.contains_key(zttid)); + match std::fs::remove_dir_all(conf.tenant_dir(tenant_id)) { + Ok(_) => (), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => (), + e => e?, + }; + Ok(deleted) + } } diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index 702c27a79b..e297f91f2c 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -850,3 +850,116 @@ def test_wal_deleted_after_broadcast(zenith_env_builder: ZenithEnvBuilder): # there shouldn't be more than 2 WAL segments (but dir may have archive_status files) assert wal_size_after_checkpoint < 16 * 2.5 + + +def test_delete_force(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.num_safekeepers = 1 + env = zenith_env_builder.init_start() + + # Create two tenants: one will be deleted, other should be preserved. 
+ tenant_id = env.initial_tenant.hex + timeline_id_1 = env.zenith_cli.create_branch('br1').hex # Acive, delete explicitly + timeline_id_2 = env.zenith_cli.create_branch('br2').hex # Inactive, delete explictly + timeline_id_3 = env.zenith_cli.create_branch('br3').hex # Active, delete with the tenant + timeline_id_4 = env.zenith_cli.create_branch('br4').hex # Inactive, delete with the tenant + + tenant_id_other = env.zenith_cli.create_tenant().hex + timeline_id_other = env.zenith_cli.create_root_branch( + 'br-other', tenant_id=uuid.UUID(hex=tenant_id_other)).hex + + # Populate branches + pg_1 = env.postgres.create_start('br1') + pg_2 = env.postgres.create_start('br2') + pg_3 = env.postgres.create_start('br3') + pg_4 = env.postgres.create_start('br4') + pg_other = env.postgres.create_start('br-other', tenant_id=uuid.UUID(hex=tenant_id_other)) + for pg in [pg_1, pg_2, pg_3, pg_4, pg_other]: + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + cur.execute('CREATE TABLE t(key int primary key)') + sk = env.safekeepers[0] + sk_data_dir = Path(sk.data_dir()) + sk_http = sk.http_client() + assert (sk_data_dir / tenant_id / timeline_id_1).is_dir() + assert (sk_data_dir / tenant_id / timeline_id_2).is_dir() + assert (sk_data_dir / tenant_id / timeline_id_3).is_dir() + assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() + assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + + # Stop branches which should be inactive and restart Safekeeper to drop its in-memory state. + pg_2.stop_and_destroy() + pg_4.stop_and_destroy() + sk.stop() + sk.start() + + # Ensure connections to Safekeeper are established + for pg in [pg_1, pg_3, pg_other]: + with closing(pg.connect()) as conn: + with conn.cursor() as cur: + cur.execute('INSERT INTO t (key) VALUES (1)') + + # Remove initial tenant's br1 (active) + assert sk_http.timeline_delete_force(tenant_id, timeline_id_1) == { + "dir_existed": True, + "was_active": True, + } + assert not (sk_data_dir / tenant_id / timeline_id_1).exists() + assert (sk_data_dir / tenant_id / timeline_id_2).is_dir() + assert (sk_data_dir / tenant_id / timeline_id_3).is_dir() + assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() + assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + + # Ensure repeated deletion succeeds + assert sk_http.timeline_delete_force(tenant_id, timeline_id_1) == { + "dir_existed": False, "was_active": False + } + assert not (sk_data_dir / tenant_id / timeline_id_1).exists() + assert (sk_data_dir / tenant_id / timeline_id_2).is_dir() + assert (sk_data_dir / tenant_id / timeline_id_3).is_dir() + assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() + assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + + # Remove initial tenant's br2 (inactive) + assert sk_http.timeline_delete_force(tenant_id, timeline_id_2) == { + "dir_existed": True, + "was_active": False, + } + assert not (sk_data_dir / tenant_id / timeline_id_1).exists() + assert not (sk_data_dir / tenant_id / timeline_id_2).exists() + assert (sk_data_dir / tenant_id / timeline_id_3).is_dir() + assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() + assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + + # Remove non-existing branch, should succeed + assert sk_http.timeline_delete_force(tenant_id, '00' * 16) == { + "dir_existed": False, + "was_active": False, + } + assert not (sk_data_dir / tenant_id / timeline_id_1).exists() + assert not (sk_data_dir / tenant_id / timeline_id_2).exists() + assert (sk_data_dir / 
tenant_id / timeline_id_3).exists() + assert (sk_data_dir / tenant_id / timeline_id_4).is_dir() + assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + + # Remove initial tenant fully (two branches are active) + response = sk_http.tenant_delete_force(tenant_id) + assert response == { + timeline_id_3: { + "dir_existed": True, + "was_active": True, + } + } + assert not (sk_data_dir / tenant_id).exists() + assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + + # Remove initial tenant again. + response = sk_http.tenant_delete_force(tenant_id) + assert response == {} + assert not (sk_data_dir / tenant_id).exists() + assert (sk_data_dir / tenant_id_other / timeline_id_other).is_dir() + + # Ensure the other tenant still works + sk_http.timeline_status(tenant_id_other, timeline_id_other) + with closing(pg_other.connect()) as conn: + with conn.cursor() as cur: + cur.execute('INSERT INTO t (key) VALUES (123)') diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index fe20f1abbf..357db4c16d 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1800,6 +1800,21 @@ class SafekeeperHttpClient(requests.Session): json=body) res.raise_for_status() + def timeline_delete_force(self, tenant_id: str, timeline_id: str) -> Dict[Any, Any]: + res = self.delete( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}") + res.raise_for_status() + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + + def tenant_delete_force(self, tenant_id: str) -> Dict[Any, Any]: + res = self.delete(f"http://localhost:{self.port}/v1/tenant/{tenant_id}") + res.raise_for_status() + res_json = res.json() + assert isinstance(res_json, dict) + return res_json + def get_metrics(self) -> SafekeeperMetrics: request_result = self.get(f"http://localhost:{self.port}/metrics") request_result.raise_for_status() From aa7c601eca425d82e616e0fc0468dac8a2a35db2 Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Thu, 12 May 2022 20:53:40 +0300 Subject: [PATCH 21/23] Fix pitr_interval check in GC: Use timestamp->LSN mapping instead of file modification time. Fix 'latest_gc_cutoff_lsn' - set it to the minimum of pitr_cutoff and gc_cutoff. Add new test: test_pitr_gc --- pageserver/src/layered_repository.rs | 76 +++++++++++++++-------- test_runner/batch_others/test_pitr_gc.py | 77 ++++++++++++++++++++++++ test_runner/fixtures/utils.py | 3 +- 3 files changed, 131 insertions(+), 25 deletions(-) create mode 100644 test_runner/batch_others/test_pitr_gc.py diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index b02ab00a21..24f9bcff37 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -74,6 +74,7 @@ pub mod metadata; mod par_fsync; mod storage_layer; +use crate::pgdatadir_mapping::LsnForTimestamp; use delta_layer::{DeltaLayer, DeltaLayerWriter}; use ephemeral_file::is_ephemeral_file; use filename::{DeltaFileName, ImageFileName}; @@ -81,6 +82,7 @@ use image_layer::{ImageLayer, ImageLayerWriter}; use inmemory_layer::InMemoryLayer; use layer_map::LayerMap; use layer_map::SearchResult; +use postgres_ffi::xlog_utils::to_pg_timestamp; use storage_layer::{Layer, ValueReconstructResult, ValueReconstructState}; // re-export this function so that page_cache.rs can use it. @@ -2118,11 +2120,49 @@ impl LayeredTimeline { let cutoff = gc_info.cutoff; let pitr = gc_info.pitr; + // Calculate pitr cutoff point. 
+ // By default, we don't want to GC anything. + let mut pitr_cutoff_lsn: Lsn = *self.get_latest_gc_cutoff_lsn(); + + if let Ok(timeline) = + tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id) + { + // First, calculate pitr_cutoff_timestamp and then convert it to LSN. + // If we don't have enough data to convert to LSN, + // play safe and don't remove any layers. + if let Some(pitr_cutoff_timestamp) = now.checked_sub(pitr) { + let pitr_timestamp = to_pg_timestamp(pitr_cutoff_timestamp); + + match timeline.find_lsn_for_timestamp(pitr_timestamp)? { + LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn, + LsnForTimestamp::Future(lsn) => { + debug!("future({})", lsn); + } + LsnForTimestamp::Past(lsn) => { + debug!("past({})", lsn); + } + } + debug!("pitr_cutoff_lsn = {:?}", pitr_cutoff_lsn) + } + } else { + // We don't have local timeline in mocked cargo tests. + // So, just ignore pitr_interval setting in this case. + pitr_cutoff_lsn = cutoff; + } + + let new_gc_cutoff = Lsn::min(cutoff, pitr_cutoff_lsn); + + // Nothing to GC. Return early. + if *self.get_latest_gc_cutoff_lsn() == new_gc_cutoff { + result.elapsed = now.elapsed()?; + return Ok(result); + } + let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %cutoff).entered(); // We need to ensure that no one branches at a point before latest_gc_cutoff_lsn. // See branch_timeline() for details. - *self.latest_gc_cutoff_lsn.write().unwrap() = cutoff; + *self.latest_gc_cutoff_lsn.write().unwrap() = new_gc_cutoff; info!("GC starting"); @@ -2162,30 +2202,18 @@ impl LayeredTimeline { result.layers_needed_by_cutoff += 1; continue 'outer; } - // 2. It is newer than PiTR interval? - // We use modification time of layer file to estimate update time. - // This estimation is not quite precise but maintaining LSN->timestamp map seems to be overkill. - // It is not expected that users will need high precision here. And this estimation - // is conservative: modification time of file is always newer than actual time of version - // creation. So it is safe for users. - // TODO A possible "bloat" issue still persists here. - // If modification time changes because of layer upload/download, we will keep these files - // longer than necessary. - // https://github.com/neondatabase/neon/issues/1554 - // - if let Ok(metadata) = fs::metadata(&l.filename()) { - let last_modified = metadata.modified()?; - if now.duration_since(last_modified)? < pitr { - debug!( - "keeping {} because it's modification time {:?} is newer than PITR {:?}", - l.filename().display(), - last_modified, - pitr - ); - result.layers_needed_by_pitr += 1; - continue 'outer; - } + + // 2. It is newer than PiTR cutoff point? + if l.get_lsn_range().end > pitr_cutoff_lsn { + debug!( + "keeping {} because it's newer than pitr_cutoff_lsn {}", + l.filename().display(), + pitr_cutoff_lsn + ); + result.layers_needed_by_pitr += 1; + continue 'outer; } + // 3. Is it needed by a child branch? // NOTE With that wee would keep data that // might be referenced by child branches forever. 
diff --git a/test_runner/batch_others/test_pitr_gc.py b/test_runner/batch_others/test_pitr_gc.py new file mode 100644 index 0000000000..fe9159b4bb --- /dev/null +++ b/test_runner/batch_others/test_pitr_gc.py @@ -0,0 +1,77 @@ +import subprocess +from contextlib import closing + +import psycopg2.extras +import pytest +from fixtures.log_helper import log +from fixtures.utils import print_gc_result +from fixtures.zenith_fixtures import ZenithEnvBuilder + + +# +# Check pitr_interval GC behavior. +# Insert some data, run GC and create a branch in the past. +# +def test_pitr_gc(zenith_env_builder: ZenithEnvBuilder): + + zenith_env_builder.num_safekeepers = 1 + # Set pitr interval such that we need to keep the data + zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '1day', gc_horizon = 0}" + + env = zenith_env_builder.init_start() + pgmain = env.postgres.create_start('main') + log.info("postgres is running on 'main' branch") + + main_pg_conn = pgmain.connect() + main_cur = main_pg_conn.cursor() + + main_cur.execute("SHOW zenith.zenith_timeline") + timeline = main_cur.fetchone()[0] + + # Create table + main_cur.execute('CREATE TABLE foo (t text)') + + for i in range(10000): + main_cur.execute(''' + INSERT INTO foo + SELECT 'long string to consume some space'; + ''') + + if i == 99: + # keep some early lsn to test branch creation after GC + main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()') + res = main_cur.fetchone() + lsn_a = res[0] + xid_a = res[1] + log.info(f'LSN after 100 rows: {lsn_a} xid {xid_a}') + + main_cur.execute('SELECT pg_current_wal_insert_lsn(), txid_current()') + res = main_cur.fetchone() + debug_lsn = res[0] + debug_xid = res[1] + log.info(f'LSN after 10000 rows: {debug_lsn} xid {debug_xid}') + + # run GC + with closing(env.pageserver.connect()) as psconn: + with psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) as pscur: + pscur.execute(f"compact {env.initial_tenant.hex} {timeline}") + # perform agressive GC. Data still should be kept because of the PITR setting. 
+ pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0") + row = pscur.fetchone() + print_gc_result(row) + + # Branch at the point where only 100 rows were inserted + # It must have been preserved by PITR setting + env.zenith_cli.create_branch('test_pitr_gc_hundred', 'main', ancestor_start_lsn=lsn_a) + + pg_hundred = env.postgres.create_start('test_pitr_gc_hundred') + + # On the 'hundred' branch, we should see only 100 rows + hundred_pg_conn = pg_hundred.connect() + hundred_cur = hundred_pg_conn.cursor() + hundred_cur.execute('SELECT count(*) FROM foo') + assert hundred_cur.fetchone() == (100, ) + + # All the rows are visible on the main branch + main_cur.execute('SELECT count(*) FROM foo') + assert main_cur.fetchone() == (10000, ) diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 98af511036..7b95e729d9 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -75,7 +75,8 @@ def lsn_from_hex(lsn_hex: str) -> int: def print_gc_result(row): log.info("GC duration {elapsed} ms".format_map(row)) log.info( - " total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}" + " total: {layers_total}, needed_by_cutoff {layers_needed_by_cutoff}, needed_by_pitr {layers_needed_by_pitr}" + " needed_by_branches: {layers_needed_by_branches}, not_updated: {layers_not_updated}, removed: {layers_removed}" .format_map(row)) From a2561f0a78116fc775732cb36c7df992d4d3a07a Mon Sep 17 00:00:00 2001 From: Anastasia Lubennikova Date: Fri, 13 May 2022 16:01:41 +0300 Subject: [PATCH 22/23] Use tenant's pitr_interval instead of hardroded 0 in the command. Adjust python tests that use the --- pageserver/src/layered_repository.rs | 11 ++++++++--- pageserver/src/page_service.rs | 5 +++-- test_runner/batch_others/test_branch_behind.py | 2 ++ test_runner/batch_others/test_gc_aggressive.py | 11 +++++++---- .../batch_others/test_old_request_lsn.py | 17 ++++++++++++----- test_runner/batch_others/test_pitr_gc.py | 2 +- test_runner/performance/test_bulk_insert.py | 1 - test_runner/performance/test_random_writes.py | 1 - 8 files changed, 33 insertions(+), 17 deletions(-) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 24f9bcff37..c7536cc959 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -2121,7 +2121,7 @@ impl LayeredTimeline { let pitr = gc_info.pitr; // Calculate pitr cutoff point. - // By default, we don't want to GC anything. + // If we cannot determine a cutoff LSN, be conservative and don't GC anything. let mut pitr_cutoff_lsn: Lsn = *self.get_latest_gc_cutoff_lsn(); if let Ok(timeline) = @@ -2137,6 +2137,7 @@ impl LayeredTimeline { LsnForTimestamp::Present(lsn) => pitr_cutoff_lsn = lsn, LsnForTimestamp::Future(lsn) => { debug!("future({})", lsn); + pitr_cutoff_lsn = cutoff; } LsnForTimestamp::Past(lsn) => { debug!("past({})", lsn); @@ -2144,7 +2145,7 @@ impl LayeredTimeline { } debug!("pitr_cutoff_lsn = {:?}", pitr_cutoff_lsn) } - } else { + } else if cfg!(test) { // We don't have local timeline in mocked cargo tests. // So, just ignore pitr_interval setting in this case. pitr_cutoff_lsn = cutoff; @@ -2153,7 +2154,11 @@ impl LayeredTimeline { let new_gc_cutoff = Lsn::min(cutoff, pitr_cutoff_lsn); // Nothing to GC. Return early. 
- if *self.get_latest_gc_cutoff_lsn() == new_gc_cutoff { + if *self.get_latest_gc_cutoff_lsn() >= new_gc_cutoff { + info!( + "Nothing to GC for timeline {}. cutoff_lsn {}", + self.timeline_id, new_gc_cutoff + ); result.elapsed = now.elapsed()?; return Ok(result); } diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 88273cfa57..28d6bf2621 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -19,7 +19,6 @@ use std::net::TcpListener; use std::str; use std::str::FromStr; use std::sync::{Arc, RwLockReadGuard}; -use std::time::Duration; use tracing::*; use utils::{ auth::{self, Claims, JwtAuth, Scope}, @@ -796,7 +795,9 @@ impl postgres_backend::Handler for PageServerHandler { .unwrap_or_else(|| Ok(repo.get_gc_horizon()))?; let repo = tenant_mgr::get_repository_for_tenant(tenantid)?; - let result = repo.gc_iteration(Some(timelineid), gc_horizon, Duration::ZERO, true)?; + // Use tenant's pitr setting + let pitr = repo.get_pitr_interval(); + let result = repo.gc_iteration(Some(timelineid), gc_horizon, pitr, true)?; pgb.write_message_noflush(&BeMessage::RowDescription(&[ RowDescriptor::int8_col(b"layers_total"), RowDescriptor::int8_col(b"layers_needed_by_cutoff"), diff --git a/test_runner/batch_others/test_branch_behind.py b/test_runner/batch_others/test_branch_behind.py index 4e2be352f4..fc84af5283 100644 --- a/test_runner/batch_others/test_branch_behind.py +++ b/test_runner/batch_others/test_branch_behind.py @@ -19,6 +19,8 @@ def test_branch_behind(zenith_env_builder: ZenithEnvBuilder): # # See https://github.com/zenithdb/zenith/issues/1068 zenith_env_builder.num_safekeepers = 1 + # Disable pitr, because here we want to test branch creation after GC + zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}" env = zenith_env_builder.init_start() # Branch at the point where only 100 rows were inserted diff --git a/test_runner/batch_others/test_gc_aggressive.py b/test_runner/batch_others/test_gc_aggressive.py index e4e4aa9f4a..519a6dda1c 100644 --- a/test_runner/batch_others/test_gc_aggressive.py +++ b/test_runner/batch_others/test_gc_aggressive.py @@ -1,7 +1,7 @@ import asyncio import random -from fixtures.zenith_fixtures import ZenithEnv, Postgres +from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, Postgres from fixtures.log_helper import log # Test configuration @@ -50,9 +50,12 @@ async def update_and_gc(env: ZenithEnv, pg: Postgres, timeline: str): # # (repro for https://github.com/zenithdb/zenith/issues/1047) # -def test_gc_aggressive(zenith_simple_env: ZenithEnv): - env = zenith_simple_env - env.zenith_cli.create_branch("test_gc_aggressive", "empty") +def test_gc_aggressive(zenith_env_builder: ZenithEnvBuilder): + + # Disable pitr, because here we want to test branch creation after GC + zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}" + env = zenith_env_builder.init_start() + env.zenith_cli.create_branch("test_gc_aggressive", "main") pg = env.postgres.create_start('test_gc_aggressive') log.info('postgres is running on test_gc_aggressive branch') diff --git a/test_runner/batch_others/test_old_request_lsn.py b/test_runner/batch_others/test_old_request_lsn.py index e7400cff96..cf7fe09b1e 100644 --- a/test_runner/batch_others/test_old_request_lsn.py +++ b/test_runner/batch_others/test_old_request_lsn.py @@ -1,5 +1,7 @@ -from fixtures.zenith_fixtures import ZenithEnv +from fixtures.zenith_fixtures import ZenithEnvBuilder from fixtures.log_helper 
import log +from fixtures.utils import print_gc_result +import psycopg2.extras # @@ -12,9 +14,11 @@ from fixtures.log_helper import log # just a hint that the page hasn't been modified since that LSN, and the page # server should return the latest page version regardless of the LSN. # -def test_old_request_lsn(zenith_simple_env: ZenithEnv): - env = zenith_simple_env - env.zenith_cli.create_branch("test_old_request_lsn", "empty") +def test_old_request_lsn(zenith_env_builder: ZenithEnvBuilder): + # Disable pitr, because here we want to test branch creation after GC + zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '0 sec'}" + env = zenith_env_builder.init_start() + env.zenith_cli.create_branch("test_old_request_lsn", "main") pg = env.postgres.create_start('test_old_request_lsn') log.info('postgres is running on test_old_request_lsn branch') @@ -26,7 +30,7 @@ def test_old_request_lsn(zenith_simple_env: ZenithEnv): timeline = cur.fetchone()[0] psconn = env.pageserver.connect() - pscur = psconn.cursor() + pscur = psconn.cursor(cursor_factory=psycopg2.extras.DictCursor) # Create table, and insert some rows. Make it big enough that it doesn't fit in # shared_buffers. @@ -53,6 +57,9 @@ def test_old_request_lsn(zenith_simple_env: ZenithEnv): # garbage collections so that the page server will remove old page versions. for i in range(10): pscur.execute(f"do_gc {env.initial_tenant.hex} {timeline} 0") + row = pscur.fetchone() + print_gc_result(row) + for j in range(100): cur.execute('UPDATE foo SET val = val + 1 WHERE id = 1;') diff --git a/test_runner/batch_others/test_pitr_gc.py b/test_runner/batch_others/test_pitr_gc.py index fe9159b4bb..ee19bddfe8 100644 --- a/test_runner/batch_others/test_pitr_gc.py +++ b/test_runner/batch_others/test_pitr_gc.py @@ -16,7 +16,7 @@ def test_pitr_gc(zenith_env_builder: ZenithEnvBuilder): zenith_env_builder.num_safekeepers = 1 # Set pitr interval such that we need to keep the data - zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '1day', gc_horizon = 0}" + zenith_env_builder.pageserver_config_override = "tenant_config={pitr_interval = '1 day', gc_horizon = 0}" env = zenith_env_builder.init_start() pgmain = env.postgres.create_start('main') diff --git a/test_runner/performance/test_bulk_insert.py b/test_runner/performance/test_bulk_insert.py index 4e73bedcc0..3b57ac73cc 100644 --- a/test_runner/performance/test_bulk_insert.py +++ b/test_runner/performance/test_bulk_insert.py @@ -18,7 +18,6 @@ from fixtures.compare_fixtures import PgCompare, VanillaCompare, ZenithCompare def test_bulk_insert(zenith_with_baseline: PgCompare): env = zenith_with_baseline - # Get the timeline ID of our branch. 
We need it for the 'do_gc' command with closing(env.pg.connect()) as conn: with conn.cursor() as cur: cur.execute("create table huge (i int, j int);") diff --git a/test_runner/performance/test_random_writes.py b/test_runner/performance/test_random_writes.py index ba9eabcd97..205388bd90 100644 --- a/test_runner/performance/test_random_writes.py +++ b/test_runner/performance/test_random_writes.py @@ -8,7 +8,6 @@ from fixtures.log_helper import log import psycopg2.extras import random import time -from fixtures.utils import print_gc_result # This is a clear-box test that demonstrates the worst case scenario for the From 768c846eeb9f90450e06185ce477ed1a566a0f22 Mon Sep 17 00:00:00 2001 From: Egor Suvorov Date: Fri, 13 May 2022 17:06:25 +0300 Subject: [PATCH 23/23] Fix test_delete_force from #1653 conflicting with #1692 --- test_runner/batch_others/test_wal_acceptor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index e297f91f2c..67c9d6070e 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -863,16 +863,16 @@ def test_delete_force(zenith_env_builder: ZenithEnvBuilder): timeline_id_3 = env.zenith_cli.create_branch('br3').hex # Active, delete with the tenant timeline_id_4 = env.zenith_cli.create_branch('br4').hex # Inactive, delete with the tenant - tenant_id_other = env.zenith_cli.create_tenant().hex - timeline_id_other = env.zenith_cli.create_root_branch( - 'br-other', tenant_id=uuid.UUID(hex=tenant_id_other)).hex + tenant_id_other_uuid, timeline_id_other_uuid = env.zenith_cli.create_tenant() + tenant_id_other = tenant_id_other_uuid.hex + timeline_id_other = timeline_id_other_uuid.hex # Populate branches pg_1 = env.postgres.create_start('br1') pg_2 = env.postgres.create_start('br2') pg_3 = env.postgres.create_start('br3') pg_4 = env.postgres.create_start('br4') - pg_other = env.postgres.create_start('br-other', tenant_id=uuid.UUID(hex=tenant_id_other)) + pg_other = env.postgres.create_start('main', tenant_id=uuid.UUID(hex=tenant_id_other)) for pg in [pg_1, pg_2, pg_3, pg_4, pg_other]: with closing(pg.connect()) as conn: with conn.cursor() as cur: