mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-15 04:00:38 +00:00
Gc cutoff rwlock (#1139)
* Reproduce github issue #1047. * Use RwLock to protect gc_cuttof_lsn * Eeduce number of updates in test_gc_aggressive * Change test_prohibit_get_page_at_lsn_for_garbage_collected_pages test * Change test_prohibit_get_page_at_lsn_for_garbage_collected_pages * Lock latest_gc_cutoff_lsn in all operations accessing storage to prevent race conditions with GC * Remove random sleep between wait_for_lsn and get_page_at_lsn * Initialize latest_gc_cutoff with initdb_lsn and remove separate check that lsn >= initdb_lsn * Update test_prohibit_branch_creation_on_pre_initdb_lsn test Co-authored-by: Heikki Linnakangas <heikki@zenith.tech>
This commit is contained in:
committed by
GitHub
parent
c44695f34b
commit
79f0e44a20
@@ -28,7 +28,7 @@ use std::io::Write;
|
||||
use std::ops::{Bound::Included, Deref};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::atomic::{self, AtomicBool, AtomicUsize};
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use self::metadata::{metadata_path, TimelineMetadata, METADATA_FILE_NAME};
|
||||
@@ -167,7 +167,7 @@ impl Repository for LayeredRepository {
|
||||
// Create the timeline directory, and write initial metadata to file.
|
||||
crashsafe_dir::create_dir_all(self.conf.timeline_path(&timelineid, &self.tenantid))?;
|
||||
|
||||
let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), Lsn(0), initdb_lsn);
|
||||
let metadata = TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
|
||||
Self::save_metadata(self.conf, timelineid, self.tenantid, &metadata, true)?;
|
||||
|
||||
let timeline = LayeredTimeline::new(
|
||||
@@ -201,9 +201,10 @@ impl Repository for LayeredRepository {
|
||||
bail!("Cannot branch off the timeline {} that's not local", src)
|
||||
}
|
||||
};
|
||||
let latest_gc_cutoff_lsn = src_timeline.get_latest_gc_cutoff_lsn();
|
||||
|
||||
src_timeline
|
||||
.check_lsn_is_in_scope(start_lsn)
|
||||
.check_lsn_is_in_scope(start_lsn, &latest_gc_cutoff_lsn)
|
||||
.context("invalid branch start lsn")?;
|
||||
|
||||
let RecordLsn {
|
||||
@@ -231,7 +232,7 @@ impl Repository for LayeredRepository {
|
||||
dst_prev,
|
||||
Some(src),
|
||||
start_lsn,
|
||||
src_timeline.latest_gc_cutoff_lsn.load(),
|
||||
*src_timeline.latest_gc_cutoff_lsn.read().unwrap(),
|
||||
src_timeline.initdb_lsn,
|
||||
);
|
||||
crashsafe_dir::create_dir_all(self.conf.timeline_path(&dst, &self.tenantid))?;
|
||||
@@ -783,7 +784,7 @@ pub struct LayeredTimeline {
|
||||
checkpoint_cs: Mutex<()>,
|
||||
|
||||
// Needed to ensure that we can't create a branch at a point that was already garbage collected
|
||||
latest_gc_cutoff_lsn: AtomicLsn,
|
||||
latest_gc_cutoff_lsn: RwLock<Lsn>,
|
||||
|
||||
// It may change across major versions so for simplicity
|
||||
// keep it after running initdb for a timeline.
|
||||
@@ -827,6 +828,10 @@ impl Timeline for LayeredTimeline {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard<Lsn> {
|
||||
self.latest_gc_cutoff_lsn.read().unwrap()
|
||||
}
|
||||
|
||||
/// Look up given page version.
|
||||
fn get_page_at_lsn(&self, rel: RelishTag, rel_blknum: BlockNumber, lsn: Lsn) -> Result<Bytes> {
|
||||
if !rel.is_blocky() && rel_blknum != 0 {
|
||||
@@ -837,14 +842,6 @@ impl Timeline for LayeredTimeline {
|
||||
);
|
||||
}
|
||||
debug_assert!(lsn <= self.get_last_record_lsn());
|
||||
let latest_gc_cutoff_lsn = self.latest_gc_cutoff_lsn.load();
|
||||
// error instead of assert to simplify testing
|
||||
ensure!(
|
||||
lsn >= latest_gc_cutoff_lsn,
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
lsn, latest_gc_cutoff_lsn
|
||||
);
|
||||
|
||||
let (seg, seg_blknum) = SegmentTag::from_blknum(rel, rel_blknum);
|
||||
|
||||
if let Some((layer, lsn)) = self.get_layer_for_read(seg, lsn)? {
|
||||
@@ -1015,21 +1012,16 @@ impl Timeline for LayeredTimeline {
|
||||
///
|
||||
/// Validate lsn against initdb_lsn and latest_gc_cutoff_lsn.
|
||||
///
|
||||
fn check_lsn_is_in_scope(&self, lsn: Lsn) -> Result<()> {
|
||||
let initdb_lsn = self.initdb_lsn;
|
||||
fn check_lsn_is_in_scope(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
|
||||
) -> Result<()> {
|
||||
ensure!(
|
||||
lsn >= initdb_lsn,
|
||||
"LSN {} is earlier than initdb lsn {}",
|
||||
lsn,
|
||||
initdb_lsn,
|
||||
);
|
||||
|
||||
let latest_gc_cutoff_lsn = self.latest_gc_cutoff_lsn.load();
|
||||
ensure!(
|
||||
lsn >= latest_gc_cutoff_lsn,
|
||||
lsn >= **latest_gc_cutoff_lsn,
|
||||
"LSN {} is earlier than latest GC horizon {} (we might've already garbage collected needed data)",
|
||||
lsn,
|
||||
latest_gc_cutoff_lsn,
|
||||
**latest_gc_cutoff_lsn,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
@@ -1143,7 +1135,7 @@ impl LayeredTimeline {
|
||||
write_lock: Mutex::new(()),
|
||||
checkpoint_cs: Mutex::new(()),
|
||||
|
||||
latest_gc_cutoff_lsn: AtomicLsn::from(metadata.latest_gc_cutoff_lsn()),
|
||||
latest_gc_cutoff_lsn: RwLock::new(metadata.latest_gc_cutoff_lsn()),
|
||||
initdb_lsn: metadata.initdb_lsn(),
|
||||
}
|
||||
}
|
||||
@@ -1575,7 +1567,7 @@ impl LayeredTimeline {
|
||||
ondisk_prev_record_lsn,
|
||||
ancestor_timelineid,
|
||||
self.ancestor_lsn,
|
||||
self.latest_gc_cutoff_lsn.load(),
|
||||
*self.latest_gc_cutoff_lsn.read().unwrap(),
|
||||
self.initdb_lsn,
|
||||
);
|
||||
|
||||
@@ -1677,12 +1669,13 @@ impl LayeredTimeline {
|
||||
let now = Instant::now();
|
||||
let mut result: GcResult = Default::default();
|
||||
let disk_consistent_lsn = self.get_disk_consistent_lsn();
|
||||
let _checkpoint_cs = self.checkpoint_cs.lock().unwrap();
|
||||
|
||||
let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered();
|
||||
|
||||
// We need to ensure that no one branches at a point before latest_gc_cutoff_lsn.
|
||||
// See branch_timeline() for details.
|
||||
self.latest_gc_cutoff_lsn.store(cutoff);
|
||||
*self.latest_gc_cutoff_lsn.write().unwrap() = cutoff;
|
||||
|
||||
info!("GC starting");
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ use std::io;
|
||||
use std::net::TcpListener;
|
||||
use std::str;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, RwLockReadGuard};
|
||||
use tracing::*;
|
||||
use zenith_metrics::{register_histogram_vec, HistogramVec};
|
||||
use zenith_utils::auth::{self, JwtAuth};
|
||||
@@ -398,7 +398,12 @@ impl PageServerHandler {
|
||||
/// In either case, if the page server hasn't received the WAL up to the
|
||||
/// requested LSN yet, we will wait for it to arrive. The return value is
|
||||
/// the LSN that should be used to look up the page versions.
|
||||
fn wait_or_get_last_lsn(timeline: &dyn Timeline, lsn: Lsn, latest: bool) -> Result<Lsn> {
|
||||
fn wait_or_get_last_lsn(
|
||||
timeline: &dyn Timeline,
|
||||
mut lsn: Lsn,
|
||||
latest: bool,
|
||||
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
|
||||
) -> Result<Lsn> {
|
||||
if latest {
|
||||
// Latest page version was requested. If LSN is given, it is a hint
|
||||
// to the page server that there have been no modifications to the
|
||||
@@ -419,22 +424,26 @@ impl PageServerHandler {
|
||||
// walsender completes the authentication and starts streaming the
|
||||
// WAL.
|
||||
if lsn <= last_record_lsn {
|
||||
Ok(last_record_lsn)
|
||||
lsn = last_record_lsn;
|
||||
} else {
|
||||
timeline.wait_lsn(lsn)?;
|
||||
// Since we waited for 'lsn' to arrive, that is now the last
|
||||
// record LSN. (Or close enough for our purposes; the
|
||||
// last-record LSN can advance immediately after we return
|
||||
// anyway)
|
||||
Ok(lsn)
|
||||
}
|
||||
} else {
|
||||
if lsn == Lsn(0) {
|
||||
bail!("invalid LSN(0) in request");
|
||||
}
|
||||
timeline.wait_lsn(lsn)?;
|
||||
Ok(lsn)
|
||||
}
|
||||
ensure!(
|
||||
lsn >= **latest_gc_cutoff_lsn,
|
||||
"tried to request a page version that was garbage collected. requested at {} gc cutoff {}",
|
||||
lsn, **latest_gc_cutoff_lsn
|
||||
);
|
||||
Ok(lsn)
|
||||
}
|
||||
|
||||
fn handle_get_rel_exists_request(
|
||||
@@ -445,7 +454,8 @@ impl PageServerHandler {
|
||||
let _enter = info_span!("get_rel_exists", rel = %req.rel, req_lsn = %req.lsn).entered();
|
||||
|
||||
let tag = RelishTag::Relation(req.rel);
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
|
||||
|
||||
let exists = timeline.get_rel_exists(tag, lsn)?;
|
||||
|
||||
@@ -461,7 +471,8 @@ impl PageServerHandler {
|
||||
) -> Result<PagestreamBeMessage> {
|
||||
let _enter = info_span!("get_nblocks", rel = %req.rel, req_lsn = %req.lsn).entered();
|
||||
let tag = RelishTag::Relation(req.rel);
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
|
||||
|
||||
let n_blocks = timeline.get_relish_size(tag, lsn)?;
|
||||
|
||||
@@ -482,8 +493,16 @@ impl PageServerHandler {
|
||||
let _enter = info_span!("get_page", rel = %req.rel, blkno = &req.blkno, req_lsn = %req.lsn)
|
||||
.entered();
|
||||
let tag = RelishTag::Relation(req.rel);
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest)?;
|
||||
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(timeline, req.lsn, req.latest, &latest_gc_cutoff_lsn)?;
|
||||
/*
|
||||
// Add a 1s delay to some requests. The delayed causes the requests to
|
||||
// hit the race condition from github issue #1047 more easily.
|
||||
use rand::Rng;
|
||||
if rand::thread_rng().gen::<u8>() < 5 {
|
||||
std::thread::sleep(std::time::Duration::from_millis(1000));
|
||||
}
|
||||
*/
|
||||
let page = timeline.get_page_at_lsn(tag, req.blkno, lsn)?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
|
||||
@@ -504,9 +523,10 @@ impl PageServerHandler {
|
||||
// check that the timeline exists
|
||||
let timeline = tenant_mgr::get_timeline_for_tenant(tenantid, timelineid)
|
||||
.context("Cannot handle basebackup request for a remote timeline")?;
|
||||
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
|
||||
if let Some(lsn) = lsn {
|
||||
timeline
|
||||
.check_lsn_is_in_scope(lsn)
|
||||
.check_lsn_is_in_scope(lsn, &latest_gc_cutoff_lsn)
|
||||
.context("invalid basebackup lsn")?;
|
||||
}
|
||||
|
||||
|
||||
@@ -7,7 +7,7 @@ use postgres_ffi::{MultiXactId, MultiXactOffset, TransactionId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashSet;
|
||||
use std::ops::{AddAssign, Deref};
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, RwLockReadGuard};
|
||||
use std::time::Duration;
|
||||
use zenith_utils::lsn::{Lsn, RecordLsn};
|
||||
use zenith_utils::zid::ZTimelineId;
|
||||
@@ -184,6 +184,9 @@ pub trait Timeline: Send + Sync {
|
||||
///
|
||||
fn wait_lsn(&self, lsn: Lsn) -> Result<()>;
|
||||
|
||||
/// Lock and get timeline's GC cuttof
|
||||
fn get_latest_gc_cutoff_lsn(&self) -> RwLockReadGuard<Lsn>;
|
||||
|
||||
/// Look up given page version.
|
||||
fn get_page_at_lsn(&self, tag: RelishTag, blknum: BlockNumber, lsn: Lsn) -> Result<Bytes>;
|
||||
|
||||
@@ -235,7 +238,11 @@ pub trait Timeline: Send + Sync {
|
||||
|
||||
///
|
||||
/// Check that it is valid to request operations with that lsn.
|
||||
fn check_lsn_is_in_scope(&self, lsn: Lsn) -> Result<()>;
|
||||
fn check_lsn_is_in_scope(
|
||||
&self,
|
||||
lsn: Lsn,
|
||||
latest_gc_cutoff_lsn: &RwLockReadGuard<Lsn>,
|
||||
) -> Result<()>;
|
||||
|
||||
/// Retrieve current logical size of the timeline
|
||||
///
|
||||
@@ -987,7 +994,7 @@ mod tests {
|
||||
.source()
|
||||
.unwrap()
|
||||
.to_string()
|
||||
.contains("is earlier than initdb lsn"));
|
||||
.contains("is earlier than latest GC horizon"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1004,12 +1011,11 @@ mod tests {
|
||||
make_some_layers(&tline, Lsn(0x20))?;
|
||||
|
||||
repo.gc_iteration(Some(TIMELINE_ID), 0x10, false)?;
|
||||
|
||||
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
|
||||
assert!(*latest_gc_cutoff_lsn > Lsn(0x25));
|
||||
match tline.get_page_at_lsn(TESTREL_A, 0, Lsn(0x25)) {
|
||||
Ok(_) => panic!("request for page should have failed"),
|
||||
Err(err) => assert!(err
|
||||
.to_string()
|
||||
.contains("tried to request a page version that was garbage collected")),
|
||||
Err(err) => assert!(err.to_string().contains("not found at")),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
84
test_runner/batch_others/test_gc_aggressive.py
Normal file
84
test_runner/batch_others/test_gc_aggressive.py
Normal file
@@ -0,0 +1,84 @@
|
||||
from contextlib import closing
|
||||
|
||||
import asyncio
|
||||
import asyncpg
|
||||
import random
|
||||
|
||||
from fixtures.zenith_fixtures import ZenithEnv, Postgres, Safekeeper
|
||||
from fixtures.log_helper import log
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
# Test configuration
|
||||
#
|
||||
# Create a table with {num_rows} rows, and perform {updates_to_perform} random
|
||||
# UPDATEs on it, using {num_connections} separate connections.
|
||||
num_connections = 10
|
||||
num_rows = 100000
|
||||
updates_to_perform = 10000
|
||||
|
||||
updates_performed = 0
|
||||
|
||||
|
||||
# Run random UPDATEs on test table
|
||||
async def update_table(pg: Postgres):
|
||||
global updates_performed
|
||||
pg_conn = await pg.connect_async()
|
||||
|
||||
while updates_performed < updates_to_perform:
|
||||
updates_performed += 1
|
||||
id = random.randrange(0, num_rows)
|
||||
row = await pg_conn.fetchrow(f'UPDATE foo SET counter = counter + 1 WHERE id = {id}')
|
||||
|
||||
|
||||
# Perform aggressive GC with 0 horizon
|
||||
async def gc(env: ZenithEnv, timeline: str):
|
||||
psconn = await env.pageserver.connect_async()
|
||||
|
||||
while updates_performed < updates_to_perform:
|
||||
await psconn.execute(f"do_gc {env.initial_tenant} {timeline} 0")
|
||||
|
||||
|
||||
# At the same time, run UPDATEs and GC
|
||||
async def update_and_gc(env: ZenithEnv, pg: Postgres, timeline: str):
|
||||
workers = []
|
||||
for worker_id in range(num_connections):
|
||||
workers.append(asyncio.create_task(update_table(pg)))
|
||||
workers.append(asyncio.create_task(gc(env, timeline)))
|
||||
|
||||
# await all workers
|
||||
await asyncio.gather(*workers)
|
||||
|
||||
|
||||
#
|
||||
# Aggressively force GC, while running queries.
|
||||
#
|
||||
# (repro for https://github.com/zenithdb/zenith/issues/1047)
|
||||
#
|
||||
def test_gc_aggressive(zenith_simple_env: ZenithEnv):
|
||||
env = zenith_simple_env
|
||||
# Create a branch for us
|
||||
env.zenith_cli(["branch", "test_gc_aggressive", "empty"])
|
||||
|
||||
pg = env.postgres.create_start('test_gc_aggressive')
|
||||
log.info('postgres is running on test_gc_aggressive branch')
|
||||
|
||||
conn = pg.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("SHOW zenith.zenith_timeline")
|
||||
timeline = cur.fetchone()[0]
|
||||
|
||||
# Create table, and insert the first 100 rows
|
||||
cur.execute('CREATE TABLE foo (id int, counter int, t text)')
|
||||
cur.execute(f'''
|
||||
INSERT INTO foo
|
||||
SELECT g, 0, 'long string to consume some space' || g
|
||||
FROM generate_series(1, {num_rows}) g
|
||||
''')
|
||||
cur.execute('CREATE INDEX ON foo(id)')
|
||||
|
||||
asyncio.run(update_and_gc(env, pg, timeline))
|
||||
|
||||
row = cur.execute('SELECT COUNT(*), SUM(counter) FROM foo')
|
||||
assert cur.fetchone() == (num_rows, updates_to_perform)
|
||||
@@ -366,6 +366,10 @@ impl PostgresBackend {
|
||||
AuthType::Trust => {
|
||||
self.write_message_noflush(&BeMessage::AuthenticationOk)?
|
||||
.write_message_noflush(&BeParameterStatusMessage::encoding())?
|
||||
// The async python driver requires a valid server_version
|
||||
.write_message_noflush(&BeMessage::ParameterStatus(
|
||||
BeParameterStatusMessage::ServerVersion("14.1"),
|
||||
))?
|
||||
.write_message(&BeMessage::ReadyForQuery)?;
|
||||
self.state = ProtoState::Established;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user