diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs
index 3803bd36b6..3ef9d81a49 100644
--- a/pageserver/src/layered_repository.rs
+++ b/pageserver/src/layered_repository.rs
@@ -28,7 +28,7 @@ use std::io::Write;
 use std::ops::{Bound::Included, Deref};
 use std::path::{Path, PathBuf};
 use std::sync::atomic::{self, AtomicBool, AtomicUsize};
-use std::sync::{Arc, Mutex, MutexGuard};
+use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
 use std::time::{Duration, Instant};
 
 use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
@@ -167,7 +167,7 @@ impl Repository for LayeredRepository {
         // Create the timeline directory, and write initial metadata to file.
         crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?;
 
-        let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), Lsn(0), initdb_lsn);
+        let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
         Self::save_metadata(self.conf, timelineid, self.tenantid, &metadata, true)?;
 
         let timeline = LayeredTimeline::new(
@@ -201,9 +201,10 @@ impl Repository for LayeredRepository {
                 bail!("Cannot branch off the timeline {} that's not local", src)
             }
         };
+        let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
 
         src_timeline
-            .check_lsn_is_in_scope(start_lsn)
+            .check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
             .context("invalid branch start lsn")?;
 
         let RecordLsn {
@@ -231,7 +232,7 @@
             dst_prev,
             Some(src),
             start_lsn,
-            src_timeline.latest_gc_cutoff_lsn.load(),
+            *src_timeline.latest_gc_cutoff_lsn.read().unwrap(),
             src_timeline.initdb_lsn,
         );
         crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenantid))?;
@@ -783,7 +784,7 @@ pub struct LayeredTimeline {
     checkpoint_cs: Mutex<()>,
 
     // Needed to ensure that we can't create a branch at a point that was already garbage collected
-    latest_gc_cutoff_lsn: AtomicLsn,
+    latest_gc_cutoff_lsn: RwLock<Lsn>,
 
     // It may change across major versions so for simplicity
     // keep it after running initdb for a timeline.
@@ -827,6 +828,10 @@ impl Timeline for LayeredTimeline {
         Ok(())
     }
 
+    fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard<Lsn> {
+        self.latest_gc_cutoff_lsn.read().unwrap()
+    }
+
     /// Look up given page version.
     fn get_page_at_lsn(&self, rel: RelishTag, rel_blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
         if !rel.is_blocky() && rel_blknum != 0 {
@@ -837,14 +842,6 @@
             );
         }
         debug_assert!(lsn <= self.get_last_record_lsn());
-        let latest_gc_cutoff_lsn = self.latest_gc_cutoff_lsn.load();
-        // error instead of assert to simplify testing
-        ensure!(
-            lsn >= latest_gc_cutoff_lsn,
-            "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
-            lsn, latest_gc_cutoff_lsn
-        );
-
         let (seg, seg_blknum) = SegmentTag::from_blknum(rel, rel_blknum);
 
         if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
@@ -1015,21 +1012,16 @@
     ///
     /// Validate lsn against initdb_lsn and latest_gc_cutoff_lsn.
     ///
-    fn check_lsn_is_in_scope(&self, lsn: Lsn) -> Result<()> {
-        let initdb_lsn = self.initdb_lsn;
+    fn check_lsn_is_in_scope(
+        &self,
+        lsn: Lsn,
+        latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
+    ) -> Result<()> {
         ensure!(
-            lsn >= initdb_lsn,
-            "LSN {} is earlier than initdb lsn {}",
-            lsn,
-            initdb_lsn,
-        );
-
-        let latest_gc_cutoff_lsn = self.latest_gc_cutoff_lsn.load();
-        ensure!(
-            lsn >= latest_gc_cutoff_lsn,
+            lsn >= **latest_gc_cutoff_lsn,
             "LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
             lsn,
-            latest_gc_cutoff_lsn,
+            **latest_gc_cutoff_lsn,
         );
         Ok(())
     }
@@ -1143,7 +1135,7 @@ impl LayeredTimeline {
             write_lock: Mutex::new(()),
             checkpoint_cs: Mutex::new(()),
 
-            latest_gc_cutoff_lsn: AtomicLsn::from(metadata.latest_gc_cutoff_lsn()),
+            latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
             initdb_lsn: metadata.initdb_lsn(),
         }
     }
@@ -1575,7 +1567,7 @@
             ondisk_prev_record_lsn,
             ancestor_timelineid,
             self.ancestor_lsn,
-            self.latest_gc_cutoff_lsn.load(),
+            *self.latest_gc_cutoff_lsn.read().unwrap(),
            self.initdb_lsn,
        );
 
@@ -1677,12 +1669,13 @@
         let now = Instant::now();
         let mut result: GcResult = Default::default();
         let disk_consistent_lsn = self.get_disk_consistent_lsn();
+        let _checkpoint_cs = self.checkpoint_cs.lock().unwrap();
 
         let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
 
         // We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
         // See branch_timeline() for details.
-        self.latest_gc_cutoff_lsn.store(cutoff);
+        *self.latest_gc_cutoff_lsn.write().unwrap() = cutoff;
 
         info!("GC starting");
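Note on the pattern above: the GC cutoff changes from an `AtomicLsn` into an `RwLock<Lsn>` so that readers can *pin* the cutoff for the duration of an operation instead of merely sampling it. A minimal sketch of that discipline, using hypothetical stand-ins (`Lsn` as a plain `u64`, a fake page read) rather than the pageserver's real types:

```rust
use std::sync::RwLock;

type Lsn = u64;
type Page = Vec<u8>;

struct Timeline {
    // GC advances this under the write lock; readers pin it with a read guard.
    latest_gc_cutoff_lsn: RwLock<Lsn>,
}

impl Timeline {
    fn get_page_at_lsn(&self, lsn: Lsn) -> Result<Page, String> {
        // Hold the read guard across both the validity check and the page
        // lookup, so GC cannot advance the cutoff (and delete the layers we
        // are about to read) in between.
        let cutoff = self.latest_gc_cutoff_lsn.read().unwrap();
        if lsn < *cutoff {
            return Err(format!("LSN {} was already garbage collected (cutoff {})", lsn, *cutoff));
        }
        Ok(self.read_page_internal(lsn))
    }

    fn gc(&self, new_cutoff: Lsn) {
        // Taking the write lock waits for every in-flight reader to drop its
        // guard; only then is it safe to drop versions below new_cutoff.
        *self.latest_gc_cutoff_lsn.write().unwrap() = new_cutoff;
        // ... delete page versions older than new_cutoff here ...
    }

    fn read_page_internal(&self, _lsn: Lsn) -> Page {
        vec![0u8; 8192] // placeholder page image
    }
}

fn main() {
    let tl = Timeline { latest_gc_cutoff_lsn: RwLock::new(0) };
    tl.gc(10);
    assert!(tl.get_page_at_lsn(5).is_err()); // below the GC cutoff
    assert!(tl.get_page_at_lsn(20).is_ok());
}
```

Because `gc()` has to take the write lock to advance the cutoff, it inherently waits out every request that still holds a read guard, which closes the window behind issue #1047.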
diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs
index d8ee59375b..7b69779e9a 100644
--- a/pageserver/src/page_service.rs
+++ b/pageserver/src/page_service.rs
@@ -18,7 +18,7 @@ use std::io;
 use std::net::TcpListener;
 use std::str;
 use std::str::FromStr;
-use std::sync::Arc;
+use std::sync::{Arc, RwLockReadGuard};
 use tracing::*;
 use zenith_metrics::{register_histogram_vec, HistogramVec};
 use zenith_utils::auth::{self, JwtAuth};
@@ -398,7 +398,12 @@
     /// In either case, if the page server hasn't received the WAL up to the
     /// requested LSN yet, we will wait for it to arrive. The return value is
     /// the LSN that should be used to look up the page versions.
-    fn wait_or_get_last_lsn(timeline: &dyn Timeline, lsn: Lsn, latest: bool) -> Result<Lsn> {
+    fn wait_or_get_last_lsn(
+        timeline: &dyn Timeline,
+        mut lsn: Lsn,
+        latest: bool,
+        latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
+    ) -> Result<Lsn> {
         if latest {
             // Latest page version was requested. If LSN is given, it is a hint
             // to the page server that there have been no modifications to the
@@ -419,22 +424,26 @@
             // walsender completes the authentication and starts streaming the
             // WAL.
             if lsn <= last_record_lsn {
-                Ok(last_record_lsn)
+                lsn = last_record_lsn;
             } else {
                 timeline.wait_lsn(lsn)?;
                 // Since we waited for 'lsn' to arrive, that is now the last
                 // record LSN. (Or close enough for our purposes; the
                 // last-record LSN can advance immediately after we return
                 // anyway)
-                Ok(lsn)
             }
         } else {
             if lsn == Lsn(0) {
                 bail!("invalid LSN(0) in request");
             }
             timeline.wait_lsn(lsn)?;
-            Ok(lsn)
         }
+        ensure!(
+            lsn >= **latest_gc_cutoff_lsn,
+            "tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
+            lsn, **latest_gc_cutoff_lsn
+        );
+        Ok(lsn)
     }
 
     fn handle_get_rel_exists_request(
@@ -445,7 +454,8 @@ impl PageServerHandler {
         let _enter = info_span!("get_rel_exists", rel = %req.rel, req_lsn = %req.lsn).entered();
         let tag = RelishTag::Relation(req.rel);
-        let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
+        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
+        let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
 
         let exists = timeline.get_rel_exists(tag, lsn)?;
 
@@ -461,7 +471,8 @@
     ) -> Result<PagestreamBeMessage> {
         let _enter = info_span!("get_nblocks", rel = %req.rel, req_lsn = %req.lsn).entered();
         let tag = RelishTag::Relation(req.rel);
-        let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
+        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
+        let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
 
         let n_blocks = timeline.get_relish_size(tag, lsn)?;
 
@@ -482,8 +493,16 @@
         let _enter = info_span!("get_page", rel = %req.rel, blkno = &req.blkno, req_lsn = %req.lsn)
             .entered();
         let tag = RelishTag::Relation(req.rel);
-        let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
-
+        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
+        let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
+        /*
+        // Add a 1s delay to some requests. The delay causes the requests to
+        // hit the race condition from github issue #1047 more easily.
+        use rand::Rng;
+        if rand::thread_rng().gen::<u8>() < 5 {
+            std::thread::sleep(std::time::Duration::from_millis(1000));
+        }
+        */
         let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?;
 
         Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
@@ -504,9 +523,10 @@
         // check that the timeline exists
         let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
             .context("Cannot handle basebackup request for a remote timeline")?;
+        let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
         if let Some(lsn) = lsn {
             timeline
-                .check_lsn_is_in_scope(lsn)
+                .check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
                 .context("invalid basebackup lsn")?;
         }
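A subtlety in the page_service.rs changes above: `wait_or_get_last_lsn()` and `check_lsn_is_in_scope()` now take a `&RwLockReadGuard<Lsn>` argument instead of locking internally, and the GC-cutoff check runs only after any `wait_lsn()` call, against the final `lsn`. Passing the guard as a parameter lets the borrow checker prove that the caller still holds the read lock while the check runs and while the page is subsequently read. A sketch of the idiom with hypothetical stand-in types, not the pageserver's actual trait:

```rust
use std::sync::{RwLock, RwLockReadGuard};

type Lsn = u64;

struct Timeline {
    latest_gc_cutoff_lsn: RwLock<Lsn>,
}

impl Timeline {
    fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard<'_, Lsn> {
        self.latest_gc_cutoff_lsn.read().unwrap()
    }

    // Accepting the guard instead of locking internally: the caller must
    // already hold the read lock, so the cutoff cannot move mid-check.
    fn check_lsn_is_in_scope(
        &self,
        lsn: Lsn,
        latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
    ) -> Result<(), String> {
        if lsn < **latest_gc_cutoff_lsn {
            return Err(format!(
                "LSN {} is earlier than GC cutoff {}",
                lsn, **latest_gc_cutoff_lsn
            ));
        }
        Ok(())
    }
}

fn main() {
    let tl = Timeline {
        latest_gc_cutoff_lsn: RwLock::new(10),
    };
    let guard = tl.get_latest_gc_cutoff_lsn();
    assert!(tl.check_lsn_is_in_scope(5, &guard).is_err());
    assert!(tl.check_lsn_is_in_scope(20, &guard).is_ok());
}
```

The double deref (`**latest_gc_cutoff_lsn`) appears because the parameter is a reference to the guard, and the guard itself derefs to the `Lsn`.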
diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs
index 94d2b3209a..94eadcb7a4 100644
--- a/pageserver/src/repository.rs
+++ b/pageserver/src/repository.rs
@@ -7,7 +7,7 @@ use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId};
 use serde::{Deserialize, Serialize};
 use std::collections::HashSet;
 use std::ops::{AddAssign, Deref};
-use std::sync::Arc;
+use std::sync::{Arc, RwLockReadGuard};
 use std::time::Duration;
 use zenith_utils::lsn::{Lsn, RecordLsn};
 use zenith_utils::zid::ZTimelineId;
@@ -184,6 +184,9 @@ pub trait Timeline: Send + Sync {
     ///
     fn wait_lsn(&self, lsn: Lsn) -> Result<()>;
 
+    /// Lock and get timeline's GC cutoff
+    fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard<Lsn>;
+
     /// Look up given page version.
     fn get_page_at_lsn(&self, tag: RelishTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes>;
 
@@ -235,7 +238,11 @@
     ///
     /// Check that it is valid to request operations with that lsn.
-    fn check_lsn_is_in_scope(&self, lsn: Lsn) -> Result<()>;
+    fn check_lsn_is_in_scope(
+        &self,
+        lsn: Lsn,
+        latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
+    ) -> Result<()>;
 
     /// Retrieve current logical size of the timeline
     ///
@@ -987,7 +994,7 @@ mod tests {
                 .source()
                 .unwrap()
                 .to_string()
-                .contains("is earlier than initdb lsn"));
+                .contains("is earlier than latest GC horizon"));
         }
     }
 
@@ -1004,12 +1011,11 @@
         make_some_layers(&tline, Lsn(0x20))?;
 
         repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
-
+        let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
+        assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
         match tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x25)) {
             Ok(_) => panic!("request for page should have failed"),
-            Err(err) => assert!(err
-                .to_string()
-                .contains("tried to request a page version that was garbage collected")),
+            Err(err) => assert!(err.to_string().contains("not found at")),
         }
         Ok(())
     }
diff --git a/test_runner/batch_others/test_gc_aggressive.py b/test_runner/batch_others/test_gc_aggressive.py
new file mode 100644
index 0000000000..7601542f75
--- /dev/null
+++ b/test_runner/batch_others/test_gc_aggressive.py
@@ -0,0 +1,84 @@
+from contextlib import closing
+
+import asyncio
+import asyncpg
+import random
+
+from fixtures.zenith_fixtures import ZenithEnv, Postgres, Safekeeper
+from fixtures.log_helper import log
+
+pytest_plugins = ("fixtures.zenith_fixtures")
+
+# Test configuration
+#
+# Create a table with {num_rows} rows, and perform {updates_to_perform} random
+# UPDATEs on it, using {num_connections} separate connections.
+num_connections = 10
+num_rows = 100000
+updates_to_perform = 10000
+
+updates_performed = 0
+
+
+# Run random UPDATEs on test table
+async def update_table(pg: Postgres):
+    global updates_performed
+    pg_conn = await pg.connect_async()
+
+    while updates_performed < updates_to_perform:
+        updates_performed += 1
+        id = random.randrange(0, num_rows)
+        row = await pg_conn.fetchrow(f'UPDATE foo SET counter = counter + 1 WHERE id = {id}')
+
+
+# Perform aggressive GC with 0 horizon
+async def gc(env: ZenithEnv, timeline: str):
+    psconn = await env.pageserver.connect_async()
+
+    while updates_performed < updates_to_perform:
+        await psconn.execute(f"do_gc {env.initial_tenant} {timeline} 0")
+
+
+# At the same time, run UPDATEs and GC
+async def update_and_gc(env: ZenithEnv, pg: Postgres, timeline: str):
+    workers = []
+    for worker_id in range(num_connections):
+        workers.append(asyncio.create_task(update_table(pg)))
+    workers.append(asyncio.create_task(gc(env, timeline)))
+
+    # await all workers
+    await asyncio.gather(*workers)
+
+
+#
+# Aggressively force GC, while running queries.
+#
+# (repro for https://github.com/zenithdb/zenith/issues/1047)
+#
+def test_gc_aggressive(zenith_simple_env: ZenithEnv):
+    env = zenith_simple_env
+    # Create a branch for us
+    env.zenith_cli(["branch", "test_gc_aggressive", "empty"])
+
+    pg = env.postgres.create_start('test_gc_aggressive')
+    log.info('postgres is running on test_gc_aggressive branch')
+
+    conn = pg.connect()
+    cur = conn.cursor()
+
+    cur.execute("SHOW zenith.zenith_timeline")
+    timeline = cur.fetchone()[0]
+
+    # Create table, and insert {num_rows} rows
+    cur.execute('CREATE TABLE foo (id int, counter int, t text)')
+    cur.execute(f'''
+        INSERT INTO foo
+            SELECT g, 0, 'long string to consume some space' || g
+            FROM generate_series(1, {num_rows}) g
+    ''')
+    cur.execute('CREATE INDEX ON foo(id)')
+
+    asyncio.run(update_and_gc(env, pg, timeline))
+
+    cur.execute('SELECT COUNT(*), SUM(counter) FROM foo')
+    assert cur.fetchone() == (num_rows, updates_to_perform)
diff --git a/zenith_utils/src/postgres_backend.rs b/zenith_utils/src/postgres_backend.rs
index 2348f43d2f..47ff1ff0e0 100644
--- a/zenith_utils/src/postgres_backend.rs
+++ b/zenith_utils/src/postgres_backend.rs
@@ -366,6 +366,10 @@ impl PostgresBackend {
             AuthType::Trust => {
                 self.write_message_noflush(&BeMessage::AuthenticationOk)?
                     .write_message_noflush(&BeParameterStatusMessage::encoding())?
+                    // The async python driver requires a valid server_version
+                    .write_message_noflush(&BeMessage::ParameterStatus(
+                        BeParameterStatusMessage::ServerVersion("14.1"),
+                    ))?
                     .write_message(&BeMessage::ReadyForQuery)?;
                 self.state = ProtoState::Established;
             }
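The final hunk is unrelated to GC: per its comment, the async Python driver used by the new test (asyncpg) requires a valid `server_version` at connection time, so the Trust handshake now reports one. For reference, a `ParameterStatus` message is simple wire framing in the PostgreSQL frontend/backend protocol: `Byte1('S')`, an `Int32` length that includes itself, then two NUL-terminated strings (name and value). A standalone sketch of that encoding, independent of zenith's `BeMessage` types:

```rust
// Encode a ParameterStatus message as raw bytes, per the PostgreSQL
// frontend/backend protocol.
fn parameter_status(name: &str, value: &str) -> Vec<u8> {
    let mut body = Vec::new();
    body.extend_from_slice(name.as_bytes());
    body.push(0); // NUL terminator for the parameter name
    body.extend_from_slice(value.as_bytes());
    body.push(0); // NUL terminator for the parameter value
    let mut msg = vec![b'S'];
    msg.extend_from_slice(&((body.len() as u32 + 4).to_be_bytes())); // length includes itself
    msg.extend_from_slice(&body);
    msg
}

fn main() {
    let msg = parameter_status("server_version", "14.1");
    // 'S' + 4 length bytes + "server_version\0" + "14.1\0"
    assert_eq!(msg.len(), 1 + 4 + 15 + 5);
    assert_eq!(msg[0], b'S');
}
```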