diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 1645c44de5..bd4b7df690 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -118,11 +118,15 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn { } pub fn get_current_timestamp() -> TimestampTz { + to_pg_timestamp(SystemTime::now()) +} + +pub fn to_pg_timestamp(time: SystemTime) -> TimestampTz { const UNIX_EPOCH_JDATE: u64 = 2440588; /* == date2j(1970, 1, 1) */ const POSTGRES_EPOCH_JDATE: u64 = 2451545; /* == date2j(2000, 1, 1) */ const SECS_PER_DAY: u64 = 86400; const USECS_PER_SEC: u64 = 1000000; - match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { + match time.duration_since(SystemTime::UNIX_EPOCH) { Ok(n) => { ((n.as_secs() - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY)) * USECS_PER_SEC diff --git a/libs/utils/src/pq_proto.rs b/libs/utils/src/pq_proto.rs index e1677f4311..ce86cf8c91 100644 --- a/libs/utils/src/pq_proto.rs +++ b/libs/utils/src/pq_proto.rs @@ -503,6 +503,18 @@ impl RowDescriptor<'_> { formatcode: 0, } } + + pub const fn text_col(name: &[u8]) -> RowDescriptor { + RowDescriptor { + name, + tableoid: 0, + attnum: 0, + typoid: TEXT_OID, + typlen: -1, + typmod: 0, + formatcode: 0, + } + } } #[derive(Debug)] diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 78a27e460f..14e6d40759 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -154,9 +154,17 @@ impl<'a> Basebackup<'a> { let img = self .timeline .get_slru_page_at_lsn(slru, segno, blknum, self.lsn)?; - ensure!(img.len() == pg_constants::BLCKSZ as usize); - slru_buf.extend_from_slice(&img); + if slru == SlruKind::Clog { + ensure!( + img.len() == pg_constants::BLCKSZ as usize + || img.len() == pg_constants::BLCKSZ as usize + 8 + ); + } else { + ensure!(img.len() == pg_constants::BLCKSZ as usize); + } + + slru_buf.extend_from_slice(&img[..pg_constants::BLCKSZ as usize]); } let segname = format!("{}/{:>04X}", slru.to_str(), segno); diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 0adafab8ba..e584a101cd 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -31,7 +31,7 @@ use utils::{ use crate::basebackup; use crate::config::{PageServerConf, ProfilingConfig}; -use crate::pgdatadir_mapping::DatadirTimeline; +use crate::pgdatadir_mapping::{DatadirTimeline, LsnForTimestamp}; use crate::profiling::profpoint_start; use crate::reltag::RelTag; use crate::repository::Repository; @@ -42,6 +42,7 @@ use crate::thread_mgr::ThreadKind; use crate::walreceiver; use crate::CheckpointConfig; use metrics::{register_histogram_vec, HistogramVec}; +use postgres_ffi::xlog_utils::to_pg_timestamp; // Wrapped in libpq CopyData enum PagestreamFeMessage { @@ -805,6 +806,33 @@ impl postgres_backend::Handler for PageServerHandler { pgb.write_message_noflush(&SINGLE_COL_ROWDESC)? .write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?; + } else if query_string.starts_with("get_lsn_by_timestamp ") { + // Locate LSN of last transaction with timestamp less or equal than sppecified + // TODO lazy static + let re = Regex::new(r"^get_lsn_by_timestamp ([[:xdigit:]]+) ([[:xdigit:]]+) '(.*)'$") + .unwrap(); + let caps = re + .captures(query_string) + .with_context(|| format!("invalid get_lsn_by_timestamp: '{}'", query_string))?; + + let tenantid = ZTenantId::from_str(caps.get(1).unwrap().as_str())?; + let timelineid = ZTimelineId::from_str(caps.get(2).unwrap().as_str())?; + let timeline = tenant_mgr::get_local_timeline_with_load(tenantid, timelineid) + .context("Cannot load local timeline")?; + + let timestamp = humantime::parse_rfc3339(caps.get(3).unwrap().as_str())?; + let timestamp_pg = to_pg_timestamp(timestamp); + + pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor::text_col( + b"lsn", + )]))?; + let result = match timeline.find_lsn_for_timestamp(timestamp_pg)? { + LsnForTimestamp::Present(lsn) => format!("{}", lsn), + LsnForTimestamp::Future(_lsn) => "future".into(), + LsnForTimestamp::Past(_lsn) => "past".into(), + }; + pgb.write_message_noflush(&BeMessage::DataRow(&[Some(result.as_bytes())]))?; + pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?; } else { bail!("unknown command"); } diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 071eccc05d..c052aa3d69 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -13,6 +13,7 @@ use crate::repository::{Repository, Timeline}; use crate::walrecord::ZenithWalRecord; use anyhow::{bail, ensure, Result}; use bytes::{Buf, Bytes}; +use postgres_ffi::xlog_utils::TimestampTz; use postgres_ffi::{pg_constants, Oid, TransactionId}; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; @@ -45,6 +46,13 @@ where current_logical_size: AtomicIsize, } +#[derive(Debug)] +pub enum LsnForTimestamp { + Present(Lsn), + Future(Lsn), + Past(Lsn), +} + impl DatadirTimeline { pub fn new(tline: Arc, repartition_threshold: u64) -> Self { DatadirTimeline { @@ -202,6 +210,106 @@ impl DatadirTimeline { Ok(exists) } + /// Locate LSN, such that all transactions that committed before + /// 'search_timestamp' are visible, but nothing newer is. + /// + /// This is not exact. Commit timestamps are not guaranteed to be ordered, + /// so it's not well defined which LSN you get if there were multiple commits + /// "in flight" at that point in time. + /// + pub fn find_lsn_for_timestamp(&self, search_timestamp: TimestampTz) -> Result { + let gc_cutoff_lsn_guard = self.tline.get_latest_gc_cutoff_lsn(); + let min_lsn = *gc_cutoff_lsn_guard; + let max_lsn = self.tline.get_last_record_lsn(); + + // LSNs are always 8-byte aligned. low/mid/high represent the + // LSN divided by 8. + let mut low = min_lsn.0 / 8; + let mut high = max_lsn.0 / 8 + 1; + + let mut found_smaller = false; + let mut found_larger = false; + while low < high { + // cannot overflow, high and low are both smaller than u64::MAX / 2 + let mid = (high + low) / 2; + + let cmp = self.is_latest_commit_timestamp_ge_than( + search_timestamp, + Lsn(mid * 8), + &mut found_smaller, + &mut found_larger, + )?; + + if cmp { + high = mid; + } else { + low = mid + 1; + } + } + match (found_smaller, found_larger) { + (false, false) => { + // This can happen if no commit records have been processed yet, e.g. + // just after importing a cluster. + bail!("no commit timestamps found"); + } + (true, false) => { + // Didn't find any commit timestamps larger than the request + Ok(LsnForTimestamp::Future(max_lsn)) + } + (false, true) => { + // Didn't find any commit timestamps smaller than the request + Ok(LsnForTimestamp::Past(max_lsn)) + } + (true, true) => { + // low is the LSN of the first commit record *after* the search_timestamp, + // Back off by one to get to the point just before the commit. + // + // FIXME: it would be better to get the LSN of the previous commit. + // Otherwise, if you restore to the returned LSN, the database will + // include physical changes from later commits that will be marked + // as aborted, and will need to be vacuumed away. + Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8))) + } + } + } + + /// + /// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any + /// commits that committed after 'search_timestamp', at LSN 'probe_lsn'. + /// + /// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits + /// with a smaller/larger timestamp. + /// + fn is_latest_commit_timestamp_ge_than( + &self, + search_timestamp: TimestampTz, + probe_lsn: Lsn, + found_smaller: &mut bool, + found_larger: &mut bool, + ) -> Result { + for segno in self.list_slru_segments(SlruKind::Clog, probe_lsn)? { + let nblocks = self.get_slru_segment_size(SlruKind::Clog, segno, probe_lsn)?; + for blknum in (0..nblocks).rev() { + let clog_page = + self.get_slru_page_at_lsn(SlruKind::Clog, segno, blknum, probe_lsn)?; + + if clog_page.len() == pg_constants::BLCKSZ as usize + 8 { + let mut timestamp_bytes = [0u8; 8]; + timestamp_bytes.copy_from_slice(&clog_page[pg_constants::BLCKSZ as usize..]); + let timestamp = TimestampTz::from_be_bytes(timestamp_bytes); + + if timestamp >= search_timestamp { + *found_larger = true; + return Ok(true); + } else { + *found_smaller = true; + } + } + } + } + Ok(false) + } + /// Get a list of SLRU segments pub fn list_slru_segments(&self, kind: SlruKind, lsn: Lsn) -> Result> { // fetch directory entry diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 583cdecb1d..a929e290ad 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -635,7 +635,10 @@ impl<'a, R: Repository> WalIngest<'a, R> { segno, rpageno, if is_commit { - ZenithWalRecord::ClogSetCommitted { xids: page_xids } + ZenithWalRecord::ClogSetCommitted { + xids: page_xids, + timestamp: parsed.xact_time, + } } else { ZenithWalRecord::ClogSetAborted { xids: page_xids } }, @@ -652,7 +655,10 @@ impl<'a, R: Repository> WalIngest<'a, R> { segno, rpageno, if is_commit { - ZenithWalRecord::ClogSetCommitted { xids: page_xids } + ZenithWalRecord::ClogSetCommitted { + xids: page_xids, + timestamp: parsed.xact_time, + } } else { ZenithWalRecord::ClogSetAborted { xids: page_xids } }, diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 5947a0c147..e8699cfa22 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -24,7 +24,10 @@ pub enum ZenithWalRecord { flags: u8, }, /// Mark transaction IDs as committed on a CLOG page - ClogSetCommitted { xids: Vec }, + ClogSetCommitted { + xids: Vec, + timestamp: TimestampTz, + }, /// Mark transaction IDs as aborted on a CLOG page ClogSetAborted { xids: Vec }, /// Extend multixact offsets SLRU diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 6338b839ae..777718b311 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -283,6 +283,11 @@ impl PostgresRedoManager { // If something went wrong, don't try to reuse the process. Kill it, and // next request will launch a new one. if result.is_err() { + error!( + "error applying {} WAL records to reconstruct page image at LSN {}", + records.len(), + lsn + ); let process = process_guard.take().unwrap(); process.kill(); } @@ -387,7 +392,7 @@ impl PostgresRedoManager { } // Non-relational WAL records are handled here, with custom code that has the // same effects as the corresponding Postgres WAL redo function. - ZenithWalRecord::ClogSetCommitted { xids } => { + ZenithWalRecord::ClogSetCommitted { xids, timestamp } => { let (slru_kind, segno, blknum) = key_to_slru_block(key).or(Err(WalRedoError::InvalidRecord))?; assert_eq!( @@ -421,6 +426,21 @@ impl PostgresRedoManager { page, ); } + + // Append the timestamp + if page.len() == pg_constants::BLCKSZ as usize + 8 { + page.truncate(pg_constants::BLCKSZ as usize); + } + if page.len() == pg_constants::BLCKSZ as usize { + page.extend_from_slice(×tamp.to_be_bytes()); + } else { + warn!( + "CLOG blk {} in seg {} has invalid size {}", + blknum, + segno, + page.len() + ); + } } ZenithWalRecord::ClogSetAborted { xids } => { let (slru_kind, segno, blknum) = diff --git a/test_runner/batch_others/test_lsn_mapping.py b/test_runner/batch_others/test_lsn_mapping.py new file mode 100644 index 0000000000..37113b46f2 --- /dev/null +++ b/test_runner/batch_others/test_lsn_mapping.py @@ -0,0 +1,84 @@ +from contextlib import closing +from datetime import timedelta, timezone, tzinfo +import math +from uuid import UUID +import psycopg2.extras +import psycopg2.errors +from fixtures.zenith_fixtures import ZenithEnv, ZenithEnvBuilder, Postgres +from fixtures.log_helper import log +import time + + +# +# Test pageserver get_lsn_by_timestamp API +# +def test_lsn_mapping(zenith_env_builder: ZenithEnvBuilder): + zenith_env_builder.num_safekeepers = 1 + env = zenith_env_builder.init_start() + + new_timeline_id = env.zenith_cli.create_branch('test_lsn_mapping') + pgmain = env.postgres.create_start("test_lsn_mapping") + log.info("postgres is running on 'test_lsn_mapping' branch") + + ps_conn = env.pageserver.connect() + ps_cur = ps_conn.cursor() + conn = pgmain.connect() + cur = conn.cursor() + + # Create table, and insert rows, each in a separate transaction + # Disable synchronous_commit to make this initialization go faster. + # + # Each row contains current insert LSN and the current timestamp, when + # the row was inserted. + cur.execute("SET synchronous_commit=off") + cur.execute("CREATE TABLE foo (x integer)") + tbl = [] + for i in range(1000): + cur.execute(f"INSERT INTO foo VALUES({i})") + cur.execute(f'SELECT clock_timestamp()') + # Get the timestamp at UTC + after_timestamp = cur.fetchone()[0].replace(tzinfo=None) + tbl.append([i, after_timestamp]) + + # Execute one more transaction with synchronous_commit enabled, to flush + # all the previous transactions + cur.execute("SET synchronous_commit=on") + cur.execute("INSERT INTO foo VALUES (-1)") + + # Check edge cases: timestamp in the future + probe_timestamp = tbl[-1][1] + timedelta(hours=1) + ps_cur.execute( + f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'" + ) + result = ps_cur.fetchone()[0] + assert result == 'future' + + # timestamp too the far history + probe_timestamp = tbl[0][1] - timedelta(hours=10) + ps_cur.execute( + f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'" + ) + result = ps_cur.fetchone()[0] + assert result == 'past' + + # Probe a bunch of timestamps in the valid range + for i in range(1, len(tbl), 100): + probe_timestamp = tbl[i][1] + + # Call get_lsn_by_timestamp to get the LSN + ps_cur.execute( + f"get_lsn_by_timestamp {env.initial_tenant.hex} {new_timeline_id.hex} '{probe_timestamp.isoformat()}Z'" + ) + lsn = ps_cur.fetchone()[0] + + # Launch a new read-only node at that LSN, and check that only the rows + # that were supposed to be committed at that point in time are visible. + pg_here = env.postgres.create_start(branch_name='test_lsn_mapping', + node_name='test_lsn_mapping_read', + lsn=lsn) + with closing(pg_here.connect()) as conn_here: + with conn_here.cursor() as cur_here: + cur_here.execute("SELECT max(x) FROM foo") + assert cur_here.fetchone()[0] == i + + pg_here.stop_and_destroy() diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index 5614cea68b..5b25b1c457 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -1572,6 +1572,7 @@ class Postgres(PgProtocol): assert self.node_name is not None self.env.zenith_cli.pg_stop(self.node_name, self.tenant_id, True) self.node_name = None + self.running = False return self