mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-18 13:40:37 +00:00
Compare commits
11 Commits
alexk/get_
...
improve_sy
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8c2410b28 | ||
|
|
1360361f60 | ||
|
|
000eb1b069 | ||
|
|
f51b48fa49 | ||
|
|
9f906ff236 | ||
|
|
c79dd8d458 | ||
|
|
ec4ecdd543 | ||
|
|
20a4d817ce | ||
|
|
5ebf7e5619 | ||
|
|
0692fffbf3 | ||
|
|
093570af20 |
2
.github/ansible/prod.us-east-2.hosts.yaml
vendored
2
.github/ansible/prod.us-east-2.hosts.yaml
vendored
@@ -27,6 +27,8 @@ storage:
|
||||
ansible_host: i-062227ba7f119eb8c
|
||||
pageserver-1.us-east-2.aws.neon.tech:
|
||||
ansible_host: i-0b3ec0afab5968938
|
||||
pageserver-2.us-east-2.aws.neon.tech:
|
||||
ansible_host: i-0d7a1c4325e71421d
|
||||
|
||||
safekeepers:
|
||||
hosts:
|
||||
|
||||
19
Cargo.lock
generated
19
Cargo.lock
generated
@@ -854,6 +854,7 @@ dependencies = [
|
||||
"opentelemetry",
|
||||
"postgres",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tar",
|
||||
@@ -3066,15 +3067,6 @@ dependencies = [
|
||||
"workspace_hack",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "remove_dir_all"
|
||||
version = "0.5.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reqwest"
|
||||
version = "0.11.14"
|
||||
@@ -3848,16 +3840,15 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "tempfile"
|
||||
version = "3.3.0"
|
||||
version = "3.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
|
||||
checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"fastrand",
|
||||
"libc",
|
||||
"redox_syscall",
|
||||
"remove_dir_all",
|
||||
"winapi",
|
||||
"rustix",
|
||||
"windows-sys 0.42.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -150,7 +150,7 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
|
||||
criterion = "0.4"
|
||||
rcgen = "0.10"
|
||||
rstest = "0.16"
|
||||
tempfile = "3.2"
|
||||
tempfile = "3.4"
|
||||
tonic-build = "0.8"
|
||||
|
||||
# This is only needed for proxy's tests.
|
||||
|
||||
@@ -32,11 +32,15 @@ RUN cd postgres && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
|
||||
make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install && \
|
||||
# Enable some of contrib extensions
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/bloom.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/moddatetime.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control
|
||||
|
||||
#########################################################################################
|
||||
@@ -206,6 +210,21 @@ RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.3.1.tar.gz -O hypo
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/hypopg.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "pg-hashids-pg-build"
|
||||
# compile pg_hashids extension
|
||||
#
|
||||
#########################################################################################
|
||||
FROM build-deps AS pg-hashids-pg-build
|
||||
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
|
||||
RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
|
||||
mkdir pg_hashids-src && cd pg_hashids-src && tar xvzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_hashids.control
|
||||
|
||||
#########################################################################################
|
||||
#
|
||||
# Layer "rust extensions"
|
||||
@@ -246,6 +265,8 @@ FROM rust-extensions-build AS pg-jsonschema-pg-build
|
||||
RUN git clone --depth=1 --single-branch --branch neon_abi_v0.1.4 https://github.com/vadim2404/pg_jsonschema/ && \
|
||||
cd pg_jsonschema && \
|
||||
cargo pgx install --release && \
|
||||
# it's needed to enable extension because it uses untrusted C language
|
||||
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_jsonschema.control && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
|
||||
|
||||
#########################################################################################
|
||||
@@ -260,6 +281,8 @@ FROM rust-extensions-build AS pg-graphql-pg-build
|
||||
RUN git clone --depth=1 --single-branch --branch neon_abi_v1.1.0 https://github.com/vadim2404/pg_graphql && \
|
||||
cd pg_graphql && \
|
||||
cargo pgx install --release && \
|
||||
# it's needed to enable extension because it uses untrusted C language
|
||||
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
|
||||
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_graphql.control
|
||||
|
||||
#########################################################################################
|
||||
@@ -280,6 +303,7 @@ COPY --from=pgjwt-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-jsonschema-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-graphql-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=hypopg-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY --from=pg-hashids-pg-build /usr/local/pgsql/ /usr/local/pgsql/
|
||||
COPY pgxn/ pgxn/
|
||||
|
||||
RUN make -j $(getconf _NPROCESSORS_ONLN) \
|
||||
|
||||
@@ -10,7 +10,6 @@ RUN set -e \
|
||||
&& rm -f /etc/inittab \
|
||||
&& touch /etc/inittab
|
||||
|
||||
ADD vm-cgconfig.conf /etc/cgconfig.conf
|
||||
RUN set -e \
|
||||
&& echo "::sysinit:cgconfigparser -l /etc/cgconfig.conf -s 1664" >> /etc/inittab \
|
||||
&& echo "::respawn:su vm-informant -c '/usr/local/bin/vm-informant --auto-restart --cgroup=neon-postgres'" >> /etc/inittab
|
||||
@@ -26,6 +25,7 @@ RUN apt update && \
|
||||
RUN adduser vm-informant --disabled-password --no-create-home
|
||||
USER postgres
|
||||
|
||||
ADD vm-cgconfig.conf /etc/cgconfig.conf
|
||||
COPY --from=informant /etc/inittab /etc/inittab
|
||||
COPY --from=informant /usr/bin/vm-informant /usr/local/bin/vm-informant
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ regex.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tar.workspace = true
|
||||
reqwest = { workspace = true, features = ["json"] }
|
||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
tokio-postgres.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
@@ -65,6 +65,9 @@ fn main() -> Result<()> {
|
||||
let spec = matches.get_one::<String>("spec");
|
||||
let spec_path = matches.get_one::<String>("spec-path");
|
||||
|
||||
let compute_id = matches.get_one::<String>("compute-id");
|
||||
let control_plane_uri = matches.get_one::<String>("control-plane-uri");
|
||||
|
||||
// Try to use just 'postgres' if no path is provided
|
||||
let pgbin = matches.get_one::<String>("pgbin").unwrap();
|
||||
|
||||
@@ -77,8 +80,27 @@ fn main() -> Result<()> {
|
||||
let path = Path::new(sp);
|
||||
let file = File::open(path)?;
|
||||
serde_json::from_reader(file)?
|
||||
} else if let Some(id) = compute_id {
|
||||
if let Some(cp_base) = control_plane_uri {
|
||||
let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec");
|
||||
let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
|
||||
Ok(v) => v,
|
||||
Err(_) => "".to_string(),
|
||||
};
|
||||
|
||||
reqwest::blocking::Client::new()
|
||||
.get(cp_uri)
|
||||
.header("Authorization", jwt)
|
||||
.send()?
|
||||
.json()?
|
||||
} else {
|
||||
panic!(
|
||||
"must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"",
|
||||
control_plane_uri, compute_id
|
||||
);
|
||||
}
|
||||
} else {
|
||||
panic!("cluster spec should be provided via --spec or --spec-path argument");
|
||||
panic!("compute spec should be provided via --spec or --spec-path argument");
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -227,6 +249,18 @@ fn cli() -> clap::Command {
|
||||
.long("spec-path")
|
||||
.value_name("SPEC_PATH"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("compute-id")
|
||||
.short('i')
|
||||
.long("compute-id")
|
||||
.value_name("COMPUTE_ID"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("control-plane-uri")
|
||||
.short('p')
|
||||
.long("control-plane-uri")
|
||||
.value_name("CONTROL_PLANE"),
|
||||
)
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -98,6 +98,15 @@ impl RelTag {
|
||||
|
||||
name
|
||||
}
|
||||
|
||||
pub fn with_forknum(&self, forknum: u8) -> Self {
|
||||
RelTag {
|
||||
forknum,
|
||||
spcnode: self.spcnode,
|
||||
dbnode: self.dbnode,
|
||||
relnode: self.relnode,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
///
|
||||
|
||||
@@ -37,10 +37,15 @@ pub struct Segment {
|
||||
/// LSN at this point
|
||||
pub lsn: u64,
|
||||
|
||||
pub parent_lsn: Option<u64>,
|
||||
|
||||
pub wal_from_parent: Option<u64>,
|
||||
|
||||
/// Logical size at this node, if known.
|
||||
pub size: Option<u64>,
|
||||
|
||||
/// If true, the segment from parent to this node is needed by `retention_period`
|
||||
/// if wal_from_parent is needed by `retention_period`?
|
||||
pub needed: bool,
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,8 @@ impl ScenarioBuilder {
|
||||
let init_segment = Segment {
|
||||
parent: None,
|
||||
lsn: 0,
|
||||
parent_lsn: None,
|
||||
wal_from_parent: None,
|
||||
size: Some(0),
|
||||
needed: false, // determined later
|
||||
};
|
||||
@@ -36,6 +38,8 @@ impl ScenarioBuilder {
|
||||
let newseg = Segment {
|
||||
parent: Some(lastseg_id),
|
||||
lsn: lastseg.lsn + lsn_bytes,
|
||||
parent_lsn: Some(lastseg.lsn),
|
||||
wal_from_parent: Some(lsn_bytes),
|
||||
size: Some((lastseg.size.unwrap() as i64 + size_bytes) as u64),
|
||||
needed: false,
|
||||
};
|
||||
|
||||
@@ -33,6 +33,7 @@ use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
|
||||
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
|
||||
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
|
||||
use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
|
||||
use postgres_ffi::TransactionId;
|
||||
use postgres_ffi::XLogFileName;
|
||||
use postgres_ffi::PG_TLI;
|
||||
@@ -190,14 +191,31 @@ where
|
||||
{
|
||||
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
|
||||
|
||||
// Gather and send relational files in each database if full backup is requested.
|
||||
if self.full_backup {
|
||||
for rel in self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?
|
||||
{
|
||||
self.add_rel(rel).await?;
|
||||
// If full backup is requested, include all relation files.
|
||||
// Otherwise only include init forks of unlogged relations.
|
||||
let rels = self
|
||||
.timeline
|
||||
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
|
||||
.await?;
|
||||
for &rel in rels.iter() {
|
||||
// Send init fork as main fork to provide well formed empty
|
||||
// contents of UNLOGGED relations. Postgres copies it in
|
||||
// `reinit.c` during recovery.
|
||||
if rel.forknum == INIT_FORKNUM {
|
||||
// I doubt we need _init fork itself, but having it at least
|
||||
// serves as a marker relation is unlogged.
|
||||
self.add_rel(rel, rel).await?;
|
||||
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
if self.full_backup {
|
||||
if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
|
||||
{
|
||||
// skip this, will include it when we reach the init fork
|
||||
continue;
|
||||
}
|
||||
self.add_rel(rel, rel).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -220,15 +238,16 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
|
||||
/// Add contents of relfilenode `src`, naming it as `dst`.
|
||||
async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
|
||||
let nblocks = self
|
||||
.timeline
|
||||
.get_rel_size(tag, self.lsn, false, self.ctx)
|
||||
.get_rel_size(src, self.lsn, false, self.ctx)
|
||||
.await?;
|
||||
|
||||
// If the relation is empty, create an empty file
|
||||
if nblocks == 0 {
|
||||
let file_name = tag.to_segfile_name(0);
|
||||
let file_name = dst.to_segfile_name(0);
|
||||
let header = new_tar_header(&file_name, 0)?;
|
||||
self.ar.append(&header, &mut io::empty()).await?;
|
||||
return Ok(());
|
||||
@@ -244,12 +263,12 @@ where
|
||||
for blknum in startblk..endblk {
|
||||
let img = self
|
||||
.timeline
|
||||
.get_rel_page_at_lsn(tag, blknum, self.lsn, false, self.ctx)
|
||||
.get_rel_page_at_lsn(src, blknum, self.lsn, false, self.ctx)
|
||||
.await?;
|
||||
segment_data.extend_from_slice(&img[..]);
|
||||
}
|
||||
|
||||
let file_name = tag.to_segfile_name(seg as u32);
|
||||
let file_name = dst.to_segfile_name(seg as u32);
|
||||
let header = new_tar_header(&file_name, segment_data.len() as u64)?;
|
||||
self.ar.append(&header, segment_data.as_slice()).await?;
|
||||
|
||||
|
||||
@@ -85,23 +85,16 @@ pub struct TimelineInputs {
|
||||
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
|
||||
pub ancestor_id: Option<TimelineId>,
|
||||
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
ancestor_lsn: Lsn,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
last_record: Lsn,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
latest_gc_cutoff: Lsn,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
horizon_cutoff: Lsn,
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
pitr_cutoff: Lsn,
|
||||
|
||||
/// Cutoff point based on GC settings
|
||||
#[serde_as(as = "serde_with::DisplayFromStr")]
|
||||
next_gc_cutoff: Lsn,
|
||||
|
||||
/// Cutoff point calculated from the user-supplied 'max_retention_period'
|
||||
#[serde_as(as = "Option<serde_with::DisplayFromStr>")]
|
||||
retention_param_cutoff: Option<Lsn>,
|
||||
}
|
||||
|
||||
@@ -252,6 +245,8 @@ pub(super) async fn gather_inputs(
|
||||
segment: Segment {
|
||||
parent: None, // filled in later
|
||||
lsn: branch_start_lsn.0,
|
||||
parent_lsn: None,
|
||||
wal_from_parent: None,
|
||||
size: None, // filled in later
|
||||
needed: branch_start_needed,
|
||||
},
|
||||
@@ -270,6 +265,8 @@ pub(super) async fn gather_inputs(
|
||||
segment: Segment {
|
||||
parent: Some(parent),
|
||||
lsn: lsn.0,
|
||||
parent_lsn: Some(segments[parent].segment.lsn),
|
||||
wal_from_parent: Some(lsn.0 - segments[parent].segment.lsn),
|
||||
size: None,
|
||||
needed: lsn > next_gc_cutoff,
|
||||
},
|
||||
@@ -284,6 +281,8 @@ pub(super) async fn gather_inputs(
|
||||
segment: Segment {
|
||||
parent: Some(parent),
|
||||
lsn: last_record_lsn.0,
|
||||
parent_lsn: Some(segments[parent].segment.lsn),
|
||||
wal_from_parent: Some(last_record_lsn.0 - segments[parent].segment.lsn),
|
||||
size: None, // Filled in later, if necessary
|
||||
needed: true,
|
||||
},
|
||||
@@ -613,32 +612,32 @@ fn verify_size_for_multiple_branches() {
|
||||
"timeline_inputs": [
|
||||
{
|
||||
"timeline_id": "20b129c9b50cff7213e6503a31b2a5ce",
|
||||
"ancestor_lsn": "0/18D3D98",
|
||||
"last_record": "0/2230CD0",
|
||||
"latest_gc_cutoff": "0/1698C48",
|
||||
"horizon_cutoff": "0/2210CD0",
|
||||
"pitr_cutoff": "0/2210CD0",
|
||||
"next_gc_cutoff": "0/2210CD0",
|
||||
"ancestor_lsn": 26033560,
|
||||
"last_record": 35851472,
|
||||
"latest_gc_cutoff": 23694408,
|
||||
"horizon_cutoff": 35720400,
|
||||
"pitr_cutoff": 35720400,
|
||||
"next_gc_cutoff": 35720400,
|
||||
"retention_param_cutoff": null
|
||||
},
|
||||
{
|
||||
"timeline_id": "454626700469f0a9914949b9d018e876",
|
||||
"ancestor_lsn": "0/176D998",
|
||||
"last_record": "0/1837770",
|
||||
"latest_gc_cutoff": "0/1698C48",
|
||||
"horizon_cutoff": "0/1817770",
|
||||
"pitr_cutoff": "0/1817770",
|
||||
"next_gc_cutoff": "0/1817770",
|
||||
"ancestor_lsn": 24566168,
|
||||
"last_record": 25393008,
|
||||
"latest_gc_cutoff": 23694408,
|
||||
"horizon_cutoff": 25261936,
|
||||
"pitr_cutoff": 25261936,
|
||||
"next_gc_cutoff": 25261936,
|
||||
"retention_param_cutoff": null
|
||||
},
|
||||
{
|
||||
"timeline_id": "cb5e3cbe60a4afc00d01880e1a37047f",
|
||||
"ancestor_lsn": "0/0",
|
||||
"last_record": "0/18D3D98",
|
||||
"latest_gc_cutoff": "0/1698C48",
|
||||
"horizon_cutoff": "0/18B3D98",
|
||||
"pitr_cutoff": "0/18B3D98",
|
||||
"next_gc_cutoff": "0/18B3D98",
|
||||
"ancestor_lsn": 0,
|
||||
"last_record": 26033560,
|
||||
"latest_gc_cutoff": 23694408,
|
||||
"horizon_cutoff":25902488,
|
||||
"pitr_cutoff":25902488,
|
||||
"next_gc_cutoff":25902488,
|
||||
"retention_param_cutoff": null
|
||||
}
|
||||
]
|
||||
@@ -688,13 +687,13 @@ fn verify_size_for_one_branch() {
|
||||
"timeline_inputs": [
|
||||
{
|
||||
"timeline_id": "f15ae0cf21cce2ba27e4d80c6709a6cd",
|
||||
"ancestor_lsn": "0/0",
|
||||
"last_record": "47/280A5860",
|
||||
"latest_gc_cutoff": "47/240A5860",
|
||||
"horizon_cutoff": "47/240A5860",
|
||||
"pitr_cutoff": "47/240A5860",
|
||||
"next_gc_cutoff": "47/240A5860",
|
||||
"retention_param_cutoff": "0/0"
|
||||
"ancestor_lsn": 0,
|
||||
"last_record": 305547335776,
|
||||
"latest_gc_cutoff": 305614444640,
|
||||
"horizon_cutoff": 305614444640,
|
||||
"pitr_cutoff": 305614444640,
|
||||
"next_gc_cutoff": 305614444640,
|
||||
"retention_param_cutoff": 0
|
||||
}
|
||||
]
|
||||
}"#;
|
||||
@@ -707,12 +706,15 @@ fn verify_size_for_one_branch() {
|
||||
println!("result: {:?}", serde_json::to_string(&res.segments));
|
||||
|
||||
use utils::lsn::Lsn;
|
||||
let latest_gc_cutoff_lsn: Lsn = "47/240A5860".parse().unwrap();
|
||||
let last_lsn: Lsn = "47/280A5860".parse().unwrap();
|
||||
let latest_gc_cutoff_lsn: Lsn = Lsn(305614444640);
|
||||
let last_lsn: Lsn = Lsn(305547335776);
|
||||
println!(
|
||||
"latest_gc_cutoff lsn 47/240A5860 is {}, last_lsn lsn 47/280A5860 is {}",
|
||||
"latest_gc_cutoff lsn {} is {}, last_lsn lsn {} is {}",
|
||||
latest_gc_cutoff_lsn,
|
||||
u64::from(latest_gc_cutoff_lsn),
|
||||
last_lsn,
|
||||
u64::from(last_lsn)
|
||||
);
|
||||
|
||||
assert_eq!(res.total_size, 220121784320);
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ use crate::walrecord::*;
|
||||
use crate::ZERO_PAGE;
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
|
||||
use postgres_ffi::v14::xlog_utils::*;
|
||||
use postgres_ffi::v14::CheckPoint;
|
||||
@@ -762,7 +762,7 @@ impl<'a> WalIngest<'a> {
|
||||
)?;
|
||||
|
||||
for xnode in &parsed.xnodes {
|
||||
for forknum in MAIN_FORKNUM..=VISIBILITYMAP_FORKNUM {
|
||||
for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
|
||||
let rel = RelTag {
|
||||
forknum,
|
||||
spcnode: xnode.spcnode,
|
||||
|
||||
@@ -1669,7 +1669,7 @@ class AbstractNeonCli(abc.ABC):
|
||||
timeout=timeout,
|
||||
)
|
||||
if not res.returncode:
|
||||
log.info(f"Run success: {res.stdout}")
|
||||
log.info(f"Run {res.args} success: {res.stdout}")
|
||||
elif check_return_code:
|
||||
# this way command output will be in recorded and shown in CI in failure message
|
||||
msg = f"""\
|
||||
@@ -3463,6 +3463,14 @@ def wait_for_last_flush_lsn(
|
||||
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
|
||||
|
||||
|
||||
def wait_for_wal_insert_lsn(
|
||||
env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
|
||||
) -> Lsn:
|
||||
"""Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
|
||||
last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
|
||||
return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
|
||||
|
||||
|
||||
def fork_at_current_lsn(
|
||||
env: NeonEnv,
|
||||
pg: Postgres,
|
||||
|
||||
@@ -3,8 +3,15 @@ from typing import List, Tuple
|
||||
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn
|
||||
from fixtures.types import Lsn
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
PageserverHttpClient,
|
||||
Postgres,
|
||||
wait_for_last_flush_lsn,
|
||||
wait_for_wal_insert_lsn,
|
||||
)
|
||||
from fixtures.types import Lsn, TenantId, TimelineId
|
||||
|
||||
|
||||
def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
@@ -45,12 +52,26 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
expected_inputs = {
|
||||
"segments": [
|
||||
{
|
||||
"segment": {"parent": None, "lsn": 23694408, "size": 25362432, "needed": True},
|
||||
"segment": {
|
||||
"parent": None,
|
||||
"lsn": 23694408,
|
||||
"parent_lsn": None,
|
||||
"wal_from_parent": None,
|
||||
"size": 25362432,
|
||||
"needed": True,
|
||||
},
|
||||
"timeline_id": f"{main_timeline_id}",
|
||||
"kind": "BranchStart",
|
||||
},
|
||||
{
|
||||
"segment": {"parent": 0, "lsn": 23694528, "size": None, "needed": True},
|
||||
"segment": {
|
||||
"parent": 0,
|
||||
"lsn": 23694528,
|
||||
"parent_lsn": 23694408,
|
||||
"wal_from_parent": 120,
|
||||
"size": None,
|
||||
"needed": True,
|
||||
},
|
||||
"timeline_id": f"{main_timeline_id}",
|
||||
"kind": "BranchEnd",
|
||||
},
|
||||
@@ -59,16 +80,17 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
|
||||
{
|
||||
"timeline_id": f"{main_timeline_id}",
|
||||
"ancestor_id": None,
|
||||
"ancestor_lsn": "0/0",
|
||||
"last_record": "0/1698CC0",
|
||||
"latest_gc_cutoff": "0/1698C48",
|
||||
"horizon_cutoff": "0/0",
|
||||
"pitr_cutoff": "0/0",
|
||||
"next_gc_cutoff": "0/0",
|
||||
"ancestor_lsn": 0,
|
||||
"last_record": 23694528,
|
||||
"latest_gc_cutoff": 23694408,
|
||||
"horizon_cutoff": 0,
|
||||
"pitr_cutoff": 0,
|
||||
"next_gc_cutoff": 0,
|
||||
"retention_param_cutoff": None,
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
expected_inputs = mask_model_inputs(expected_inputs)
|
||||
actual_inputs = mask_model_inputs(inputs)
|
||||
|
||||
@@ -324,7 +346,7 @@ def test_single_branch_get_tenant_size_grows(
|
||||
# inserts is larger than gc_horizon. for example 0x20000 here hid the fact
|
||||
# that there next_gc_cutoff could be smaller than initdb_lsn, which will
|
||||
# obviously lead to issues when calculating the size.
|
||||
gc_horizon = 0x30000
|
||||
gc_horizon = 0x38000
|
||||
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
|
||||
|
||||
env = neon_env_builder.init_start()
|
||||
@@ -334,29 +356,75 @@ def test_single_branch_get_tenant_size_grows(
|
||||
|
||||
http_client = env.pageserver.http_client()
|
||||
|
||||
collected_responses: List[Tuple[Lsn, int]] = []
|
||||
collected_responses: List[Tuple[str, Lsn, int]] = []
|
||||
|
||||
size_debug_file = open(test_output_dir / "size_debug.html", "w")
|
||||
|
||||
def check_size_change(current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev: int):
|
||||
if current_lsn - initdb_lsn > gc_horizon:
|
||||
def check_size_change(
|
||||
current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev_size: int
|
||||
):
|
||||
if current_lsn - initdb_lsn >= gc_horizon:
|
||||
assert (
|
||||
size >= prev
|
||||
size >= prev_size
|
||||
), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size"
|
||||
else:
|
||||
assert (
|
||||
size > prev
|
||||
size > prev_size
|
||||
), "tenant_size should grow, because we continue to add WAL to initial snapshot size"
|
||||
|
||||
with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg:
|
||||
initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
def get_current_consistent_size(
|
||||
env: NeonEnv,
|
||||
pg: Postgres,
|
||||
size_debug_file, # apparently there is no public signature for open()...
|
||||
http_client: PageserverHttpClient,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Tuple[Lsn, int]:
|
||||
consistent = False
|
||||
size_debug = None
|
||||
|
||||
current_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
|
||||
# We want to make sure we have a self-consistent set of values.
|
||||
# Size changes with WAL, so only if both before and after getting
|
||||
# the size of the tenant reports the same WAL insert LSN, we're OK
|
||||
# to use that (size, LSN) combination.
|
||||
# Note that 'wait_for_wal_flush_lsn' is not accurate enough: There
|
||||
# can be more wal after the flush LSN that can arrive on the
|
||||
# pageserver before we're requesting the page size.
|
||||
# Anyway, in general this is only one iteration, so in general
|
||||
# this is fine.
|
||||
while not consistent:
|
||||
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
|
||||
size_debug = http_client.tenant_size_debug(tenant_id)
|
||||
|
||||
after_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
|
||||
consistent = current_lsn == after_lsn
|
||||
current_lsn = after_lsn
|
||||
size_debug_file.write(size_debug)
|
||||
return (current_lsn, size)
|
||||
|
||||
with env.postgres.create_start(
|
||||
branch_name,
|
||||
tenant_id=tenant_id,
|
||||
### autovacuum is disabled to limit WAL logging.
|
||||
config_lines=["autovacuum=off"],
|
||||
) as pg:
|
||||
(initdb_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
collected_responses.append(("INITDB", initdb_lsn, size))
|
||||
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL)")
|
||||
cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL) WITH (fillfactor = 40)")
|
||||
|
||||
(current_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
collected_responses.append(("CREATE", current_lsn, size))
|
||||
|
||||
batch_size = 100
|
||||
|
||||
i = 0
|
||||
while True:
|
||||
for i in range(3):
|
||||
with pg.cursor() as cur:
|
||||
cur.execute(
|
||||
f"INSERT INTO t0(i) SELECT i FROM generate_series({batch_size} * %s, ({batch_size} * (%s + 1)) - 1) s(i)",
|
||||
@@ -365,27 +433,24 @@ def test_single_branch_get_tenant_size_grows(
|
||||
|
||||
i += 1
|
||||
|
||||
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
(current_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
|
||||
prev_size = collected_responses[-1][2]
|
||||
if size == 0:
|
||||
assert prev_size == 0
|
||||
else:
|
||||
# branch start shouldn't be past gc_horizon yet
|
||||
# thus the size should grow as we insert more data
|
||||
# "gc_horizon" is tuned so that it kicks in _after_ the
|
||||
# insert phase, but before the update phase ends.
|
||||
assert (
|
||||
current_lsn - initdb_lsn <= gc_horizon
|
||||
), "Tuning of GC window is likely out-of-date"
|
||||
assert size > prev_size
|
||||
|
||||
size_debug = http_client.tenant_size_debug(tenant_id)
|
||||
size_debug_file.write(size_debug)
|
||||
|
||||
if len(collected_responses) > 0:
|
||||
prev = collected_responses[-1][1]
|
||||
if size == 0:
|
||||
assert prev == 0
|
||||
else:
|
||||
# branch start shouldn't be past gc_horizon yet
|
||||
# thus the size should grow as we insert more data
|
||||
assert current_lsn - initdb_lsn <= gc_horizon
|
||||
assert size > prev
|
||||
|
||||
collected_responses.append((current_lsn, size))
|
||||
|
||||
if len(collected_responses) > 2:
|
||||
break
|
||||
collected_responses.append(("INSERT", current_lsn, size))
|
||||
|
||||
while True:
|
||||
with pg.cursor() as cur:
|
||||
@@ -397,18 +462,15 @@ def test_single_branch_get_tenant_size_grows(
|
||||
if updated == 0:
|
||||
break
|
||||
|
||||
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
(current_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
|
||||
prev_size = collected_responses[-1][2]
|
||||
|
||||
size_debug = http_client.tenant_size_debug(tenant_id)
|
||||
size_debug_file.write(size_debug)
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
|
||||
|
||||
prev = collected_responses[-1][1]
|
||||
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
|
||||
|
||||
collected_responses.append((current_lsn, size))
|
||||
collected_responses.append(("UPDATE", current_lsn, size))
|
||||
|
||||
while True:
|
||||
with pg.cursor() as cur:
|
||||
@@ -418,40 +480,47 @@ def test_single_branch_get_tenant_size_grows(
|
||||
if deleted == 0:
|
||||
break
|
||||
|
||||
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
(current_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
size = http_client.tenant_size(tenant_id)
|
||||
prev = collected_responses[-1][1]
|
||||
prev_size = collected_responses[-1][2]
|
||||
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
|
||||
|
||||
collected_responses.append((current_lsn, size))
|
||||
collected_responses.append(("DELETE", current_lsn, size))
|
||||
|
||||
with pg.cursor() as cur:
|
||||
cur.execute("DROP TABLE t0")
|
||||
|
||||
current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
|
||||
# The size of the tenant should still be as large as before we dropped
|
||||
# the table, because the drop operation can still be undone in the PITR
|
||||
# defined by gc_horizon.
|
||||
(current_lsn, size) = get_current_consistent_size(
|
||||
env, pg, size_debug_file, http_client, tenant_id, timeline_id
|
||||
)
|
||||
|
||||
size = http_client.tenant_size(tenant_id)
|
||||
prev = collected_responses[-1][1]
|
||||
prev_size = collected_responses[-1][2]
|
||||
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
|
||||
check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
|
||||
|
||||
collected_responses.append((current_lsn, size))
|
||||
collected_responses.append(("DROP", current_lsn, size))
|
||||
|
||||
# this isn't too many lines to forget for a while. observed while
|
||||
# developing these tests that locally the value is a bit more than what we
|
||||
# get in the ci.
|
||||
for lsn, size in collected_responses:
|
||||
log.info(f"collected: {lsn}, {size}")
|
||||
for phase, lsn, size in collected_responses:
|
||||
log.info(f"collected: {phase}, {lsn}, {size}")
|
||||
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
size_after = http_client.tenant_size(tenant_id)
|
||||
size_debug = http_client.tenant_size_debug(tenant_id)
|
||||
size_debug_file.write(size_debug)
|
||||
size_debug_file.close()
|
||||
|
||||
size_after = http_client.tenant_size(tenant_id)
|
||||
prev = collected_responses[-1][1]
|
||||
prev = collected_responses[-1][2]
|
||||
|
||||
assert size_after == prev, "size after restarting pageserver should not have changed"
|
||||
|
||||
@@ -587,7 +656,11 @@ def mask_model_inputs(x):
|
||||
if isinstance(x, dict):
|
||||
newx = {}
|
||||
for k, v in x.items():
|
||||
if k == "size":
|
||||
if (
|
||||
k in ["size", "wal_from_parent", "last_record"]
|
||||
or k.endswith("lsn")
|
||||
or k.endswith("cutoff")
|
||||
):
|
||||
if v is None or v == 0:
|
||||
# no change
|
||||
newx[k] = v
|
||||
@@ -595,12 +668,6 @@ def mask_model_inputs(x):
|
||||
newx[k] = "<0"
|
||||
else:
|
||||
newx[k] = ">0"
|
||||
elif k.endswith("lsn") or k.endswith("cutoff") or k == "last_record":
|
||||
if v is None or v == 0 or v == "0/0":
|
||||
# no change
|
||||
newx[k] = v
|
||||
else:
|
||||
newx[k] = "masked"
|
||||
else:
|
||||
newx[k] = mask_model_inputs(v)
|
||||
return newx
|
||||
|
||||
34
test_runner/regress/test_unlogged.py
Normal file
34
test_runner/regress/test_unlogged.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
|
||||
|
||||
|
||||
#
|
||||
# Test UNLOGGED tables/relations. Postgres copies init fork contents to main
|
||||
# fork to reset them during recovery. In Neon, pageserver directly sends init
|
||||
# fork contents as main fork during basebackup.
|
||||
#
|
||||
def test_unlogged(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_unlogged", "empty")
|
||||
pg = env.postgres.create_start("test_unlogged")
|
||||
|
||||
conn = pg.connect()
|
||||
cur = conn.cursor()
|
||||
|
||||
cur.execute("CREATE UNLOGGED TABLE iut (id int);")
|
||||
# create index to test unlogged index relation as well
|
||||
cur.execute("CREATE UNIQUE INDEX iut_idx ON iut (id);")
|
||||
cur.execute("INSERT INTO iut values (42);")
|
||||
|
||||
# create another compute to fetch inital empty contents from pageserver
|
||||
fork_at_current_lsn(env, pg, "test_unlogged_basebackup", "test_unlogged")
|
||||
pg2 = env.postgres.create_start(
|
||||
"test_unlogged_basebackup",
|
||||
)
|
||||
|
||||
conn2 = pg2.connect()
|
||||
cur2 = conn2.cursor()
|
||||
# after restart table should be empty but valid
|
||||
cur2.execute("PREPARE iut_plan (int) AS INSERT INTO iut VALUES ($1)")
|
||||
cur2.execute("EXECUTE iut_plan (43);")
|
||||
cur2.execute("SELECT * FROM iut")
|
||||
assert cur2.fetchall() == [(43,)]
|
||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: f210ac524b...5fb2e0bba0
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 33f9763454...919851e781
Reference in New Issue
Block a user