From 9ede38b6c4aec5a1d49f0e83278f112f1eb4069e Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Tue, 3 May 2022 09:28:57 +0300 Subject: [PATCH] Support finding LSN from a commit timestamp. A new `get_lsn_by_timestamp` command is added to the libpq page service API. A timestamp is now stored in an extra 8-byte field appended after each Clog page. It is the timestamp of the latest commit, among all the transactions on the Clog page. To find the overall latest commit, we need to scan all Clog pages, but this isn't a very frequent operation so that's not too bad. To find the LSN that corresponds to a timestamp, we perform a binary search. The binary search starts with min = last LSN when GC ran, and max = latest LSN on the timeline. On each iteration of the search we check if there are any commits with a higher-than-requested timestamp at that LSN. Implements github issue 1361. --- libs/postgres_ffi/src/xlog_utils.rs | 6 +- libs/utils/src/pq_proto.rs | 12 +++ pageserver/src/basebackup.rs | 12 ++- pageserver/src/page_service.rs | 30 +++++- pageserver/src/pgdatadir_mapping.rs | 108 +++++++++++++++++++ pageserver/src/walingest.rs | 10 +- pageserver/src/walrecord.rs | 5 +- pageserver/src/walredo.rs | 22 +++- test_runner/batch_others/test_lsn_mapping.py | 84 +++++++++++++++ test_runner/fixtures/zenith_fixtures.py | 1 + 10 files changed, 282 insertions(+), 8 deletions(-) create mode 100644 test_runner/batch_others/test_lsn_mapping.py diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 1645c44de5..bd4b7df690 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -118,11 +118,15 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn { } pub fn get_current_timestamp() -> TimestampTz { + to_pg_timestamp(SystemTime::now()) +} + +pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz { const UNIX_EPOCH_JDATE: u64 = 2440588; /* == date2j(1970, 1, 1) */ const POSTGRES_EPOCH_JDATE: u64 = 2451545; /* 
== date2j(2000, 1, 1) */ const SECS_PER_DAY: u64 = 86400; const USECS_PER_SEC: u64 = 1000000; - match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { + match time.duration_since(SystemTime::UNIX_EPOCH) { Ok(n) => { ((n.as_secs() - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY)) * USECS_PER_SEC diff --git a/libs/utils/src/pq_proto.rs b/libs/utils/src/pq_proto.rs index e1677f4311..ce86cf8c91 100644 --- a/libs/utils/src/pq_proto.rs +++ b/libs/utils/src/pq_proto.rs @@ -503,6 +503,18 @@ impl RowDescriptor<'_> { formatcode: 0, } } + + pub const fn text_col(name: &[u8]) -> RowDescriptor { + RowDescriptor { + name, + tableoid: 0, + attnum: 0, + typoid: TEXT_OID, + typlen: -1, + typmod: 0, + formatcode: 0, + } + } } #[derive(Debug)] diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 78a27e460f..14e6d40759 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -154,9 +154,17 @@ impl<'a> Basebackup<'a> { let img = self .timeline .get_slru_page_at_lsn(slru, segno, blknum, self.lsn)?; - ensure!(img.len() == pg_constants::BLCKSZ as usize); - slru_buf.extend_from_slice(&img); + if slru == SlruKind::Clog { + ensure!( + img.len() == pg_constants::BLCKSZ as usize + || img.len() == pg_constants::BLCKSZ as usize + 8 + ); + } else { + ensure!(img.len() == pg_constants::BLCKSZ as usize); + } + + slru_buf.extend_from_slice(&img[..pg_constants::BLCKSZ as usize]); } let segname = format!("{}/{:>04X}", slru.to_str(), segno); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 0adafab8ba..e584a101cd 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -31,7 +31,7 @@ use utils::{ use crate::basebackup; use crate::config::{PageServerConf, ProfilingConfig}; -use crate::pgdatadir_mapping::DatadirTimeline; +use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp}; use crate::profiling::profpoint_start; use crate::reltag::RelTag; use 
crate::repository::Repository; @@ -42,6 +42,7 @@ use crate::thread_mgr::ThreadKind; use crate::walreceiver; use crate::CheckpointConfig; use metrics::{register_histogram_vec, HistogramVec}; +use postgres_ffi::xlog_utils::to_pg_timestamp; // Wrapped in libpq CopyData enum PagestreamFeMessage { @@ -805,6 +806,33 @@ impl postgres_backend::Handler for PageServerHandler { pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + } else if query_string.starts_with("get_lsn_by_timestamp ") { + // Locate LSN of last transaction with timestamp less than or equal to the specified one + // TODO lazy static + let re = Regex::new(r"^get_lsn_by_timestamp ([[:xdigit:]]+) ([[:xdigit:]]+) '(.*)'$") + .unwrap(); + let caps = re + .captures(query_string) + .with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?; + + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid) + .context("Cannot load local timeline")?; + + let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?; + let timestamp_pg = to_pg_timestamp(timestamp); + + pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col( + b"lsn", + )]))?; + let result = match timeline.find_lsn_for_timestamp(timestamp_pg)? 
{ + LsnForTimestamp::Present(lsn) => format!("{}", lsn), + LsnForTimestamp::Future(_lsn) => "future".into(), + LsnForTimestamp::Past(_lsn) => "past".into(), + }; + pgb.write_message_noflush(&BeMessage::DataRow(&[Some(result.as_bytes())]))?; + pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } else { bail!("unknown command"); } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 071eccc05d..c052aa3d69 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -13,6 +13,7 @@ use crate::repository::{Repository, Timeline}; use crate::walrecord::ZenithWalRecord; use anyhow::{bail, ensure, Result}; use bytes::{Buf, Bytes}; +use postgres_ffi::xlog_utils::TimestampTz; use postgres_ffi::{pg_constants, Oid, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -45,6 +46,13 @@ where current_logical_size: AtomicIsize, } +#[derive(Debug)] +pub enum LsnForTimestamp { + Present(Lsn), + Future(Lsn), + Past(Lsn), +} + impl DatadirTimeline { pub fn new(tline: Arc, repartition_threshold: u64) -> Self { DatadirTimeline { @@ -202,6 +210,106 @@ impl DatadirTimeline { Ok(exists) } + /// Locate LSN, such that all transactions that committed before + /// 'search_timestamp' are visible, but nothing newer is. + /// + /// This is not exact. Commit timestamps are not guaranteed to be ordered, + /// so it's not well defined which LSN you get if there were multiple commits + /// "in flight" at that point in time. + /// + pub fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result { + let gc_cutoff_lsn_guard = self.tline.get_latest_gc_cutoff_lsn(); + let min_lsn = *gc_cutoff_lsn_guard; + let max_lsn = self.tline.get_last_record_lsn(); + + // LSNs are always 8-byte aligned. low/mid/high represent the + // LSN divided by 8. 
+ let mut low = min_lsn.0 / 8; + let mut high = max_lsn.0 / 8 + 1; + + let mut found_smaller = false; + let mut found_larger = false; + while low < high { + // cannot overflow, high and low are both smaller than u64::MAX / 2 + let mid = (high + low) / 2; + + let cmp = self.is_latest_commit_timestamp_ge_than( + search_timestamp, + Lsn(mid * 8), + &mut found_smaller, + &mut found_larger, + )?; + + if cmp { + high = mid; + } else { + low = mid + 1; + } + } + match (found_smaller, found_larger) { + (false, false) => { + // This can happen if no commit records have been processed yet, e.g. + // just after importing a cluster. + bail!("no commit timestamps found"); + } + (true, false) => { + // Didn't find any commit timestamps larger than the request + Ok(LsnForTimestamp::Future(max_lsn)) + } + (false, true) => { + // Didn't find any commit timestamps smaller than the request + Ok(LsnForTimestamp::Past(max_lsn)) + } + (true, true) => { + // low is the LSN of the first commit record *after* the search_timestamp, + // Back off by one to get to the point just before the commit. + // + // FIXME: it would be better to get the LSN of the previous commit. + // Otherwise, if you restore to the returned LSN, the database will + // include physical changes from later commits that will be marked + // as aborted, and will need to be vacuumed away. + Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8))) + } + } + } + + /// + /// Subroutine of find_lsn_for_timestamp(). Returns true if, as of LSN 'probe_lsn', + /// there are any commits with a timestamp at or after 'search_timestamp'. + /// + /// Additionally, sets 'found_smaller'/'found_larger' if it encounters any commits + /// with a smaller/larger timestamp. + /// + fn is_latest_commit_timestamp_ge_than( + &self, + search_timestamp: TimestampTz, + probe_lsn: Lsn, + found_smaller: &mut bool, + found_larger: &mut bool, + ) -> Result { + for segno in self.list_slru_segments(SlruKind::Clog, probe_lsn)? 
{ + let nblocks = self.get_slru_segment_size(SlruKind::Clog, segno, probe_lsn)?; + for blknum in (0..nblocks).rev() { + let clog_page = + self.get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn)?; + + if clog_page.len() == pg_constants::BLCKSZ as usize + 8 { + let mut timestamp_bytes = [0u8; 8]; + timestamp_bytes.copy_from_slice(&clog_page[pg_constants::BLCKSZ as usize..]); + let timestamp = TimestampTz::from_be_bytes(timestamp_bytes); + + if timestamp >= search_timestamp { + *found_larger = true; + return Ok(true); + } else { + *found_smaller = true; + } + } + } + } + Ok(false) + } + /// Get a list of SLRU segments pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result> { // fetch directory entry diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 583cdecb1d..a929e290ad 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -635,7 +635,10 @@ impl<'a, R: Repository> WalIngest<'a, R> { segno, rpageno, if is_commit { - ZenithWalRecord::ClogSetCommitted { xids: page_xids } + ZenithWalRecord::ClogSetCommitted { + xids: page_xids, + timestamp: parsed.xact_time, + } } else { ZenithWalRecord::ClogSetAborted { xids: page_xids } }, @@ -652,7 +655,10 @@ impl<'a, R: Repository> WalIngest<'a, R> { segno, rpageno, if is_commit { - ZenithWalRecord::ClogSetCommitted { xids: page_xids } + ZenithWalRecord::ClogSetCommitted { + xids: page_xids, + timestamp: parsed.xact_time, + } } else { ZenithWalRecord::ClogSetAborted { xids: page_xids } }, diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 5947a0c147..e8699cfa22 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -24,7 +24,10 @@ pub enum ZenithWalRecord { flags: u8, }, /// Mark transaction IDs as committed on a CLOG page - ClogSetCommitted { xids: Vec }, + ClogSetCommitted { + xids: Vec, + timestamp: TimestampTz, + }, /// Mark transaction IDs as aborted on a CLOG page ClogSetAborted { xids: Vec }, /// Extend 
multixact offsets SLRU diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 6338b839ae..777718b311 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -283,6 +283,11 @@ impl PostgresRedoManager { // If something went wrong, don't try to reuse the process. Kill it, and // next request will launch a new one. if result.is_err() { + error!( + "error applying {} WAL records to reconstruct page image at LSN {}", + records.len(), + lsn + ); let process = process_guard.take().unwrap(); process.kill(); } @@ -387,7 +392,7 @@ impl PostgresRedoManager { } // Non-relational WAL records are handled here, with custom code that has the // same effects as the corresponding Postgres WAL redo function. - ZenithWalRecord::ClogSetCommitted { xids } => { + ZenithWalRecord::ClogSetCommitted { xids, timestamp } => { let (slru_kind, segno, blknum) = key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?; assert_eq!( @@ -421,6 +426,21 @@ impl PostgresRedoManager { page, ); } + + // Append the timestamp + if page.len() == pg_constants::BLCKSZ as usize + 8 { + page.truncate(pg_constants::BLCKSZ as usize); + } + if page.len() == pg_constants::BLCKSZ as usize { + page.extend_from_slice(&timestamp.to_be_bytes()); + } else { + warn!( + "CLOG blk {} in seg {} has invalid size {}", + blknum, + segno, + page.len() + ); + } } ZenithWalRecord::ClogSetAborted { xids } => { let (slru_kind, segno, blknum) = diff --git a/test_runner/batch_others/test_lsn_mapping.py b/test_runner/batch_others/test_lsn_mapping.py new file mode 100644 index 0000000000..37113b46f2 --- /dev/null +++ b/test_runner/batch_others/test_lsn_mapping.py @@ -0,0 +1,84 @@ +from contextlib import closing +from datetime import timedelta, timezone, tzinfo +import math +from uuid import UUID +import psycopg2.extras +import psycopg2.errors +from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, Postgres +from fixtures.log_helper import log +import time + + +# +# Test pageserver 
get_lsn_by_timestamp API +# +def test_lsn_mapping(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.num_safekeepers = 1 + env = zenith_env_builder.init_start() + + new_timeline_id = env.zenith_cli.create_branch('test_lsn_mapping') + pgmain = env.postgres.create_start("test_lsn_mapping") + log.info("postgres is running on 'test_lsn_mapping' branch") + + ps_conn = env.pageserver.connect() + ps_cur = ps_conn.cursor() + conn = pgmain.connect() + cur = conn.cursor() + + # Create table, and insert rows, each in a separate transaction + # Disable synchronous_commit to make this initialization go faster. + # + # Each entry in 'tbl' holds the inserted value and the timestamp taken + # right after the row was inserted. + cur.execute("SET synchronous_commit=off") + cur.execute("CREATE TABLE foo (x integer)") + tbl = [] + for i in range(1000): + cur.execute(f"INSERT INTO foo VALUES({i})") + cur.execute(f'SELECT clock_timestamp()') + # Get the timestamp at UTC + after_timestamp = cur.fetchone()[0].replace(tzinfo=None) + tbl.append([i, after_timestamp]) + + # Execute one more transaction with synchronous_commit enabled, to flush + # all the previous transactions + cur.execute("SET synchronous_commit=on") + cur.execute("INSERT INTO foo VALUES (-1)") + + # Check edge cases: timestamp in the future + probe_timestamp = tbl[-1][1] + timedelta(hours=1) + ps_cur.execute( + f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'" + ) + result = ps_cur.fetchone()[0] + assert result == 'future' + + # timestamp too far in the past + probe_timestamp = tbl[0][1] - timedelta(hours=10) + ps_cur.execute( + f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'" + ) + result = ps_cur.fetchone()[0] + assert result == 'past' + + # Probe a bunch of timestamps in the valid range + for i in range(1, len(tbl), 100): + probe_timestamp = tbl[i][1] + + # Call get_lsn_by_timestamp to get the LSN + ps_cur.execute( 
+ f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'" + ) + lsn = ps_cur.fetchone()[0] + + # Launch a new read-only node at that LSN, and check that only the rows + # that were supposed to be committed at that point in time are visible. + pg_here = env.postgres.create_start(branch_name='test_lsn_mapping', + node_name='test_lsn_mapping_read', + lsn=lsn) + with closing(pg_here.connect()) as conn_here: + with conn_here.cursor() as cur_here: + cur_here.execute("SELECT max(x) FROM foo") + assert cur_here.fetchone()[0] == i + + pg_here.stop_and_destroy() diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 5614cea68b..5b25b1c457 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1572,6 +1572,7 @@ class Postgres(PgProtocol): assert self.node_name is not None self.env.zenith_cli.pg_stop(self.node_name, self.tenant_id, True) self.node_name = None + self.running = False return self