Compare commits

..

1 Commits

Author SHA1 Message Date
Alexey Kondratov
1e3a6cc813 [pageserver] Do several adjustements to the find_lsn_for_timestamp
1. Use `max(find_lsn_for_timestamp, ancestor_lsn)` as a lower boundary
   for search. We use this method to figure out the branching LSN for
   new branch, but GC cutoff could be before branching point and we
   cannot create new branch with LSN < `ancestor_lsn`.

2. Search for the first commit **before** specified timestamp. This
   solves two drawbacks: i) newly created branch won't contain physical
   changes from later commits that will be marked as aborted, and will
   need to be vacuumed away; and ii) we can still figure out a
   reasonable branching LSN if there were no new commits since the
   specified timestamp.

3. Change `get_lsn_by_timestamp` API method to return LSN even if we
   only found commit **before** the specified timestamp.

Resolves #3414
2023-02-22 20:33:05 +01:00
18 changed files with 121 additions and 351 deletions

View File

@@ -27,8 +27,6 @@ storage:
ansible_host: i-062227ba7f119eb8c
pageserver-1.us-east-2.aws.neon.tech:
ansible_host: i-0b3ec0afab5968938
pageserver-2.us-east-2.aws.neon.tech:
ansible_host: i-0d7a1c4325e71421d
safekeepers:
hosts:

2
Cargo.lock generated
View File

@@ -854,7 +854,6 @@ dependencies = [
"opentelemetry",
"postgres",
"regex",
"reqwest",
"serde",
"serde_json",
"tar",
@@ -4544,7 +4543,6 @@ dependencies = [
"tracing",
"tracing-subscriber",
"url",
"uuid",
"workspace_hack",
]

View File

@@ -32,15 +32,11 @@ RUN cd postgres && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install && \
# Enable some of contrib extensions
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/bloom.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/moddatetime.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
#########################################################################################
@@ -210,21 +206,6 @@ RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.3.1.tar.gz -O hypo
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/hypopg.control
#########################################################################################
#
# Layer "pg-hashids-pg-build"
# compile pg_hashids extension
#
#########################################################################################
FROM build-deps AS pg-hashids-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
mkdir pg_hashids-src && cd pg_hashids-src && tar xvzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_hashids.control
#########################################################################################
#
# Layer "rust extensions"
@@ -265,8 +246,6 @@ FROM rust-extensions-build AS pg-jsonschema-pg-build
RUN git clone --depth=1 --single-branch --branch neon_abi_v0.1.4 https://github.com/vadim2404/pg_jsonschema/ && \
cd pg_jsonschema && \
cargo pgx install --release && \
# it's needed to enable extension because it uses untrusted C language
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_jsonschema.control && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
#########################################################################################
@@ -281,8 +260,6 @@ FROM rust-extensions-build AS pg-graphql-pg-build
RUN git clone --depth=1 --single-branch --branch neon_abi_v1.1.0 https://github.com/vadim2404/pg_graphql && \
cd pg_graphql && \
cargo pgx install --release && \
# it's needed to enable extension because it uses untrusted C language
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_graphql.control
#########################################################################################
@@ -303,7 +280,6 @@ COPY --from=pgjwt-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-jsonschema-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-graphql-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=hypopg-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-hashids-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -17,7 +17,6 @@ regex.workspace = true
serde.workspace = true
serde_json.workspace = true
tar.workspace = true
reqwest = { workspace = true, features = ["json"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tokio-postgres.workspace = true
tracing.workspace = true

View File

@@ -65,9 +65,6 @@ fn main() -> Result<()> {
let spec = matches.get_one::<String>("spec");
let spec_path = matches.get_one::<String>("spec-path");
let compute_id = matches.get_one::<String>("compute-id");
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
// Try to use just 'postgres' if no path is provided
let pgbin = matches.get_one::<String>("pgbin").unwrap();
@@ -80,27 +77,8 @@ fn main() -> Result<()> {
let path = Path::new(sp);
let file = File::open(path)?;
serde_json::from_reader(file)?
} else if let Some(id) = compute_id {
if let Some(cp_base) = control_plane_uri {
let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec");
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
Ok(v) => v,
Err(_) => "".to_string(),
};
reqwest::blocking::Client::new()
.get(cp_uri)
.header("Authorization", jwt)
.send()?
.json()?
} else {
panic!(
"must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"",
control_plane_uri, compute_id
);
}
} else {
panic!("compute spec should be provided via --spec or --spec-path argument");
panic!("cluster spec should be provided via --spec or --spec-path argument");
}
}
};
@@ -249,18 +227,6 @@ fn cli() -> clap::Command {
.long("spec-path")
.value_name("SPEC_PATH"),
)
.arg(
Arg::new("compute-id")
.short('i')
.long("compute-id")
.value_name("COMPUTE_ID"),
)
.arg(
Arg::new("control-plane-uri")
.short('p')
.long("control-plane-uri")
.value_name("CONTROL_PLANE"),
)
}
#[test]

View File

@@ -98,15 +98,6 @@ impl RelTag {
name
}
pub fn with_forknum(&self, forknum: u8) -> Self {
RelTag {
forknum,
spcnode: self.spcnode,
dbnode: self.dbnode,
relnode: self.relnode,
}
}
}
///

View File

@@ -39,7 +39,7 @@ pq_proto.workspace = true
workspace_hack.workspace = true
url.workspace = true
uuid = { version = "1.2", features = ["v4", "serde"] }
[dev-dependencies]
byteorder.workspace = true
bytes.workspace = true

View File

@@ -63,26 +63,8 @@ async fn prometheus_metrics_handler(_req: Request<Body>) -> Result<Response<Body
Ok(response)
}
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
Middleware::pre(move |mut req| async move {
let headers = req.headers_mut();
let name = HeaderName::from_str("UUID").expect("created header name");
let request_id = uuid::Uuid::new_v4().to_string();
let value = HeaderValue::from_str(&request_id).unwrap();
headers.insert(name, value);
if req.method() == Method::GET {
tracing::debug!("{} {} {}", req.method(), req.uri().path(), request_id);
} else {
tracing::info!("{} {} {}", req.method(), req.uri().path(), request_id);
}
Ok(req)
})
}
pub fn make_router() -> RouterBuilder<hyper::Body, ApiError> {
Router::builder()
.middleware(add_request_id_middleware())
.middleware(Middleware::post_with_info(logger))
.get("/metrics", prometheus_metrics_handler)
.err_handler(error::handler)

View File

@@ -33,7 +33,6 @@ use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
use postgres_ffi::TransactionId;
use postgres_ffi::XLogFileName;
use postgres_ffi::PG_TLI;
@@ -191,31 +190,14 @@ where
{
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
// If full backup is requested, include all relation files.
// Otherwise only include init forks of unlogged relations.
let rels = self
.timeline
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
// contents of UNLOGGED relations. Postgres copies it in
// `reinit.c` during recovery.
if rel.forknum == INIT_FORKNUM {
// I doubt we need _init fork itself, but having it at least
// serves as a marker relation is unlogged.
self.add_rel(rel, rel).await?;
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
continue;
}
if self.full_backup {
if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
{
// skip this, will include it when we reach the init fork
continue;
}
self.add_rel(rel, rel).await?;
// Gather and send relational files in each database if full backup is requested.
if self.full_backup {
for rel in self
.timeline
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?
{
self.add_rel(rel).await?;
}
}
}
@@ -238,16 +220,15 @@ where
Ok(())
}
/// Add contents of relfilenode `src`, naming it as `dst`.
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
let nblocks = self
.timeline
.get_rel_size(src, self.lsn, false, self.ctx)
.get_rel_size(tag, self.lsn, false, self.ctx)
.await?;
// If the relation is empty, create an empty file
if nblocks == 0 {
let file_name = dst.to_segfile_name(0);
let file_name = tag.to_segfile_name(0);
let header = new_tar_header(&file_name, 0)?;
self.ar.append(&header, &mut io::empty()).await?;
return Ok(());
@@ -263,12 +244,12 @@ where
for blknum in startblk..endblk {
let img = self
.timeline
.get_rel_page_at_lsn(src, blknum, self.lsn, false, self.ctx)
.get_rel_page_at_lsn(tag, blknum, self.lsn, false, self.ctx)
.await?;
segment_data.extend_from_slice(&img[..]);
}
let file_name = dst.to_segfile_name(seg as u32);
let file_name = tag.to_segfile_name(seg as u32);
let header = new_tar_header(&file_name, segment_data.len() as u64)?;
self.ar.append(&header, segment_data.as_slice()).await?;

View File

@@ -1,14 +1,9 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use anyhow::{anyhow, Context, Result};
use futures::Future;
use hyper::body::HttpBody;
use hyper::header::HeaderName;
use hyper::http::HeaderValue;
use hyper::StatusCode;
use hyper::{Body, Request, Response, Uri};
use hyper::{Method, StatusCode};
use metrics::launch_timestamp::LaunchTimestamp;
use pageserver_api::models::DownloadRemoteLayersTaskSpawnRequest;
use remote_storage::GenericRemoteStorage;
@@ -335,7 +330,7 @@ async fn get_lsn_by_timestamp_handler(request: Request<Body>) -> Result<Response
let result = match result {
LsnForTimestamp::Present(lsn) => format!("{lsn}"),
LsnForTimestamp::Future(_lsn) => "future".into(),
LsnForTimestamp::Future(lsn) => format!("{lsn}"),
LsnForTimestamp::Past(_lsn) => "past".into(),
LsnForTimestamp::NoData(_lsn) => "nodata".into(),
};
@@ -1162,42 +1157,6 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/layer/:layer_file_name",
evict_timeline_layer_handler,
)
.get(
"/v1/panic",
wrap_span("always_panic_handler", always_panic_handler),
)
.get("/v1/panic", always_panic_handler)
.any(handler_404))
}
fn wrap_span<H, R, B, E>(
handler_name: &'static str,
handler: H,
) -> impl Fn(Request<hyper::Body>) -> R
where
H: Fn(Request<hyper::Body>) -> R + Send + Sync + 'static,
B: HttpBody + Send + Sync + 'static,
E: Into<Box<dyn std::error::Error + Send + Sync>>,
R: Future<Output = Result<Response<B>, E>> + Send + 'static,
{
move |r| -> R {
async {
let headers = r.headers_mut();
let name = HeaderName::from_str("UUID").expect("created header name");
let request_id = "foo";
let value = HeaderValue::from_str(&request_id).unwrap();
headers.insert(name, value);
if r.method() == Method::GET {
tracing::debug!("{} {} {}", r.method(), r.uri().path(), request_id);
} else {
tracing::info!("{} {} {}", r.method(), r.uri().path(), request_id);
}
handler(r)
.instrument(info_span!(
"request",
handler = handler_name,
request_id = request_id
))
.await
}
}
}

View File

@@ -290,6 +290,7 @@ impl Timeline {
}
}
///
/// Locate LSN, such that all transactions that committed before
/// 'search_timestamp' are visible, but nothing newer is.
///
@@ -303,7 +304,11 @@ impl Timeline {
ctx: &RequestContext,
) -> Result<LsnForTimestamp, PageReconstructError> {
let gc_cutoff_lsn_guard = self.get_latest_gc_cutoff_lsn();
let min_lsn = *gc_cutoff_lsn_guard;
// We use this method to figure out the branching LSN for new branch, but
// GC cutoff could be before branching point and we cannot create new branch
// with LSN < `ancestor_lsn`. Thus, pick the maximum of these two just to be
// on the safe side.
let min_lsn = std::cmp::max(*gc_cutoff_lsn_guard, self.get_ancestor_lsn());
let max_lsn = self.get_last_record_lsn();
// LSNs are always 8-byte aligned. low/mid/high represent the
@@ -327,12 +332,22 @@ impl Timeline {
)
.await?;
if cmp {
high = mid;
} else {
if !cmp {
// We either 1) found commit with timestamp **before** `search_timestamp`;
// or 2) we haven't found any commit records at all.
// Search with a larger LSN, in case 1) to try to find a more recent commit
// (but still **before** target timestamp); and in case 2) to fetch more
// SLRU segments for `clog`.
low = mid + 1;
} else {
// We found only more recent commits, search in the older range.
high = mid;
}
}
// If `found_smaller == true`, `low` is the LSN of the first commit record
// **before** the `search_timestamp` + 1 (to hit the while loop exit condition).
// Substitute 1 to get exactly the commit LSN.
let commit_lsn = Lsn((low - 1) * 8);
match (found_smaller, found_larger) {
(false, false) => {
// This can happen if no commit records have been processed yet, e.g.
@@ -340,32 +355,26 @@ impl Timeline {
Ok(LsnForTimestamp::NoData(max_lsn))
}
(true, false) => {
// Didn't find any commit timestamps larger than the request
Ok(LsnForTimestamp::Future(max_lsn))
// Only found a commit with timestamp smaller than the request.
// It's still a valid case for branch creation, return it.
// And `update_gc_info()` ignores LSN for a `LsnForTimestamp::Future`
// case, anyway.
Ok(LsnForTimestamp::Future(commit_lsn))
}
(false, true) => {
// Didn't find any commit timestamps smaller than the request
Ok(LsnForTimestamp::Past(max_lsn))
}
(true, true) => {
// low is the LSN of the first commit record *after* the search_timestamp,
// Back off by one to get to the point just before the commit.
//
// FIXME: it would be better to get the LSN of the previous commit.
// Otherwise, if you restore to the returned LSN, the database will
// include physical changes from later commits that will be marked
// as aborted, and will need to be vacuumed away.
Ok(LsnForTimestamp::Present(Lsn((low - 1) * 8)))
}
(true, true) => Ok(LsnForTimestamp::Present(commit_lsn)),
}
}
///
/// Subroutine of find_lsn_for_timestamp(). Returns true, if there are any
/// commits that committed after 'search_timestamp', at LSN 'probe_lsn'.
/// Subroutine of `find_lsn_for_timestamp()`. Returns `true`, if there are any
/// commits that committed after `search_timestamp`, at LSN `probe_lsn`.
///
/// Additionally, sets 'found_smaller'/'found_Larger, if encounters any commits
/// with a smaller/larger timestamp.
/// Additionally, sets `found_smaller` / `found_larger`, if encounters any commits
/// with a smaller / larger timestamp.
///
pub async fn is_latest_commit_timestamp_ge_than(
&self,

View File

@@ -37,7 +37,7 @@ use crate::walrecord::*;
use crate::ZERO_PAGE;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
@@ -762,7 +762,7 @@ impl<'a> WalIngest<'a> {
)?;
for xnode in &parsed.xnodes {
for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
for forknum in MAIN_FORKNUM..=VISIBILITYMAP_FORKNUM {
let rel = RelTag {
forknum,
spcnode: xnode.spcnode,

View File

@@ -1669,7 +1669,7 @@ class AbstractNeonCli(abc.ABC):
timeout=timeout,
)
if not res.returncode:
log.info(f"Run {res.args} success: {res.stdout}")
log.info(f"Run success: {res.stdout}")
elif check_return_code:
# this way command output will be in recorded and shown in CI in failure message
msg = f"""\
@@ -3463,14 +3463,6 @@ def wait_for_last_flush_lsn(
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
def wait_for_wal_insert_lsn(
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
) -> Lsn:
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
def fork_at_current_lsn(
env: NeonEnv,
pg: Postgres,

View File

@@ -17,7 +17,9 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
cur = pgmain.connect().cursor()
# Create table, and insert rows, each in a separate transaction
# Disable synchronous_commit to make this initialization go faster.
# Disable `synchronous_commit`` to make this initialization go faster.
# XXX: on my laptop this test takes 7s, and setting `synchronous_commit=off`
# doesn't change anything.
#
# Each row contains current insert LSN and the current timestamp, when
# the row was inserted.
@@ -32,20 +34,23 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Execute one more transaction with synchronous_commit enabled, to flush
# all the previous transactions
cur.execute("SET synchronous_commit=on")
cur.execute("INSERT INTO foo VALUES (-1)")
# Wait until WAL is received by pageserver
wait_for_last_flush_lsn(env, pgmain, env.initial_tenant, new_timeline_id)
with env.pageserver.http_client() as client:
# Check edge cases: timestamp in the future
# Check edge cases
# Timestamp is in the future
probe_timestamp = tbl[-1][1] + timedelta(hours=1)
result = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
assert result == "future"
# We should still return LSN of the first commit before timestamp
assert result not in ["past", "nodata"]
# timestamp too the far history
# Timestamp is in the unreachable past
probe_timestamp = tbl[0][1] - timedelta(hours=10)
result = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
@@ -55,10 +60,12 @@ def test_lsn_mapping(neon_env_builder: NeonEnvBuilder):
# Probe a bunch of timestamps in the valid range
for i in range(1, len(tbl), 100):
probe_timestamp = tbl[i][1]
# Call get_lsn_by_timestamp to get the LSN
lsn = client.timeline_get_lsn_by_timestamp(
env.initial_tenant, new_timeline_id, f"{probe_timestamp.isoformat()}Z"
)
# Call get_lsn_by_timestamp to get the LSN
assert lsn not in ["past", "nodata"]
# Launch a new read-only node at that LSN, and check that only the rows
# that were supposed to be committed at that point in time are visible.
pg_here = env.postgres.create_start(

View File

@@ -3,15 +3,8 @@ from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
PageserverHttpClient,
Postgres,
wait_for_last_flush_lsn,
wait_for_wal_insert_lsn,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn
from fixtures.types import Lsn
def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
@@ -331,7 +324,7 @@ def test_single_branch_get_tenant_size_grows(
# inserts is larger than gc_horizon. for example 0x20000 here hid the fact
# that there next_gc_cutoff could be smaller than initdb_lsn, which will
# obviously lead to issues when calculating the size.
gc_horizon = 0x38000
gc_horizon = 0x30000
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
env = neon_env_builder.init_start()
@@ -341,75 +334,29 @@ def test_single_branch_get_tenant_size_grows(
http_client = env.pageserver.http_client()
collected_responses: List[Tuple[str, Lsn, int]] = []
collected_responses: List[Tuple[Lsn, int]] = []
size_debug_file = open(test_output_dir / "size_debug.html", "w")
def check_size_change(
current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev_size: int
):
if current_lsn - initdb_lsn >= gc_horizon:
def check_size_change(current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev: int):
if current_lsn - initdb_lsn > gc_horizon:
assert (
size >= prev_size
size >= prev
), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size"
else:
assert (
size > prev_size
size > prev
), "tenant_size should grow, because we continue to add WAL to initial snapshot size"
def get_current_consistent_size(
env: NeonEnv,
pg: Postgres,
size_debug_file, # apparently there is no public signature for open()...
http_client: PageserverHttpClient,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Tuple[Lsn, int]:
consistent = False
size_debug = None
current_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
# We want to make sure we have a self-consistent set of values.
# Size changes with WAL, so only if both before and after getting
# the size of the tenant reports the same WAL insert LSN, we're OK
# to use that (size, LSN) combination.
# Note that 'wait_for_wal_flush_lsn' is not accurate enough: There
# can be more wal after the flush LSN that can arrive on the
# pageserver before we're requesting the page size.
# Anyway, in general this is only one iteration, so in general
# this is fine.
while not consistent:
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
size_debug = http_client.tenant_size_debug(tenant_id)
after_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
consistent = current_lsn == after_lsn
current_lsn = after_lsn
size_debug_file.write(size_debug)
return (current_lsn, size)
with env.postgres.create_start(
branch_name,
tenant_id=tenant_id,
### autovacuum is disabled to limit WAL logging.
config_lines=["autovacuum=off"],
) as pg:
(initdb_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
collected_responses.append(("INITDB", initdb_lsn, size))
with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg:
initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
with pg.cursor() as cur:
cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL) WITH (fillfactor = 40)")
(current_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
collected_responses.append(("CREATE", current_lsn, size))
cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL)")
batch_size = 100
for i in range(3):
i = 0
while True:
with pg.cursor() as cur:
cur.execute(
f"INSERT INTO t0(i) SELECT i FROM generate_series({batch_size} * %s, ({batch_size} * (%s + 1)) - 1) s(i)",
@@ -418,24 +365,27 @@ def test_single_branch_get_tenant_size_grows(
i += 1
(current_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
prev_size = collected_responses[-1][2]
if size == 0:
assert prev_size == 0
else:
# branch start shouldn't be past gc_horizon yet
# thus the size should grow as we insert more data
# "gc_horizon" is tuned so that it kicks in _after_ the
# insert phase, but before the update phase ends.
assert (
current_lsn - initdb_lsn <= gc_horizon
), "Tuning of GC window is likely out-of-date"
assert size > prev_size
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
collected_responses.append(("INSERT", current_lsn, size))
size_debug = http_client.tenant_size_debug(tenant_id)
size_debug_file.write(size_debug)
if len(collected_responses) > 0:
prev = collected_responses[-1][1]
if size == 0:
assert prev == 0
else:
# branch start shouldn't be past gc_horizon yet
# thus the size should grow as we insert more data
assert current_lsn - initdb_lsn <= gc_horizon
assert size > prev
collected_responses.append((current_lsn, size))
if len(collected_responses) > 2:
break
while True:
with pg.cursor() as cur:
@@ -447,15 +397,18 @@ def test_single_branch_get_tenant_size_grows(
if updated == 0:
break
(current_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
prev_size = collected_responses[-1][2]
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
size_debug = http_client.tenant_size_debug(tenant_id)
size_debug_file.write(size_debug)
collected_responses.append(("UPDATE", current_lsn, size))
prev = collected_responses[-1][1]
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
collected_responses.append((current_lsn, size))
while True:
with pg.cursor() as cur:
@@ -465,47 +418,40 @@ def test_single_branch_get_tenant_size_grows(
if deleted == 0:
break
(current_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
prev_size = collected_responses[-1][2]
size = http_client.tenant_size(tenant_id)
prev = collected_responses[-1][1]
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
collected_responses.append(("DELETE", current_lsn, size))
collected_responses.append((current_lsn, size))
with pg.cursor() as cur:
cur.execute("DROP TABLE t0")
# The size of the tenant should still be as large as before we dropped
# the table, because the drop operation can still be undone in the PITR
# defined by gc_horizon.
(current_lsn, size) = get_current_consistent_size(
env, pg, size_debug_file, http_client, tenant_id, timeline_id
)
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
prev_size = collected_responses[-1][2]
size = http_client.tenant_size(tenant_id)
prev = collected_responses[-1][1]
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
collected_responses.append(("DROP", current_lsn, size))
collected_responses.append((current_lsn, size))
# this isn't too many lines to forget for a while. observed while
# developing these tests that locally the value is a bit more than what we
# get in the ci.
for phase, lsn, size in collected_responses:
log.info(f"collected: {phase}, {lsn}, {size}")
for lsn, size in collected_responses:
log.info(f"collected: {lsn}, {size}")
env.pageserver.stop()
env.pageserver.start()
size_after = http_client.tenant_size(tenant_id)
size_debug = http_client.tenant_size_debug(tenant_id)
size_debug_file.write(size_debug)
size_debug_file.close()
prev = collected_responses[-1][2]
size_after = http_client.tenant_size(tenant_id)
prev = collected_responses[-1][1]
assert size_after == prev, "size after restarting pageserver should not have changed"

View File

@@ -1,34 +0,0 @@
from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
#
# Test UNLOGGED tables/relations. Postgres copies init fork contents to main
# fork to reset them during recovery. In Neon, pageserver directly sends init
# fork contents as main fork during basebackup.
#
def test_unlogged(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_unlogged", "empty")
pg = env.postgres.create_start("test_unlogged")
conn = pg.connect()
cur = conn.cursor()
cur.execute("CREATE UNLOGGED TABLE iut (id int);")
# create index to test unlogged index relation as well
cur.execute("CREATE UNIQUE INDEX iut_idx ON iut (id);")
cur.execute("INSERT INTO iut values (42);")
# create another compute to fetch inital empty contents from pageserver
fork_at_current_lsn(env, pg, "test_unlogged_basebackup", "test_unlogged")
pg2 = env.postgres.create_start(
"test_unlogged_basebackup",
)
conn2 = pg2.connect()
cur2 = conn2.cursor()
# after restart table should be empty but valid
cur2.execute("PREPARE iut_plan (int) AS INSERT INTO iut VALUES ($1)")
cur2.execute("EXECUTE iut_plan (43);")
cur2.execute("SELECT * FROM iut")
assert cur2.fetchall() == [(43,)]