mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-26 01:20:38 +00:00
Compare commits
12 Commits
hackaneon/
...
faster-ci
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
209fd611ac | ||
|
|
31192ee655 | ||
|
|
8ecbe975cd | ||
|
|
1e62eb9ba0 | ||
|
|
842be0ba74 | ||
|
|
982b376ea2 | ||
|
|
e158df4e86 | ||
|
|
723c0971e8 | ||
|
|
c8f67eed8f | ||
|
|
2d885ac07a | ||
|
|
89c5e80b3f | ||
|
|
93ec7503e0 |
115
Cargo.lock
generated
115
Cargo.lock
generated
@@ -1189,9 +1189,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "comfy-table"
|
||||
version = "6.1.4"
|
||||
version = "7.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e7b787b0dc42e8111badfdbe4c3059158ccb2db8780352fa1b01e8ccf45cc4d"
|
||||
checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7"
|
||||
dependencies = [
|
||||
"crossterm",
|
||||
"strum",
|
||||
@@ -1246,7 +1246,7 @@ dependencies = [
|
||||
"tokio-postgres",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"toml_edit 0.19.10",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"tracing-opentelemetry",
|
||||
"tracing-subscriber",
|
||||
@@ -1360,8 +1360,8 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-postgres",
|
||||
"tokio-util",
|
||||
"toml 0.7.4",
|
||||
"toml_edit 0.19.10",
|
||||
"toml",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"url",
|
||||
"utils",
|
||||
@@ -1485,25 +1485,22 @@ checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
|
||||
|
||||
[[package]]
|
||||
name = "crossterm"
|
||||
version = "0.25.0"
|
||||
version = "0.27.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67"
|
||||
checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
|
||||
dependencies = [
|
||||
"bitflags 1.3.2",
|
||||
"bitflags 2.4.1",
|
||||
"crossterm_winapi",
|
||||
"libc",
|
||||
"mio",
|
||||
"parking_lot 0.12.1",
|
||||
"signal-hook",
|
||||
"signal-hook-mio",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossterm_winapi"
|
||||
version = "0.9.0"
|
||||
version = "0.9.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c"
|
||||
checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b"
|
||||
dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
@@ -3144,7 +3141,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fd01039851e82f8799046eabbb354056283fb265c8ec0996af940f4e85a380ff"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"toml 0.8.14",
|
||||
"toml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3660,7 +3657,7 @@ dependencies = [
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"toml_edit 0.19.10",
|
||||
"toml_edit",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
]
|
||||
@@ -3747,7 +3744,7 @@ dependencies = [
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit 0.19.10",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"twox-hash",
|
||||
"url",
|
||||
@@ -3910,8 +3907,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parquet"
|
||||
version = "51.0.0"
|
||||
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
|
||||
version = "53.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f0fbf928021131daaa57d334ca8e3904fe9ae22f73c56244fc7db9b04eedc3d8"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"bytes",
|
||||
@@ -3930,8 +3928,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "parquet_derive"
|
||||
version = "51.0.0"
|
||||
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
|
||||
version = "53.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86e9fcfae007533a06b580429a3f7e07cb833ec8aa37c041c16563e7918f057e"
|
||||
dependencies = [
|
||||
"parquet",
|
||||
"proc-macro2",
|
||||
@@ -4121,7 +4120,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres"
|
||||
version = "0.19.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4134,7 +4133,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-protocol"
|
||||
version = "0.6.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
dependencies = [
|
||||
"base64 0.20.0",
|
||||
"byteorder",
|
||||
@@ -4153,7 +4152,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "postgres-types"
|
||||
version = "0.2.4"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"fallible-iterator",
|
||||
@@ -4812,7 +4811,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"toml_edit 0.19.10",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"utils",
|
||||
]
|
||||
@@ -5322,7 +5321,7 @@ dependencies = [
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit 0.19.10",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
@@ -5731,17 +5730,6 @@ dependencies = [
|
||||
"signal-hook-registry",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-mio"
|
||||
version = "0.2.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"mio",
|
||||
"signal-hook",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "signal-hook-registry"
|
||||
version = "1.4.1"
|
||||
@@ -6054,21 +6042,21 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
|
||||
|
||||
[[package]]
|
||||
name = "strum"
|
||||
version = "0.24.1"
|
||||
version = "0.26.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f"
|
||||
checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
|
||||
|
||||
[[package]]
|
||||
name = "strum_macros"
|
||||
version = "0.24.3"
|
||||
version = "0.26.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
|
||||
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
|
||||
dependencies = [
|
||||
"heck 0.4.1",
|
||||
"heck 0.5.0",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"rustversion",
|
||||
"syn 1.0.109",
|
||||
"syn 2.0.52",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6409,7 +6397,7 @@ dependencies = [
|
||||
[[package]]
|
||||
name = "tokio-postgres"
|
||||
version = "0.7.7"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"byteorder",
|
||||
@@ -6520,18 +6508,6 @@ dependencies = [
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.7.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec"
|
||||
dependencies = [
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"toml_edit 0.19.10",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.8.14"
|
||||
@@ -6541,7 +6517,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"toml_edit 0.22.14",
|
||||
"toml_edit",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6553,19 +6529,6 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml_edit"
|
||||
version = "0.19.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739"
|
||||
dependencies = [
|
||||
"indexmap 1.9.3",
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"winnow 0.4.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "toml_edit"
|
||||
version = "0.22.14"
|
||||
@@ -6576,7 +6539,7 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_spanned",
|
||||
"toml_datetime",
|
||||
"winnow 0.6.13",
|
||||
"winnow",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6989,7 +6952,7 @@ dependencies = [
|
||||
"tokio-stream",
|
||||
"tokio-tar",
|
||||
"tokio-util",
|
||||
"toml_edit 0.19.10",
|
||||
"toml_edit",
|
||||
"tracing",
|
||||
"tracing-error",
|
||||
"tracing-subscriber",
|
||||
@@ -7535,15 +7498,6 @@ version = "0.52.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
|
||||
|
||||
[[package]]
|
||||
name = "winnow"
|
||||
version = "0.4.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "winnow"
|
||||
version = "0.6.13"
|
||||
@@ -7651,6 +7605,7 @@ dependencies = [
|
||||
"tokio",
|
||||
"tokio-rustls 0.24.0",
|
||||
"tokio-util",
|
||||
"toml_edit",
|
||||
"tonic",
|
||||
"tower",
|
||||
"tracing",
|
||||
|
||||
39
Cargo.toml
39
Cargo.toml
@@ -73,7 +73,7 @@ camino = "1.1.6"
|
||||
cfg-if = "1.0.0"
|
||||
chrono = { version = "0.4", default-features = false, features = ["clock"] }
|
||||
clap = { version = "4.0", features = ["derive"] }
|
||||
comfy-table = "6.1"
|
||||
comfy-table = "7.1"
|
||||
const_format = "0.2"
|
||||
crc32c = "0.6"
|
||||
crossbeam-deque = "0.8.5"
|
||||
@@ -123,8 +123,8 @@ opentelemetry = "0.20.0"
|
||||
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.12.0"
|
||||
parking_lot = "0.12"
|
||||
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "51.0.0"
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
parquet_derive = "53"
|
||||
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
|
||||
pin-project-lite = "0.2"
|
||||
procfs = "0.16"
|
||||
@@ -158,8 +158,8 @@ signal-hook = "0.3"
|
||||
smallvec = "1.11"
|
||||
smol_str = { version = "0.2.0", features = ["serde"] }
|
||||
socket2 = "0.5"
|
||||
strum = "0.24"
|
||||
strum_macros = "0.24"
|
||||
strum = "0.26"
|
||||
strum_macros = "0.26"
|
||||
"subtle" = "2.5.0"
|
||||
svg_fmt = "0.4.3"
|
||||
sync_wrapper = "0.1.2"
|
||||
@@ -177,8 +177,8 @@ tokio-rustls = "0.25"
|
||||
tokio-stream = "0.1"
|
||||
tokio-tar = "0.3"
|
||||
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
|
||||
toml = "0.7"
|
||||
toml_edit = "0.19"
|
||||
toml = "0.8"
|
||||
toml_edit = "0.22"
|
||||
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
|
||||
tower-service = "0.3.2"
|
||||
tracing = "0.1"
|
||||
@@ -201,10 +201,21 @@ env_logger = "0.10"
|
||||
log = "0.4"
|
||||
|
||||
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
|
||||
# We want to use the 'neon' branch for these, but there's currently one
|
||||
# incompatible change on the branch. See:
|
||||
#
|
||||
# - PR #8076 which contained changes that depended on the new changes in
|
||||
# the rust-postgres crate, and
|
||||
# - PR #8654 which reverted those changes and made the code in proxy incompatible
|
||||
# with the tip of the 'neon' branch again.
|
||||
#
|
||||
# When those proxy changes are re-applied (see PR #8747), we can switch using
|
||||
# the tip of the 'neon' branch again.
|
||||
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
|
||||
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
|
||||
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
|
||||
|
||||
## Local libraries
|
||||
compute_api = { version = "0.1", path = "./libs/compute_api/" }
|
||||
@@ -241,11 +252,7 @@ tonic-build = "0.9"
|
||||
[patch.crates-io]
|
||||
|
||||
# Needed to get `tokio-postgres-rustls` to depend on our fork.
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
|
||||
|
||||
# bug fixes for UUID
|
||||
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master" }
|
||||
parquet_derive = { git = "https://github.com/apache/arrow-rs", branch = "master" }
|
||||
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
|
||||
|
||||
################# Binary contents sections
|
||||
|
||||
|
||||
@@ -75,14 +75,14 @@ impl PageServerNode {
|
||||
}
|
||||
}
|
||||
|
||||
fn pageserver_make_identity_toml(&self, node_id: NodeId) -> toml_edit::Document {
|
||||
toml_edit::Document::from_str(&format!("id={node_id}")).unwrap()
|
||||
fn pageserver_make_identity_toml(&self, node_id: NodeId) -> toml_edit::DocumentMut {
|
||||
toml_edit::DocumentMut::from_str(&format!("id={node_id}")).unwrap()
|
||||
}
|
||||
|
||||
fn pageserver_init_make_toml(
|
||||
&self,
|
||||
conf: NeonLocalInitPageserverConf,
|
||||
) -> anyhow::Result<toml_edit::Document> {
|
||||
) -> anyhow::Result<toml_edit::DocumentMut> {
|
||||
assert_eq!(&PageServerConf::from(&conf), &self.conf, "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully");
|
||||
|
||||
// TODO(christian): instead of what we do here, create a pageserver_api::config::ConfigToml (PR #7656)
|
||||
@@ -137,9 +137,9 @@ impl PageServerNode {
|
||||
|
||||
// Turn `overrides` into a toml document.
|
||||
// TODO: above code is legacy code, it should be refactored to use toml_edit directly.
|
||||
let mut config_toml = toml_edit::Document::new();
|
||||
let mut config_toml = toml_edit::DocumentMut::new();
|
||||
for fragment_str in overrides {
|
||||
let fragment = toml_edit::Document::from_str(&fragment_str)
|
||||
let fragment = toml_edit::DocumentMut::from_str(&fragment_str)
|
||||
.expect("all fragments in `overrides` are valid toml documents, this function controls that");
|
||||
for (key, item) in fragment.iter() {
|
||||
config_toml.insert(key, item.clone());
|
||||
|
||||
@@ -263,15 +263,6 @@ impl Key {
|
||||
field5: u8::MAX,
|
||||
field6: u32::MAX,
|
||||
};
|
||||
/// A key slightly smaller than [`Key::MAX`] for use in layer key ranges to avoid them to be confused with L0 layers
|
||||
pub const NON_L0_MAX: Key = Key {
|
||||
field1: u8::MAX,
|
||||
field2: u32::MAX,
|
||||
field3: u32::MAX,
|
||||
field4: u32::MAX,
|
||||
field5: u8::MAX,
|
||||
field6: u32::MAX - 1,
|
||||
};
|
||||
|
||||
pub fn from_hex(s: &str) -> Result<Self> {
|
||||
if s.len() != 36 {
|
||||
|
||||
@@ -62,7 +62,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
serde::Serialize,
|
||||
serde::Deserialize,
|
||||
strum_macros::Display,
|
||||
strum_macros::EnumVariantNames,
|
||||
strum_macros::VariantNames,
|
||||
strum_macros::AsRefStr,
|
||||
strum_macros::IntoStaticStr,
|
||||
)]
|
||||
|
||||
@@ -121,6 +121,7 @@ fn main() -> anyhow::Result<()> {
|
||||
.allowlist_type("XLogPageHeaderData")
|
||||
.allowlist_type("XLogLongPageHeaderData")
|
||||
.allowlist_var("XLOG_PAGE_MAGIC")
|
||||
.allowlist_var("PG_MAJORVERSION_NUM")
|
||||
.allowlist_var("PG_CONTROL_FILE_SIZE")
|
||||
.allowlist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC")
|
||||
.allowlist_type("PageHeaderData")
|
||||
|
||||
@@ -44,6 +44,9 @@ macro_rules! postgres_ffi {
|
||||
// Re-export some symbols from bindings
|
||||
pub use bindings::DBState_DB_SHUTDOWNED;
|
||||
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
|
||||
|
||||
pub const ZERO_CHECKPOINT: bytes::Bytes =
|
||||
bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -106,6 +109,107 @@ macro_rules! dispatch_pgversion {
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! enum_pgversion_dispatch {
|
||||
($name:expr, $typ:ident, $bind:ident, $code:block) => {
|
||||
enum_pgversion_dispatch!(
|
||||
name = $name,
|
||||
bind = $bind,
|
||||
typ = $typ,
|
||||
code = $code,
|
||||
pgversions = [
|
||||
V14 : v14,
|
||||
V15 : v15,
|
||||
V16 : v16,
|
||||
]
|
||||
)
|
||||
};
|
||||
(name = $name:expr,
|
||||
bind = $bind:ident,
|
||||
typ = $typ:ident,
|
||||
code = $code:block,
|
||||
pgversions = [$($variant:ident : $md:ident),+ $(,)?]) => {
|
||||
match $name {
|
||||
$(
|
||||
self::$typ::$variant($bind) => {
|
||||
use $crate::$md as pgv;
|
||||
$code
|
||||
}
|
||||
),+,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! enum_pgversion {
|
||||
{$name:ident, pgv :: $t:ident} => {
|
||||
enum_pgversion!{
|
||||
name = $name,
|
||||
typ = $t,
|
||||
pgversions = [
|
||||
V14 : v14,
|
||||
V15 : v15,
|
||||
V16 : v16,
|
||||
]
|
||||
}
|
||||
};
|
||||
{$name:ident, pgv :: $p:ident :: $t:ident} => {
|
||||
enum_pgversion!{
|
||||
name = $name,
|
||||
path = $p,
|
||||
typ = $t,
|
||||
pgversions = [
|
||||
V14 : v14,
|
||||
V15 : v15,
|
||||
V16 : v16,
|
||||
]
|
||||
}
|
||||
};
|
||||
{name = $name:ident,
|
||||
typ = $t:ident,
|
||||
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
|
||||
pub enum $name {
|
||||
$($variant ( $crate::$md::$t )),+
|
||||
}
|
||||
impl self::$name {
|
||||
pub fn pg_version(&self) -> u32 {
|
||||
enum_pgversion_dispatch!(self, $name, _ign, {
|
||||
pgv::bindings::PG_MAJORVERSION_NUM
|
||||
})
|
||||
}
|
||||
}
|
||||
$(
|
||||
impl Into<self::$name> for $crate::$md::$t {
|
||||
fn into(self) -> self::$name {
|
||||
self::$name::$variant (self)
|
||||
}
|
||||
}
|
||||
)+
|
||||
};
|
||||
{name = $name:ident,
|
||||
path = $p:ident,
|
||||
typ = $t:ident,
|
||||
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
|
||||
pub enum $name {
|
||||
$($variant ($crate::$md::$p::$t)),+
|
||||
}
|
||||
impl $name {
|
||||
pub fn pg_version(&self) -> u32 {
|
||||
enum_pgversion_dispatch!(self, $name, _ign, {
|
||||
pgv::bindings::PG_MAJORVERSION_NUM
|
||||
})
|
||||
}
|
||||
}
|
||||
$(
|
||||
impl Into<$name> for $crate::$md::$p::$t {
|
||||
fn into(self) -> $name {
|
||||
$name::$variant (self)
|
||||
}
|
||||
}
|
||||
)+
|
||||
};
|
||||
}
|
||||
|
||||
pub mod pg_constants;
|
||||
pub mod relfile_utils;
|
||||
|
||||
|
||||
@@ -185,7 +185,7 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
|
||||
let toml = input.parse::<toml_edit::Document>().unwrap();
|
||||
let toml = input.parse::<toml_edit::DocumentMut>().unwrap();
|
||||
RemoteStorageConfig::from_toml(toml.as_item())
|
||||
}
|
||||
|
||||
|
||||
@@ -3,11 +3,9 @@ use std::str::FromStr;
|
||||
use anyhow::Context;
|
||||
use metrics::{IntCounter, IntCounterVec};
|
||||
use once_cell::sync::Lazy;
|
||||
use strum_macros::{EnumString, EnumVariantNames};
|
||||
use strum_macros::{EnumString, VariantNames};
|
||||
|
||||
#[derive(
|
||||
EnumString, strum_macros::Display, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy,
|
||||
)]
|
||||
#[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
pub enum LogFormat {
|
||||
Plain,
|
||||
|
||||
@@ -10,7 +10,7 @@ pub fn deserialize_item<T>(item: &toml_edit::Item) -> Result<T, Error>
|
||||
where
|
||||
T: serde::de::DeserializeOwned,
|
||||
{
|
||||
let document: toml_edit::Document = match item {
|
||||
let document: toml_edit::DocumentMut = match item {
|
||||
toml_edit::Item::Table(toml) => toml.clone().into(),
|
||||
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
|
||||
toml.clone().into_table().into()
|
||||
|
||||
@@ -174,7 +174,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
println!("specified prefix '{}' failed validation", cmd.prefix);
|
||||
return Ok(());
|
||||
};
|
||||
let toml_document = toml_edit::Document::from_str(&cmd.config_toml_str)?;
|
||||
let toml_document = toml_edit::DocumentMut::from_str(&cmd.config_toml_str)?;
|
||||
let toml_item = toml_document
|
||||
.get("remote_storage")
|
||||
.expect("need remote_storage");
|
||||
|
||||
@@ -9,7 +9,7 @@ use metrics::{
|
||||
use once_cell::sync::Lazy;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use strum::{EnumCount, VariantNames};
|
||||
use strum_macros::{EnumVariantNames, IntoStaticStr};
|
||||
use strum_macros::{IntoStaticStr, VariantNames};
|
||||
use tracing::warn;
|
||||
use utils::id::TimelineId;
|
||||
|
||||
@@ -27,7 +27,7 @@ const CRITICAL_OP_BUCKETS: &[f64] = &[
|
||||
];
|
||||
|
||||
// Metrics collected on operations on the storage repository.
|
||||
#[derive(Debug, EnumVariantNames, IntoStaticStr)]
|
||||
#[derive(Debug, VariantNames, IntoStaticStr)]
|
||||
#[strum(serialize_all = "kebab_case")]
|
||||
pub(crate) enum StorageTimeOperation {
|
||||
#[strum(serialize = "layer flush")]
|
||||
|
||||
@@ -7091,13 +7091,13 @@ mod tests {
|
||||
vec![
|
||||
// Image layer at GC horizon
|
||||
PersistentLayerKey {
|
||||
key_range: Key::MIN..Key::NON_L0_MAX,
|
||||
key_range: Key::MIN..Key::MAX,
|
||||
lsn_range: Lsn(0x30)..Lsn(0x31),
|
||||
is_delta: false
|
||||
},
|
||||
// The delta layer covers the full range (with the layer key hack to avoid being recognized as L0)
|
||||
// The delta layer below the horizon
|
||||
PersistentLayerKey {
|
||||
key_range: Key::MIN..Key::NON_L0_MAX,
|
||||
key_range: get_key(3)..get_key(4),
|
||||
lsn_range: Lsn(0x30)..Lsn(0x48),
|
||||
is_delta: true
|
||||
},
|
||||
|
||||
@@ -452,7 +452,8 @@ impl TryFrom<toml_edit::Item> for TenantConfOpt {
|
||||
.map_err(|e| anyhow::anyhow!("{}: {}", e.path(), e.inner().message()));
|
||||
}
|
||||
toml_edit::Item::Table(table) => {
|
||||
let deserializer = toml_edit::de::Deserializer::new(table.into());
|
||||
let deserializer =
|
||||
toml_edit::de::Deserializer::from(toml_edit::DocumentMut::from(table));
|
||||
return serde_path_to_error::deserialize(deserializer)
|
||||
.map_err(|e| anyhow::anyhow!("{}: {}", e.path(), e.inner().message()));
|
||||
}
|
||||
|
||||
@@ -188,7 +188,7 @@ impl SplitImageLayerWriter {
|
||||
.await
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
/// This function will be deprecated with #8841.
|
||||
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
}
|
||||
@@ -204,7 +204,7 @@ impl SplitImageLayerWriter {
|
||||
/// will split them into multiple files based on size.
|
||||
#[must_use]
|
||||
pub struct SplitDeltaLayerWriter {
|
||||
inner: DeltaLayerWriter,
|
||||
inner: Option<(Key, DeltaLayerWriter)>,
|
||||
target_layer_size: u64,
|
||||
generated_layers: Vec<SplitWriterResult>,
|
||||
conf: &'static PageServerConf,
|
||||
@@ -212,7 +212,6 @@ pub struct SplitDeltaLayerWriter {
|
||||
tenant_shard_id: TenantShardId,
|
||||
lsn_range: Range<Lsn>,
|
||||
last_key_written: Key,
|
||||
start_key: Key,
|
||||
}
|
||||
|
||||
impl SplitDeltaLayerWriter {
|
||||
@@ -220,29 +219,18 @@ impl SplitDeltaLayerWriter {
|
||||
conf: &'static PageServerConf,
|
||||
timeline_id: TimelineId,
|
||||
tenant_shard_id: TenantShardId,
|
||||
start_key: Key,
|
||||
lsn_range: Range<Lsn>,
|
||||
target_layer_size: u64,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<Self> {
|
||||
Ok(Self {
|
||||
target_layer_size,
|
||||
inner: DeltaLayerWriter::new(
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
start_key,
|
||||
lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
inner: None,
|
||||
generated_layers: Vec::new(),
|
||||
conf,
|
||||
timeline_id,
|
||||
tenant_shard_id,
|
||||
lsn_range,
|
||||
last_key_written: Key::MIN,
|
||||
start_key,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -265,9 +253,26 @@ impl SplitDeltaLayerWriter {
|
||||
//
|
||||
// Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
|
||||
// strategy. https://github.com/neondatabase/neon/issues/8837
|
||||
|
||||
if self.inner.is_none() {
|
||||
self.inner = Some((
|
||||
key,
|
||||
DeltaLayerWriter::new(
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
key,
|
||||
self.lsn_range.clone(),
|
||||
ctx,
|
||||
)
|
||||
.await?,
|
||||
));
|
||||
}
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
|
||||
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
|
||||
if self.inner.num_keys() >= 1
|
||||
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
if inner.num_keys() >= 1
|
||||
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
|
||||
{
|
||||
if key != self.last_key_written {
|
||||
let next_delta_writer = DeltaLayerWriter::new(
|
||||
@@ -279,13 +284,13 @@ impl SplitDeltaLayerWriter {
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
|
||||
let (start_key, prev_delta_writer) =
|
||||
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..key,
|
||||
key_range: start_key..key,
|
||||
lsn_range: self.lsn_range.clone(),
|
||||
is_delta: true,
|
||||
};
|
||||
self.start_key = key;
|
||||
if discard(&layer_key).await {
|
||||
drop(prev_delta_writer);
|
||||
self.generated_layers
|
||||
@@ -296,17 +301,18 @@ impl SplitDeltaLayerWriter {
|
||||
self.generated_layers
|
||||
.push(SplitWriterResult::Produced(delta_layer));
|
||||
}
|
||||
} else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
|
||||
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
|
||||
// We have to produce a very large file b/c a key is updated too often.
|
||||
anyhow::bail!(
|
||||
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
|
||||
key,
|
||||
self.inner.estimated_size()
|
||||
inner.estimated_size()
|
||||
);
|
||||
}
|
||||
}
|
||||
self.last_key_written = key;
|
||||
self.inner.put_value(key, lsn, val, ctx).await
|
||||
let (_, inner) = self.inner.as_mut().unwrap();
|
||||
inner.put_value(key, lsn, val, ctx).await
|
||||
}
|
||||
|
||||
pub async fn put_value(
|
||||
@@ -325,7 +331,6 @@ impl SplitDeltaLayerWriter {
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
discard: D,
|
||||
) -> anyhow::Result<Vec<SplitWriterResult>>
|
||||
where
|
||||
@@ -337,11 +342,15 @@ impl SplitDeltaLayerWriter {
|
||||
inner,
|
||||
..
|
||||
} = self;
|
||||
let Some((start_key, inner)) = inner else {
|
||||
return Ok(generated_layers);
|
||||
};
|
||||
if inner.num_keys() == 0 {
|
||||
return Ok(generated_layers);
|
||||
}
|
||||
let end_key = self.last_key_written.next();
|
||||
let layer_key = PersistentLayerKey {
|
||||
key_range: self.start_key..end_key,
|
||||
key_range: start_key..end_key,
|
||||
lsn_range: self.lsn_range.clone(),
|
||||
is_delta: true,
|
||||
};
|
||||
@@ -360,15 +369,14 @@ impl SplitDeltaLayerWriter {
|
||||
self,
|
||||
tline: &Arc<Timeline>,
|
||||
ctx: &RequestContext,
|
||||
end_key: Key,
|
||||
) -> anyhow::Result<Vec<SplitWriterResult>> {
|
||||
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
|
||||
self.finish_with_discard_fn(tline, ctx, |_| async { false })
|
||||
.await
|
||||
}
|
||||
|
||||
/// When split writer fails, the caller should call this function and handle partially generated layers.
|
||||
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
|
||||
Ok((self.generated_layers, self.inner))
|
||||
/// This function will be deprecated with #8841.
|
||||
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
|
||||
Ok((self.generated_layers, self.inner.map(|x| x.1)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -432,10 +440,8 @@ mod tests {
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -460,11 +466,22 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
|
||||
assert_eq!(layers.len(), 1);
|
||||
assert_eq!(
|
||||
layers
|
||||
.into_iter()
|
||||
.next()
|
||||
.unwrap()
|
||||
.into_resident_layer()
|
||||
.layer_desc()
|
||||
.key(),
|
||||
PersistentLayerKey {
|
||||
key_range: get_key(0)..get_key(1),
|
||||
lsn_range: Lsn(0x18)..Lsn(0x20),
|
||||
is_delta: true
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -501,10 +518,8 @@ mod tests {
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -533,10 +548,7 @@ mod tests {
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
let delta_layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
|
||||
if discard {
|
||||
for layer in image_layers {
|
||||
layer.into_discarded_layer();
|
||||
@@ -555,6 +567,14 @@ mod tests {
|
||||
.collect_vec();
|
||||
assert_eq!(image_layers.len(), N / 512 + 1);
|
||||
assert_eq!(delta_layers.len(), N / 512 + 1);
|
||||
assert_eq!(
|
||||
delta_layers.first().unwrap().layer_desc().key_range.start,
|
||||
get_key(0)
|
||||
);
|
||||
assert_eq!(
|
||||
delta_layers.last().unwrap().layer_desc().key_range.end,
|
||||
get_key(N as u32)
|
||||
);
|
||||
for idx in 0..image_layers.len() {
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
|
||||
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
|
||||
@@ -602,10 +622,8 @@ mod tests {
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x18)..Lsn(0x20),
|
||||
4 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -644,11 +662,35 @@ mod tests {
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(10))
|
||||
.await
|
||||
.unwrap();
|
||||
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
|
||||
assert_eq!(layers.len(), 2);
|
||||
let mut layers_iter = layers.into_iter();
|
||||
assert_eq!(
|
||||
layers_iter
|
||||
.next()
|
||||
.unwrap()
|
||||
.into_resident_layer()
|
||||
.layer_desc()
|
||||
.key(),
|
||||
PersistentLayerKey {
|
||||
key_range: get_key(0)..get_key(1),
|
||||
lsn_range: Lsn(0x18)..Lsn(0x20),
|
||||
is_delta: true
|
||||
}
|
||||
);
|
||||
assert_eq!(
|
||||
layers_iter
|
||||
.next()
|
||||
.unwrap()
|
||||
.into_resident_layer()
|
||||
.layer_desc()
|
||||
.key(),
|
||||
PersistentLayerKey {
|
||||
key_range: get_key(1)..get_key(2),
|
||||
lsn_range: Lsn(0x18)..Lsn(0x20),
|
||||
is_delta: true
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -668,10 +710,8 @@ mod tests {
|
||||
tenant.conf,
|
||||
tline.timeline_id,
|
||||
tenant.tenant_shard_id,
|
||||
get_key(0),
|
||||
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
|
||||
4 * 1024 * 1024,
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -689,10 +729,20 @@ mod tests {
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
let delta_layers = delta_writer
|
||||
.finish(&tline, &ctx, get_key(N as u32))
|
||||
.await
|
||||
.unwrap();
|
||||
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
|
||||
assert_eq!(delta_layers.len(), 1);
|
||||
let delta_layer = delta_layers
|
||||
.into_iter()
|
||||
.next()
|
||||
.unwrap()
|
||||
.into_resident_layer();
|
||||
assert_eq!(
|
||||
delta_layer.layer_desc().key(),
|
||||
PersistentLayerKey {
|
||||
key_range: get_key(0)..get_key(1),
|
||||
lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
|
||||
is_delta: true
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1809,7 +1809,6 @@ impl Timeline {
|
||||
.unwrap();
|
||||
// We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
|
||||
// as an L0 layer.
|
||||
let hack_end_key = Key::NON_L0_MAX;
|
||||
let mut delta_layers = Vec::new();
|
||||
let mut image_layers = Vec::new();
|
||||
let mut downloaded_layers = Vec::new();
|
||||
@@ -1855,10 +1854,8 @@ impl Timeline {
|
||||
self.conf,
|
||||
self.timeline_id,
|
||||
self.tenant_shard_id,
|
||||
Key::MIN,
|
||||
lowest_retain_lsn..end_lsn,
|
||||
self.get_compaction_target_size(),
|
||||
ctx,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -1965,7 +1962,7 @@ impl Timeline {
|
||||
let produced_image_layers = if let Some(writer) = image_layer_writer {
|
||||
if !dry_run {
|
||||
writer
|
||||
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
|
||||
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
|
||||
.await?
|
||||
} else {
|
||||
let (layers, _) = writer.take()?;
|
||||
@@ -1978,7 +1975,7 @@ impl Timeline {
|
||||
|
||||
let produced_delta_layers = if !dry_run {
|
||||
delta_layer_writer
|
||||
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
|
||||
.finish_with_discard_fn(self, ctx, discard)
|
||||
.await?
|
||||
} else {
|
||||
let (layers, _) = delta_layer_writer.take()?;
|
||||
|
||||
@@ -25,9 +25,7 @@ use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
|
||||
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
|
||||
use postgres_ffi::TimestampTz;
|
||||
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
|
||||
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
|
||||
|
||||
use anyhow::{bail, Context, Result};
|
||||
@@ -48,16 +46,31 @@ use pageserver_api::key::rel_block_to_key;
|
||||
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
|
||||
use postgres_ffi::pg_constants;
|
||||
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
|
||||
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
|
||||
use postgres_ffi::v14::xlog_utils::*;
|
||||
use postgres_ffi::v14::CheckPoint;
|
||||
use postgres_ffi::TransactionId;
|
||||
use postgres_ffi::BLCKSZ;
|
||||
use utils::bin_ser::SerializeError;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
enum_pgversion! {CheckPoint, pgv::CheckPoint}
|
||||
|
||||
impl CheckPoint {
|
||||
fn encode(&self) -> Result<Bytes, SerializeError> {
|
||||
enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
|
||||
}
|
||||
|
||||
fn update_next_xid(&mut self, xid: u32) -> bool {
|
||||
enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
|
||||
}
|
||||
|
||||
pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
|
||||
enum_pgversion_dispatch!(self, CheckPoint, cp, {
|
||||
cp.update_next_multixid(multi_xid, multi_offset)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WalIngest {
|
||||
shard: ShardIdentity,
|
||||
pg_version: u32,
|
||||
checkpoint: CheckPoint,
|
||||
checkpoint_modified: bool,
|
||||
warn_ingest_lag: WarnIngestLag,
|
||||
@@ -78,12 +91,16 @@ impl WalIngest {
|
||||
// Fetch the latest checkpoint into memory, so that we can compare with it
|
||||
// quickly in `ingest_record` and update it when it changes.
|
||||
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
|
||||
let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
|
||||
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
|
||||
let pgversion = timeline.pg_version;
|
||||
|
||||
let checkpoint = dispatch_pgversion!(pgversion, {
|
||||
let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
|
||||
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
|
||||
<pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
|
||||
});
|
||||
|
||||
Ok(WalIngest {
|
||||
shard: *timeline.get_shard_identity(),
|
||||
pg_version: timeline.pg_version,
|
||||
checkpoint,
|
||||
checkpoint_modified: false,
|
||||
warn_ingest_lag: WarnIngestLag {
|
||||
@@ -117,7 +134,7 @@ impl WalIngest {
|
||||
|
||||
modification.set_lsn(lsn)?;
|
||||
|
||||
if decoded.is_dbase_create_copy(self.pg_version) {
|
||||
if decoded.is_dbase_create_copy(pg_version) {
|
||||
// Records of this type should always be preceded by a commit(), as they
|
||||
// rely on reading data pages back from the Timeline.
|
||||
assert!(!modification.has_dirty_data_pages());
|
||||
@@ -337,70 +354,67 @@ impl WalIngest {
|
||||
pg_constants::RM_XLOG_ID => {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
|
||||
if info == pg_constants::XLOG_NEXTOID {
|
||||
let next_oid = buf.get_u32_le();
|
||||
if self.checkpoint.nextOid != next_oid {
|
||||
self.checkpoint.nextOid = next_oid;
|
||||
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
|
||||
if info == pg_constants::XLOG_NEXTOID {
|
||||
let next_oid = buf.get_u32_le();
|
||||
if cp.nextOid != next_oid {
|
||||
cp.nextOid = next_oid;
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|
||||
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
|
||||
buf.copy_to_slice(&mut checkpoint_bytes);
|
||||
let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
|
||||
trace!(
|
||||
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
|
||||
xlog_checkpoint.oldestXid,
|
||||
cp.oldestXid
|
||||
);
|
||||
if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
|
||||
cp.oldestXid = xlog_checkpoint.oldestXid;
|
||||
}
|
||||
trace!(
|
||||
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
|
||||
xlog_checkpoint.oldestActiveXid,
|
||||
cp.oldestActiveXid
|
||||
);
|
||||
|
||||
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
|
||||
// because at shutdown, all in-progress transactions will implicitly
|
||||
// end. Postgres startup code knows that, and allows hot standby to start
|
||||
// immediately from a shutdown checkpoint.
|
||||
//
|
||||
// In Neon, Postgres hot standby startup always behaves as if starting from
|
||||
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
|
||||
// instead of overwriting self.checkpoint.oldestActiveXid with
|
||||
// InvalidTransactionid from the checkpoint WAL record, update it to a
|
||||
// proper value, knowing that there are no in-progress transactions at this
|
||||
// point, except for prepared transactions.
|
||||
//
|
||||
// See also the neon code changes in the InitWalRecovery() function.
|
||||
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
|
||||
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
let mut oldest_active_xid = cp.nextXid.value as u32;
|
||||
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
|
||||
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
|
||||
oldest_active_xid = xid;
|
||||
}
|
||||
}
|
||||
cp.oldestActiveXid = oldest_active_xid;
|
||||
} else {
|
||||
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
}
|
||||
|
||||
// Write a new checkpoint key-value pair on every checkpoint record, even
|
||||
// if nothing really changed. Not strictly required, but it seems nice to
|
||||
// have some trace of the checkpoint records in the layer files at the same
|
||||
// LSNs.
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|
||||
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
|
||||
buf.copy_to_slice(&mut checkpoint_bytes);
|
||||
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
|
||||
trace!(
|
||||
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
|
||||
xlog_checkpoint.oldestXid,
|
||||
self.checkpoint.oldestXid
|
||||
);
|
||||
if (self
|
||||
.checkpoint
|
||||
.oldestXid
|
||||
.wrapping_sub(xlog_checkpoint.oldestXid) as i32)
|
||||
< 0
|
||||
{
|
||||
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
|
||||
}
|
||||
trace!(
|
||||
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
|
||||
xlog_checkpoint.oldestActiveXid,
|
||||
self.checkpoint.oldestActiveXid
|
||||
);
|
||||
|
||||
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
|
||||
// because at shutdown, all in-progress transactions will implicitly
|
||||
// end. Postgres startup code knows that, and allows hot standby to start
|
||||
// immediately from a shutdown checkpoint.
|
||||
//
|
||||
// In Neon, Postgres hot standby startup always behaves as if starting from
|
||||
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
|
||||
// instead of overwriting self.checkpoint.oldestActiveXid with
|
||||
// InvalidTransactionid from the checkpoint WAL record, update it to a
|
||||
// proper value, knowing that there are no in-progress transactions at this
|
||||
// point, except for prepared transactions.
|
||||
//
|
||||
// See also the neon code changes in the InitWalRecovery() function.
|
||||
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
|
||||
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
|
||||
{
|
||||
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
|
||||
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
|
||||
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
|
||||
oldest_active_xid = xid;
|
||||
}
|
||||
}
|
||||
self.checkpoint.oldestActiveXid = oldest_active_xid;
|
||||
} else {
|
||||
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
|
||||
}
|
||||
|
||||
// Write a new checkpoint key-value pair on every checkpoint record, even
|
||||
// if nothing really changed. Not strictly required, but it seems nice to
|
||||
// have some trace of the checkpoint records in the layer files at the same
|
||||
// LSNs.
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
pg_constants::RM_LOGICALMSG_ID => {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
@@ -424,7 +438,11 @@ impl WalIngest {
|
||||
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
|
||||
if info == pg_constants::XLOG_RUNNING_XACTS {
|
||||
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
|
||||
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
|
||||
|
||||
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
|
||||
cp.oldestActiveXid = xlrec.oldest_running_xid;
|
||||
});
|
||||
|
||||
self.checkpoint_modified = true;
|
||||
}
|
||||
}
|
||||
@@ -539,7 +557,7 @@ impl WalIngest {
|
||||
&& blk.has_image
|
||||
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
|
||||
&& (decoded.xl_info == pg_constants::XLOG_FPI
|
||||
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|
||||
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|
||||
// compression of WAL is not yet supported: fall back to storing the original WAL record
|
||||
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
|
||||
// do not materialize null pages because them most likely be soon replaced with real data
|
||||
@@ -1242,12 +1260,17 @@ impl WalIngest {
|
||||
fn warn_on_ingest_lag(
|
||||
&mut self,
|
||||
conf: &crate::config::PageServerConf,
|
||||
wal_timestmap: TimestampTz,
|
||||
wal_timestamp: TimestampTz,
|
||||
) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
let now = SystemTime::now();
|
||||
let rate_limits = &mut self.warn_ingest_lag;
|
||||
match try_from_pg_timestamp(wal_timestmap) {
|
||||
|
||||
let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
|
||||
pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
|
||||
});
|
||||
|
||||
match ts {
|
||||
Ok(ts) => {
|
||||
match now.duration_since(ts) {
|
||||
Ok(lag) => {
|
||||
@@ -1257,7 +1280,7 @@ impl WalIngest {
|
||||
warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
|
||||
})
|
||||
}
|
||||
},
|
||||
}
|
||||
Err(e) => {
|
||||
let delta_t = e.duration();
|
||||
// determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
|
||||
@@ -1271,7 +1294,6 @@ impl WalIngest {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
Err(error) => {
|
||||
rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
|
||||
@@ -1379,14 +1401,17 @@ impl WalIngest {
|
||||
// truncated, but a checkpoint record with the updated values isn't written until
|
||||
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
|
||||
// so we keep the oldestXid and oldestXidDB up-to-date.
|
||||
self.checkpoint.oldestXid = xlrec.oldest_xid;
|
||||
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
|
||||
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
|
||||
cp.oldestXid = xlrec.oldest_xid;
|
||||
cp.oldestXidDB = xlrec.oldest_xid_db;
|
||||
});
|
||||
self.checkpoint_modified = true;
|
||||
|
||||
// TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
|
||||
|
||||
let latest_page_number =
|
||||
self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
|
||||
/ pg_constants::CLOG_XACTS_PER_PAGE;
|
||||
|
||||
// Now delete all segments containing pages between xlrec.pageno
|
||||
// and latest_page_number.
|
||||
@@ -1394,7 +1419,9 @@ impl WalIngest {
|
||||
// First, make an important safety check:
|
||||
// the current endpoint page must not be eligible for removal.
|
||||
// See SimpleLruTruncate() in slru.c
|
||||
if clogpage_precedes(latest_page_number, xlrec.pageno) {
|
||||
if dispatch_pgversion!(modification.tline.pg_version, {
|
||||
pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, xlrec.pageno)
|
||||
}) {
|
||||
info!("could not truncate directory pg_xact apparent wraparound");
|
||||
return Ok(());
|
||||
}
|
||||
@@ -1411,7 +1438,12 @@ impl WalIngest {
|
||||
.await?
|
||||
{
|
||||
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
|
||||
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
|
||||
|
||||
let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
|
||||
pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, xlrec.pageno)
|
||||
});
|
||||
|
||||
if may_delete {
|
||||
modification
|
||||
.drop_slru_segment(SlruKind::Clog, segno, ctx)
|
||||
.await?;
|
||||
@@ -1530,14 +1562,23 @@ impl WalIngest {
|
||||
xlrec: &XlMultiXactTruncate,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<()> {
|
||||
self.checkpoint.oldestMulti = xlrec.end_trunc_off;
|
||||
self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
|
||||
let (maxsegment, startsegment, endsegment) =
|
||||
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
|
||||
cp.oldestMulti = xlrec.end_trunc_off;
|
||||
cp.oldestMultiDB = xlrec.oldest_multi_db;
|
||||
let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
|
||||
pg_constants::MAX_MULTIXACT_OFFSET,
|
||||
);
|
||||
let startsegment: i32 =
|
||||
pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
|
||||
let endsegment: i32 =
|
||||
pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
|
||||
(maxsegment, startsegment, endsegment)
|
||||
});
|
||||
|
||||
self.checkpoint_modified = true;
|
||||
|
||||
// PerformMembersTruncation
|
||||
let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
|
||||
let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
|
||||
let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
|
||||
let mut segment: i32 = startsegment;
|
||||
|
||||
// Delete all the segments except the last one. The last segment can still
|
||||
@@ -1811,11 +1852,23 @@ mod tests {
|
||||
// TODO
|
||||
}
|
||||
|
||||
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
|
||||
#[tokio::test]
|
||||
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
|
||||
for i in 14..=16 {
|
||||
dispatch_pgversion!(i, {
|
||||
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
|
||||
});
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
|
||||
let mut m = tline.begin_modification(Lsn(0x10));
|
||||
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
|
||||
m.put_checkpoint(dispatch_pgversion!(
|
||||
tline.pg_version,
|
||||
pgv::ZERO_CHECKPOINT.clone()
|
||||
))?;
|
||||
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
|
||||
m.commit(ctx).await?;
|
||||
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
|
||||
|
||||
@@ -598,15 +598,15 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1315874, 3, 6000),
|
||||
(1315867, 3, 6000),
|
||||
(1315927, 3, 6000),
|
||||
(1315884, 3, 6000),
|
||||
(1316014, 3, 6000),
|
||||
(1315856, 3, 6000),
|
||||
(1315648, 3, 6000),
|
||||
(1315884, 3, 6000),
|
||||
(438913, 1, 2000)
|
||||
(1312632, 3, 6000),
|
||||
(1312621, 3, 6000),
|
||||
(1312680, 3, 6000),
|
||||
(1312637, 3, 6000),
|
||||
(1312773, 3, 6000),
|
||||
(1312610, 3, 6000),
|
||||
(1312404, 3, 6000),
|
||||
(1312639, 3, 6000),
|
||||
(437848, 1, 2000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -638,11 +638,11 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1208861, 5, 10000),
|
||||
(1208592, 5, 10000),
|
||||
(1208885, 5, 10000),
|
||||
(1208873, 5, 10000),
|
||||
(1209128, 5, 10000)
|
||||
(1203465, 5, 10000),
|
||||
(1203189, 5, 10000),
|
||||
(1203490, 5, 10000),
|
||||
(1203475, 5, 10000),
|
||||
(1203729, 5, 10000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -667,15 +667,15 @@ mod tests {
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[
|
||||
(1315874, 3, 6000),
|
||||
(1315867, 3, 6000),
|
||||
(1315927, 3, 6000),
|
||||
(1315884, 3, 6000),
|
||||
(1316014, 3, 6000),
|
||||
(1315856, 3, 6000),
|
||||
(1315648, 3, 6000),
|
||||
(1315884, 3, 6000),
|
||||
(438913, 1, 2000)
|
||||
(1312632, 3, 6000),
|
||||
(1312621, 3, 6000),
|
||||
(1312680, 3, 6000),
|
||||
(1312637, 3, 6000),
|
||||
(1312773, 3, 6000),
|
||||
(1312610, 3, 6000),
|
||||
(1312404, 3, 6000),
|
||||
(1312639, 3, 6000),
|
||||
(437848, 1, 2000)
|
||||
]
|
||||
);
|
||||
|
||||
@@ -712,7 +712,7 @@ mod tests {
|
||||
// files are smaller than the size threshold, but they took too long to fill so were flushed early
|
||||
assert_eq!(
|
||||
file_stats,
|
||||
[(659836, 2, 3001), (659550, 2, 3000), (659346, 2, 2999)]
|
||||
[(657696, 2, 3001), (657410, 2, 3000), (657206, 2, 2999)]
|
||||
);
|
||||
|
||||
tmpdir.close().unwrap();
|
||||
|
||||
@@ -18,8 +18,7 @@ Prerequisites:
|
||||
|
||||
Regression tests are in the 'regress' directory. They can be run in
|
||||
parallel to minimize total runtime. Most regression test sets up their
|
||||
environment with its own pageservers and safekeepers (but see
|
||||
`TEST_SHARED_FIXTURES`).
|
||||
environment with its own pageservers and safekeepers.
|
||||
|
||||
'pg_clients' contains tests for connecting with various client
|
||||
libraries. Each client test uses a Dockerfile that pulls an image that
|
||||
@@ -74,7 +73,6 @@ This is used to construct full path to the postgres binaries.
|
||||
Format is 2-digit major version nubmer, i.e. `DEFAULT_PG_VERSION=16`
|
||||
`TEST_OUTPUT`: Set the directory where test state and test output files
|
||||
should go.
|
||||
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.
|
||||
`RUST_LOG`: logging configuration to pass into Neon CLI
|
||||
|
||||
Useful parameters and commands:
|
||||
@@ -259,11 +257,8 @@ compute Postgres nodes. The connections between them can be configured to use JW
|
||||
authentication tokens, and some other configuration options can be tweaked too.
|
||||
|
||||
The easiest way to get access to a Neon Environment is by using the `neon_simple_env`
|
||||
fixture. The 'simple' env may be shared across multiple tests, so don't shut down the nodes
|
||||
or make other destructive changes in that environment. Also don't assume that
|
||||
there are no tenants or branches or data in the cluster. For convenience, there is a
|
||||
branch called `empty`, though. The convention is to create a test-specific branch of
|
||||
that and load any test data there, instead of the 'main' branch.
|
||||
fixture. For convenience, there is a branch called `main` in environments created with
|
||||
'neon_simple_env', ready to be used in the test.
|
||||
|
||||
For more complicated cases, you can build a custom Neon Environment, with the `neon_env`
|
||||
fixture:
|
||||
|
||||
@@ -10,4 +10,8 @@ pytest_plugins = (
|
||||
"fixtures.compare_fixtures",
|
||||
"fixtures.slow",
|
||||
"fixtures.flaky",
|
||||
"fixtures.shared_fixtures",
|
||||
"fixtures.function.neon_storage",
|
||||
"fixtures.session.neon_storage",
|
||||
"fixtures.session.s3",
|
||||
)
|
||||
|
||||
0
test_runner/fixtures/function/__init__.py
Normal file
0
test_runner/fixtures/function/__init__.py
Normal file
48
test_runner/fixtures/function/neon_storage.py
Normal file
48
test_runner/fixtures/function/neon_storage.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from typing import Iterator, Optional, Dict, Any
|
||||
|
||||
import pytest
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.shared_fixtures import TTenant, TTimeline
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def tenant_config() -> Dict[str, Any]:
|
||||
return dict()
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def tenant(
|
||||
neon_shared_env: NeonEnv,
|
||||
request: FixtureRequest,
|
||||
tenant_config: Optional[Dict[str, Any]] = None,
|
||||
) -> Iterator[TTenant]:
|
||||
tenant = TTenant(env=neon_shared_env, name=request.node.name, config=tenant_config)
|
||||
yield tenant
|
||||
tenant.shutdown_resources()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def timeline(
|
||||
tenant: TTenant,
|
||||
) -> Iterator[TTimeline]:
|
||||
timeline = tenant.default_timeline
|
||||
yield timeline
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def exclusive_tenant(
|
||||
neon_env: NeonEnv,
|
||||
request: FixtureRequest,
|
||||
tenant_config: Optional[Dict[str, Any]] = None,
|
||||
) -> Iterator[TTenant]:
|
||||
tenant = TTenant(env=neon_env, name=request.node.name, config=tenant_config)
|
||||
yield tenant
|
||||
tenant.shutdown_resources()
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def exclusive_timeline(
|
||||
exclusive_tenant: TTenant,
|
||||
) -> Iterator[TTimeline]:
|
||||
timeline = exclusive_tenant.default_timeline
|
||||
yield timeline
|
||||
@@ -57,7 +57,6 @@ from _pytest.fixtures import FixtureRequest
|
||||
from psycopg2.extensions import connection as PgConnection
|
||||
from psycopg2.extensions import cursor as PgCursor
|
||||
from psycopg2.extensions import make_dsn, parse_dsn
|
||||
from typing_extensions import Literal
|
||||
from urllib3.util.retry import Retry
|
||||
|
||||
from fixtures import overlayfs
|
||||
@@ -94,7 +93,6 @@ from fixtures.utils import (
|
||||
allure_add_grafana_links,
|
||||
allure_attach_from_dir,
|
||||
assert_no_errors,
|
||||
get_self_dir,
|
||||
print_gc_result,
|
||||
subprocess_capture,
|
||||
wait_until,
|
||||
@@ -127,149 +125,6 @@ Env = Dict[str, str]
|
||||
DEFAULT_OUTPUT_DIR: str = "test_output"
|
||||
DEFAULT_BRANCH_NAME: str = "main"
|
||||
|
||||
BASE_PORT: int = 15000
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def base_dir() -> Iterator[Path]:
|
||||
# find the base directory (currently this is the git root)
|
||||
base_dir = get_self_dir().parent.parent
|
||||
log.info(f"base_dir is {base_dir}")
|
||||
|
||||
yield base_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# we are in remote env and do not have neon binaries locally
|
||||
# this is the case for benchmarks run on self-hosted runner
|
||||
return
|
||||
|
||||
# Find the neon binaries.
|
||||
if env_neon_bin := os.environ.get("NEON_BIN"):
|
||||
binpath = Path(env_neon_bin)
|
||||
else:
|
||||
binpath = base_dir / "target" / build_type
|
||||
log.info(f"neon_binpath is {binpath}")
|
||||
|
||||
if not (binpath / "pageserver").exists():
|
||||
raise Exception(f"neon binaries not found at '{binpath}'")
|
||||
|
||||
yield binpath
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
|
||||
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
|
||||
distrib_dir = Path(env_postgres_bin).resolve()
|
||||
else:
|
||||
distrib_dir = base_dir / "pg_install"
|
||||
|
||||
log.info(f"pg_distrib_dir is {distrib_dir}")
|
||||
yield distrib_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def top_output_dir(base_dir: Path) -> Iterator[Path]:
|
||||
# Compute the top-level directory for all tests.
|
||||
if env_test_output := os.environ.get("TEST_OUTPUT"):
|
||||
output_dir = Path(env_test_output).resolve()
|
||||
else:
|
||||
output_dir = base_dir / DEFAULT_OUTPUT_DIR
|
||||
output_dir.mkdir(exist_ok=True)
|
||||
|
||||
log.info(f"top_output_dir is {output_dir}")
|
||||
yield output_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
|
||||
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
|
||||
|
||||
psql_bin_path = versioned_dir / "bin/psql"
|
||||
postgres_bin_path = versioned_dir / "bin/postgres"
|
||||
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# When testing against a remote server, we only need the client binary.
|
||||
if not psql_bin_path.exists():
|
||||
raise Exception(f"psql not found at '{psql_bin_path}'")
|
||||
else:
|
||||
if not postgres_bin_path.exists():
|
||||
raise Exception(f"postgres not found at '{postgres_bin_path}'")
|
||||
|
||||
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
|
||||
yield versioned_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api_key() -> str:
|
||||
api_key = os.getenv("NEON_API_KEY")
|
||||
if not api_key:
|
||||
raise AssertionError("Set the NEON_API_KEY environment variable")
|
||||
|
||||
return api_key
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api_base_url() -> str:
|
||||
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
|
||||
return NeonAPI(neon_api_key, neon_api_base_url)
|
||||
|
||||
|
||||
def shareable_scope(fixture_name: str, config: Config) -> Literal["session", "function"]:
|
||||
"""Return either session of function scope, depending on TEST_SHARED_FIXTURES envvar.
|
||||
|
||||
This function can be used as a scope like this:
|
||||
@pytest.fixture(scope=shareable_scope)
|
||||
def myfixture(...)
|
||||
...
|
||||
"""
|
||||
scope: Literal["session", "function"]
|
||||
|
||||
if os.environ.get("TEST_SHARED_FIXTURES") is None:
|
||||
# Create the environment in the per-test output directory
|
||||
scope = "function"
|
||||
elif (
|
||||
os.environ.get("BUILD_TYPE") is not None
|
||||
and os.environ.get("DEFAULT_PG_VERSION") is not None
|
||||
):
|
||||
scope = "session"
|
||||
else:
|
||||
pytest.fail(
|
||||
"Shared environment(TEST_SHARED_FIXTURES) requires BUILD_TYPE and DEFAULT_PG_VERSION to be set",
|
||||
pytrace=False,
|
||||
)
|
||||
|
||||
return scope
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_port_num():
|
||||
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_seq_no(worker_id: str) -> int:
|
||||
# worker_id is a pytest-xdist fixture
|
||||
# it can be master or gw<number>
|
||||
# parse it to always get a number
|
||||
if worker_id == "master":
|
||||
return 0
|
||||
assert worker_id.startswith("gw")
|
||||
return int(worker_id[2:])
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
|
||||
# so we divide ports in ranges of ports
|
||||
# so workers have disjoint set of ports for services
|
||||
return BASE_PORT + worker_seq_no * worker_port_num
|
||||
|
||||
|
||||
def get_dir_size(path: str) -> int:
|
||||
"""Return size in bytes."""
|
||||
@@ -281,11 +136,6 @@ def get_dir_size(path: str) -> int:
|
||||
return totalbytes
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
|
||||
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def default_broker(
|
||||
port_distributor: PortDistributor,
|
||||
@@ -301,18 +151,6 @@ def default_broker(
|
||||
broker.stop()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def run_id() -> Iterator[uuid.UUID]:
|
||||
yield uuid.uuid4()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
|
||||
mock_s3_server = MockS3Server(port_distributor.get_port())
|
||||
yield mock_s3_server
|
||||
mock_s3_server.kill()
|
||||
|
||||
|
||||
class PgProtocol:
|
||||
"""Reusable connection logic"""
|
||||
|
||||
@@ -512,6 +350,7 @@ class NeonEnvBuilder:
|
||||
safekeeper_extra_opts: Optional[list[str]] = None,
|
||||
storage_controller_port_override: Optional[int] = None,
|
||||
pageserver_io_buffer_alignment: Optional[int] = None,
|
||||
shared: Optional[bool] = False,
|
||||
):
|
||||
self.repo_dir = repo_dir
|
||||
self.rust_log_override = rust_log_override
|
||||
@@ -568,8 +407,8 @@ class NeonEnvBuilder:
|
||||
|
||||
self.pageserver_io_buffer_alignment = pageserver_io_buffer_alignment
|
||||
|
||||
assert test_name.startswith(
|
||||
"test_"
|
||||
assert (
|
||||
test_name.startswith("test_") or shared
|
||||
), "Unexpectedly instantiated from outside a test function"
|
||||
self.test_name = test_name
|
||||
|
||||
@@ -1431,8 +1270,8 @@ class NeonEnv:
|
||||
return "ep-" + str(self.endpoint_counter)
|
||||
|
||||
|
||||
@pytest.fixture(scope=shareable_scope)
|
||||
def _shared_simple_env(
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_simple_env(
|
||||
request: FixtureRequest,
|
||||
pytestconfig: Config,
|
||||
port_distributor: PortDistributor,
|
||||
@@ -1450,19 +1289,13 @@ def _shared_simple_env(
|
||||
pageserver_io_buffer_alignment: Optional[int],
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
|
||||
is set, this is shared by all tests using `neon_simple_env`.
|
||||
Simple Neon environment, with no authentication and no safekeepers.
|
||||
|
||||
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
|
||||
"""
|
||||
|
||||
if os.environ.get("TEST_SHARED_FIXTURES") is None:
|
||||
# Create the environment in the per-test output directory
|
||||
repo_dir = get_test_repo_dir(request, top_output_dir)
|
||||
else:
|
||||
# We're running shared fixtures. Share a single directory.
|
||||
repo_dir = top_output_dir / "shared_repo"
|
||||
shutil.rmtree(repo_dir, ignore_errors=True)
|
||||
# Create the environment in the per-test output directory
|
||||
repo_dir = get_test_repo_dir(request, top_output_dir)
|
||||
|
||||
with NeonEnvBuilder(
|
||||
top_output_dir=top_output_dir,
|
||||
@@ -1484,25 +1317,22 @@ def _shared_simple_env(
|
||||
) as builder:
|
||||
env = builder.init_start()
|
||||
|
||||
# For convenience in tests, create a branch from the freshly-initialized cluster.
|
||||
env.neon_cli.create_branch("empty", ancestor_branch_name=DEFAULT_BRANCH_NAME)
|
||||
|
||||
yield env
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
Simple Neon environment, with no authentication and no safekeepers.
|
||||
def neon_endpoint(request: FixtureRequest, neon_shared_env: NeonEnv) -> Endpoint:
|
||||
neon_shared_env.neon_cli.create_branch(request.node.name)
|
||||
ep = neon_shared_env.endpoints.create_start(request.node.name)
|
||||
|
||||
If TEST_SHARED_FIXTURES environment variable is set, we reuse the same
|
||||
environment for all tests that use 'neon_simple_env', keeping the
|
||||
page server and safekeepers running. Any compute nodes are stopped after
|
||||
each the test, however.
|
||||
"""
|
||||
yield _shared_simple_env
|
||||
|
||||
_shared_simple_env.endpoints.stop_all()
|
||||
try:
|
||||
yield ep
|
||||
finally:
|
||||
if ep.is_running():
|
||||
try:
|
||||
ep.stop()
|
||||
except BaseException:
|
||||
pass
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
@@ -4860,27 +4690,35 @@ def _get_test_dir(request: FixtureRequest, top_output_dir: Path, prefix: str) ->
|
||||
return test_dir
|
||||
|
||||
|
||||
def get_test_output_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
|
||||
def get_test_output_dir(
|
||||
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
|
||||
) -> Path:
|
||||
"""
|
||||
The working directory for a test.
|
||||
"""
|
||||
return _get_test_dir(request, top_output_dir, "")
|
||||
return _get_test_dir(request, top_output_dir, prefix or "")
|
||||
|
||||
|
||||
def get_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
|
||||
def get_test_overlay_dir(
|
||||
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
|
||||
) -> Path:
|
||||
"""
|
||||
Directory that contains `upperdir` and `workdir` for overlayfs mounts
|
||||
that a test creates. See `NeonEnvBuilder.overlay_mount`.
|
||||
"""
|
||||
return _get_test_dir(request, top_output_dir, "overlay-")
|
||||
return _get_test_dir(request, top_output_dir, f"overlay-{prefix or ''}")
|
||||
|
||||
|
||||
def get_shared_snapshot_dir_path(top_output_dir: Path, snapshot_name: str) -> Path:
|
||||
return top_output_dir / "shared-snapshots" / snapshot_name
|
||||
def get_shared_snapshot_dir_path(
|
||||
top_output_dir: Path, snapshot_name: str, prefix: Optional[str] = None
|
||||
) -> Path:
|
||||
return top_output_dir / f"{prefix or ''}shared-snapshots" / snapshot_name
|
||||
|
||||
|
||||
def get_test_repo_dir(request: FixtureRequest, top_output_dir: Path) -> Path:
|
||||
return get_test_output_dir(request, top_output_dir) / "repo"
|
||||
def get_test_repo_dir(
|
||||
request: FixtureRequest, top_output_dir: Path, prefix: Optional[str] = None
|
||||
) -> Path:
|
||||
return get_test_output_dir(request, top_output_dir, prefix or "") / "repo"
|
||||
|
||||
|
||||
def pytest_addoption(parser: Parser):
|
||||
@@ -4898,14 +4736,7 @@ SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
|
||||
|
||||
|
||||
# This is autouse, so the test output directory always gets created, even
|
||||
# if a test doesn't put anything there. It also solves a problem with the
|
||||
# neon_simple_env fixture: if TEST_SHARED_FIXTURES is not set, it
|
||||
# creates the repo in the test output directory. But it cannot depend on
|
||||
# 'test_output_dir' fixture, because when TEST_SHARED_FIXTURES is not set,
|
||||
# it has 'session' scope and cannot access fixtures with 'function'
|
||||
# scope. So it uses the get_test_output_dir() function to get the path, and
|
||||
# this fixture ensures that the directory exists. That works because
|
||||
# 'autouse' fixtures are run before other fixtures.
|
||||
# if a test doesn't put anything there.
|
||||
#
|
||||
# NB: we request the overlay dir fixture so the fixture does its cleanups
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
@@ -5252,7 +5083,7 @@ def tenant_get_shards(
|
||||
return [(TenantShardId(tenant_id, 0, 0), override_pageserver or env.pageserver)]
|
||||
|
||||
|
||||
def wait_replica_caughtup(primary: Endpoint, secondary: Endpoint):
|
||||
def wait_replica_caughtup(primary: PgProtocol, secondary: PgProtocol):
|
||||
primary_lsn = Lsn(
|
||||
primary.safe_psql_scalar("SELECT pg_current_wal_flush_lsn()", log_query=False)
|
||||
)
|
||||
|
||||
@@ -14,32 +14,60 @@ Dynamically parametrize tests by different parameters
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def pg_version() -> Optional[PgVersion]:
|
||||
return None
|
||||
def get_pgversions():
|
||||
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
|
||||
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
|
||||
else:
|
||||
pg_versions = [PgVersion(v)]
|
||||
|
||||
return pg_versions
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def build_type() -> Optional[str]:
|
||||
return None
|
||||
@pytest.fixture(
|
||||
scope="session",
|
||||
autouse=True,
|
||||
params=get_pgversions(),
|
||||
ids=lambda v: f"pg{v}",
|
||||
)
|
||||
def pg_version(request) -> Optional[PgVersion]:
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
def get_buildtypes():
|
||||
if (bt := os.getenv("BUILD_TYPE")) is None:
|
||||
build_types = ["debug", "release"]
|
||||
else:
|
||||
build_types = [bt.lower()]
|
||||
|
||||
return build_types
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope="session",
|
||||
autouse=True,
|
||||
params=get_buildtypes(),
|
||||
ids=lambda t: f"{t}",
|
||||
)
|
||||
def build_type(request) -> Optional[str]:
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def platform() -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def pageserver_virtual_file_io_engine() -> Optional[str]:
|
||||
return os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE")
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def pageserver_io_buffer_alignment() -> Optional[int]:
|
||||
return None
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def pageserver_aux_file_policy() -> Optional[AuxFileStore]:
|
||||
return None
|
||||
|
||||
@@ -53,26 +81,12 @@ def get_pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict
|
||||
return v
|
||||
|
||||
|
||||
@pytest.fixture(scope="function", autouse=True)
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def pageserver_default_tenant_config_compaction_algorithm() -> Optional[Dict[str, Any]]:
|
||||
return get_pageserver_default_tenant_config_compaction_algorithm()
|
||||
|
||||
|
||||
def pytest_generate_tests(metafunc: Metafunc):
|
||||
if (bt := os.getenv("BUILD_TYPE")) is None:
|
||||
build_types = ["debug", "release"]
|
||||
else:
|
||||
build_types = [bt.lower()]
|
||||
|
||||
metafunc.parametrize("build_type", build_types)
|
||||
|
||||
if (v := os.getenv("DEFAULT_PG_VERSION")) is None:
|
||||
pg_versions = [version for version in PgVersion if version != PgVersion.NOT_SET]
|
||||
else:
|
||||
pg_versions = [PgVersion(v)]
|
||||
|
||||
metafunc.parametrize("pg_version", pg_versions, ids=map(lambda v: f"pg{v}", pg_versions))
|
||||
|
||||
# A hacky way to parametrize tests only for `pageserver_virtual_file_io_engine=std-fs`
|
||||
# And do not change test name for default `pageserver_virtual_file_io_engine=tokio-epoll-uring` to keep tests statistics
|
||||
if (io_engine := os.getenv("PAGESERVER_VIRTUAL_FILE_IO_ENGINE", "")) not in (
|
||||
@@ -89,6 +103,7 @@ def pytest_generate_tests(metafunc: Metafunc):
|
||||
"pageserver_default_tenant_config_compaction_algorithm",
|
||||
[explicit_default],
|
||||
ids=[explicit_default["kind"]],
|
||||
scope="session",
|
||||
)
|
||||
|
||||
# For performance tests, parametrize also by platform
|
||||
|
||||
0
test_runner/fixtures/session/__init__.py
Normal file
0
test_runner/fixtures/session/__init__.py
Normal file
303
test_runner/fixtures/session/neon_storage.py
Normal file
303
test_runner/fixtures/session/neon_storage.py
Normal file
@@ -0,0 +1,303 @@
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterator, Optional, cast
|
||||
|
||||
import pytest
|
||||
from _pytest.config import Config
|
||||
from _pytest.fixtures import FixtureRequest
|
||||
|
||||
from fixtures import overlayfs
|
||||
from fixtures.broker import NeonBroker
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_api import NeonAPI
|
||||
from fixtures.neon_fixtures import (
|
||||
DEFAULT_OUTPUT_DIR,
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
get_test_output_dir,
|
||||
get_test_overlay_dir,
|
||||
get_test_repo_dir,
|
||||
)
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import MockS3Server
|
||||
from fixtures.utils import AuxFileStore, allure_attach_from_dir, get_self_dir
|
||||
|
||||
BASE_PORT: int = 15000
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def base_dir() -> Iterator[Path]:
|
||||
# find the base directory (currently this is the git root)
|
||||
base_dir = get_self_dir().parent.parent
|
||||
log.info(f"base_dir is {base_dir}")
|
||||
|
||||
yield base_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_binpath(base_dir: Path, build_type: str) -> Iterator[Path]:
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# we are in remote env and do not have neon binaries locally
|
||||
# this is the case for benchmarks run on self-hosted runner
|
||||
return
|
||||
|
||||
# Find the neon binaries.
|
||||
if env_neon_bin := os.environ.get("NEON_BIN"):
|
||||
binpath = Path(env_neon_bin)
|
||||
else:
|
||||
binpath = base_dir / "target" / build_type
|
||||
log.info(f"neon_binpath is {binpath}")
|
||||
|
||||
if not (binpath / "pageserver").exists():
|
||||
raise Exception(f"neon binaries not found at '{binpath}'")
|
||||
|
||||
yield binpath
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def pg_distrib_dir(base_dir: Path) -> Iterator[Path]:
|
||||
if env_postgres_bin := os.environ.get("POSTGRES_DISTRIB_DIR"):
|
||||
distrib_dir = Path(env_postgres_bin).resolve()
|
||||
else:
|
||||
distrib_dir = base_dir / "pg_install"
|
||||
|
||||
log.info(f"pg_distrib_dir is {distrib_dir}")
|
||||
yield distrib_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def top_output_dir(base_dir: Path) -> Iterator[Path]:
|
||||
# Compute the top-level directory for all tests.
|
||||
if env_test_output := os.environ.get("TEST_OUTPUT"):
|
||||
output_dir = Path(env_test_output).resolve()
|
||||
else:
|
||||
output_dir = base_dir / DEFAULT_OUTPUT_DIR
|
||||
output_dir.mkdir(exist_ok=True)
|
||||
|
||||
log.info(f"top_output_dir is {output_dir}")
|
||||
yield output_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def versioned_pg_distrib_dir(pg_distrib_dir: Path, pg_version: PgVersion) -> Iterator[Path]:
|
||||
versioned_dir = pg_distrib_dir / pg_version.v_prefixed
|
||||
|
||||
psql_bin_path = versioned_dir / "bin/psql"
|
||||
postgres_bin_path = versioned_dir / "bin/postgres"
|
||||
|
||||
if os.getenv("REMOTE_ENV"):
|
||||
# When testing against a remote server, we only need the client binary.
|
||||
if not psql_bin_path.exists():
|
||||
raise Exception(f"psql not found at '{psql_bin_path}'")
|
||||
else:
|
||||
if not postgres_bin_path.exists():
|
||||
raise Exception(f"postgres not found at '{postgres_bin_path}'")
|
||||
|
||||
log.info(f"versioned_pg_distrib_dir is {versioned_dir}")
|
||||
yield versioned_dir
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api_key() -> str:
|
||||
api_key = os.getenv("NEON_API_KEY")
|
||||
if not api_key:
|
||||
raise AssertionError("Set the NEON_API_KEY environment variable")
|
||||
|
||||
return api_key
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api_base_url() -> str:
|
||||
return os.getenv("NEON_API_BASE_URL", "https://console-stage.neon.build/api/v2")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
|
||||
return NeonAPI(neon_api_key, neon_api_base_url)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_port_num():
|
||||
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_seq_no(worker_id: str) -> int:
|
||||
# worker_id is a pytest-xdist fixture
|
||||
# it can be master or gw<number>
|
||||
# parse it to always get a number
|
||||
if worker_id == "master":
|
||||
return 0
|
||||
assert worker_id.startswith("gw")
|
||||
return int(worker_id[2:])
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def worker_base_port(worker_seq_no: int, worker_port_num: int) -> int:
|
||||
# so we divide ports in ranges of ports
|
||||
# so workers have disjoint set of ports for services
|
||||
return BASE_PORT + worker_seq_no * worker_port_num
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def port_distributor(worker_base_port: int, worker_port_num: int) -> PortDistributor:
|
||||
return PortDistributor(base_port=worker_base_port, port_number=worker_port_num)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_broker(
|
||||
port_distributor: PortDistributor,
|
||||
shared_test_output_dir: Path,
|
||||
neon_binpath: Path,
|
||||
) -> Iterator[NeonBroker]:
|
||||
# multiple pytest sessions could get launched in parallel, get them different ports/datadirs
|
||||
client_port = port_distributor.get_port()
|
||||
broker_logfile = shared_test_output_dir / "repo" / "storage_broker.log"
|
||||
|
||||
broker = NeonBroker(logfile=broker_logfile, port=client_port, neon_binpath=neon_binpath)
|
||||
yield broker
|
||||
broker.stop()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def run_id() -> Iterator[uuid.UUID]:
|
||||
yield uuid.uuid4()
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def neon_shared_env(
|
||||
pytestconfig: Config,
|
||||
port_distributor: PortDistributor,
|
||||
mock_s3_server: MockS3Server,
|
||||
shared_broker: NeonBroker,
|
||||
run_id: uuid.UUID,
|
||||
top_output_dir: Path,
|
||||
shared_test_output_dir: Path,
|
||||
neon_binpath: Path,
|
||||
build_type: str,
|
||||
pg_distrib_dir: Path,
|
||||
pg_version: PgVersion,
|
||||
pageserver_virtual_file_io_engine: str,
|
||||
pageserver_aux_file_policy: Optional[AuxFileStore],
|
||||
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
|
||||
pageserver_io_buffer_alignment: Optional[int],
|
||||
request: FixtureRequest,
|
||||
) -> Iterator[NeonEnv]:
|
||||
"""
|
||||
Simple Neon environment, with no authentication and no safekeepers.
|
||||
|
||||
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
|
||||
"""
|
||||
|
||||
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
|
||||
|
||||
# Create the environment in the per-test output directory
|
||||
repo_dir = get_test_repo_dir(request, top_output_dir, prefix)
|
||||
|
||||
with NeonEnvBuilder(
|
||||
top_output_dir=top_output_dir,
|
||||
repo_dir=repo_dir,
|
||||
port_distributor=port_distributor,
|
||||
broker=shared_broker,
|
||||
mock_s3_server=mock_s3_server,
|
||||
neon_binpath=neon_binpath,
|
||||
pg_distrib_dir=pg_distrib_dir,
|
||||
pg_version=pg_version,
|
||||
run_id=run_id,
|
||||
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
|
||||
test_name=f"{prefix}{request.node.name}",
|
||||
test_output_dir=shared_test_output_dir,
|
||||
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
|
||||
pageserver_aux_file_policy=pageserver_aux_file_policy,
|
||||
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
|
||||
pageserver_io_buffer_alignment=pageserver_io_buffer_alignment,
|
||||
shared=True,
|
||||
) as builder:
|
||||
env = builder.init_start()
|
||||
|
||||
yield env
|
||||
|
||||
|
||||
# This is autouse, so the test output directory always gets created, even
|
||||
# if a test doesn't put anything there.
|
||||
#
|
||||
# NB: we request the overlay dir fixture so the fixture does its cleanups
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_test_output_dir(
|
||||
request: FixtureRequest,
|
||||
pg_version: PgVersion,
|
||||
build_type: str,
|
||||
top_output_dir: Path,
|
||||
shared_test_overlay_dir: Path,
|
||||
) -> Iterator[Path]:
|
||||
"""Create the working directory for shared tests."""
|
||||
|
||||
prefix = f"shared[{build_type}-{pg_version.v_prefixed}]-"
|
||||
|
||||
# one directory per test
|
||||
test_dir = get_test_output_dir(request, top_output_dir, prefix)
|
||||
|
||||
log.info(f"test_output_dir is {test_dir}")
|
||||
shutil.rmtree(test_dir, ignore_errors=True)
|
||||
test_dir.mkdir()
|
||||
|
||||
yield test_dir
|
||||
|
||||
# Allure artifacts creation might involve the creation of `.tar.zst` archives,
|
||||
# which aren't going to be used if Allure results collection is not enabled
|
||||
# (i.e. --alluredir is not set).
|
||||
# Skip `allure_attach_from_dir` in this case
|
||||
if not request.config.getoption("--alluredir"):
|
||||
return
|
||||
|
||||
preserve_database_files = False
|
||||
for k, v in request.node.user_properties:
|
||||
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).
|
||||
# So, neon_env_builder's cleanup runs before here.
|
||||
# The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
|
||||
if k == "preserve_database_files":
|
||||
assert isinstance(v, bool)
|
||||
preserve_database_files = v
|
||||
|
||||
allure_attach_from_dir(test_dir, preserve_database_files)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_test_overlay_dir(request: FixtureRequest, top_output_dir: Path) -> Optional[Path]:
|
||||
"""
|
||||
Idempotently create a test's overlayfs mount state directory.
|
||||
If the functionality isn't enabled via env var, returns None.
|
||||
|
||||
The procedure cleans up after previous runs that were aborted (e.g. due to Ctrl-C, OOM kills, etc).
|
||||
"""
|
||||
|
||||
if os.getenv("NEON_ENV_BUILDER_USE_OVERLAYFS_FOR_SNAPSHOTS") is None:
|
||||
return None
|
||||
|
||||
overlay_dir = get_test_overlay_dir(request, top_output_dir)
|
||||
log.info(f"test_overlay_dir is {overlay_dir}")
|
||||
|
||||
overlay_dir.mkdir(exist_ok=True)
|
||||
# unmount stale overlayfs mounts which subdirectories of `overlay_dir/*` as the overlayfs `upperdir` and `workdir`
|
||||
for mountpoint in overlayfs.iter_mounts_beneath(get_test_output_dir(request, top_output_dir)):
|
||||
cmd = ["sudo", "umount", str(mountpoint)]
|
||||
log.info(
|
||||
f"Unmounting stale overlayfs mount probably created during earlier test run: {cmd}"
|
||||
)
|
||||
subprocess.run(cmd, capture_output=True, check=True)
|
||||
# the overlayfs `workdir`` is owned by `root`, shutil.rmtree won't work.
|
||||
cmd = ["sudo", "rm", "-rf", str(overlay_dir)]
|
||||
subprocess.run(cmd, capture_output=True, check=True)
|
||||
|
||||
overlay_dir.mkdir()
|
||||
|
||||
return overlay_dir
|
||||
|
||||
# no need to clean up anything: on clean shutdown,
|
||||
# NeonEnvBuilder.overlay_cleanup_teardown takes care of cleanup
|
||||
# and on unclean shutdown, this function will take care of it
|
||||
# on the next test run
|
||||
13
test_runner/fixtures/session/s3.py
Normal file
13
test_runner/fixtures/session/s3.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from typing import Iterator
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures.port_distributor import PortDistributor
|
||||
from fixtures.remote_storage import MockS3Server
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def mock_s3_server(port_distributor: PortDistributor) -> Iterator[MockS3Server]:
|
||||
mock_s3_server = MockS3Server(port_distributor.get_port())
|
||||
yield mock_s3_server
|
||||
mock_s3_server.kill()
|
||||
348
test_runner/fixtures/shared_fixtures.py
Normal file
348
test_runner/fixtures/shared_fixtures.py
Normal file
@@ -0,0 +1,348 @@
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional, cast, List, Dict
|
||||
|
||||
import pytest
|
||||
|
||||
from fixtures.common_types import Lsn, TenantId, TimelineId
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import Endpoint, NeonEnv, PgProtocol, DEFAULT_BRANCH_NAME, tenant_get_shards, \
|
||||
check_restored_datadir_content
|
||||
from fixtures.pageserver.utils import wait_for_last_record_lsn
|
||||
from fixtures.safekeeper.utils import are_walreceivers_absent
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
"""
|
||||
In this file most important resources are exposed as function-level fixtures
|
||||
that depend on session-level resources like pageservers and safekeepers.
|
||||
|
||||
The main rationale here is that we don't need to initialize a new SK/PS/etc
|
||||
every time we want to test something that has branching: we can just as well
|
||||
reuse still-available PageServers from other tests of the same kind.
|
||||
"""
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def shared_storage_repo_path(build_type: str, base_dir: Path) -> Path:
|
||||
return base_dir / f"shared[{build_type}]" / "repo"
|
||||
|
||||
|
||||
class TEndpoint(PgProtocol):
|
||||
def clear_shared_buffers(self, cursor: Optional[Any] = None):
|
||||
"""
|
||||
Best-effort way to clear postgres buffers. Pinned buffers will not be 'cleared.'
|
||||
|
||||
Might also clear LFC.
|
||||
"""
|
||||
if cursor is not None:
|
||||
cursor.execute("select clear_buffer_cache()")
|
||||
else:
|
||||
self.safe_psql("select clear_buffer_cache()")
|
||||
|
||||
_TEndpoint = Endpoint
|
||||
|
||||
|
||||
class TTimeline:
|
||||
__primary: Optional[_TEndpoint]
|
||||
__secondary: Optional[_TEndpoint]
|
||||
|
||||
def __init__(self, timeline_id: TimelineId, tenant: "TTenant", name: str):
|
||||
self.id = timeline_id
|
||||
self.tenant = tenant
|
||||
self.name = name
|
||||
self.__primary = None
|
||||
self.__secondary = None
|
||||
|
||||
@property
|
||||
def primary(self) -> TEndpoint:
|
||||
if self.__primary is None:
|
||||
self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
|
||||
name=self._get_full_endpoint_name("primary"),
|
||||
timeline_id=self.id,
|
||||
primary=True,
|
||||
running=True,
|
||||
))
|
||||
|
||||
return cast(TEndpoint, self.__primary)
|
||||
|
||||
def primary_with_config(self, config_lines: List[str], reboot: bool = False):
|
||||
if self.__primary is None:
|
||||
self.__primary = cast(_TEndpoint, self.tenant.create_endpoint(
|
||||
name=self._get_full_endpoint_name("primary"),
|
||||
timeline_id=self.id,
|
||||
primary=True,
|
||||
running=True,
|
||||
config_lines=config_lines,
|
||||
))
|
||||
else:
|
||||
self.__primary.config(config_lines)
|
||||
if reboot:
|
||||
if self.__primary.is_running():
|
||||
self.__primary.stop()
|
||||
self.__primary.start()
|
||||
|
||||
return cast(TTimeline, self.__primary)
|
||||
|
||||
@property
|
||||
def secondary(self) -> TEndpoint:
|
||||
if self.__secondary is None:
|
||||
self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
|
||||
name=self._get_full_endpoint_name("secondary"),
|
||||
timeline_id=self.id,
|
||||
primary=False,
|
||||
running=True,
|
||||
))
|
||||
|
||||
return cast(TEndpoint, self.__secondary)
|
||||
|
||||
def secondary_with_config(self, config_lines: List[str], reboot: bool = False) -> TEndpoint:
|
||||
if self.__secondary is None:
|
||||
self.__secondary = cast(_TEndpoint, self.tenant.create_endpoint(
|
||||
name=self._get_full_endpoint_name("secondary"),
|
||||
timeline_id=self.id,
|
||||
primary=False,
|
||||
running=True,
|
||||
config_lines=config_lines,
|
||||
))
|
||||
else:
|
||||
self.__secondary.config(config_lines)
|
||||
if reboot:
|
||||
if self.__secondary.is_running():
|
||||
self.__secondary.stop()
|
||||
self.__secondary.start()
|
||||
|
||||
return cast(TEndpoint, self.__secondary)
|
||||
|
||||
def _get_full_endpoint_name(self, name) -> str:
|
||||
return f"{self.name}.{name}"
|
||||
|
||||
def create_branch(self, name: str, lsn: Optional[Lsn]) -> "TTimeline":
|
||||
return self.tenant.create_timeline(
|
||||
new_name=name,
|
||||
parent_name=self.name,
|
||||
branch_at=lsn,
|
||||
)
|
||||
|
||||
def stop_and_flush(self, endpoint: TEndpoint):
|
||||
self.tenant.stop_endpoint(endpoint)
|
||||
self.tenant.flush_timeline_data(self)
|
||||
|
||||
def checkpoint(self, **kwargs):
|
||||
self.tenant.checkpoint_timeline(self, **kwargs)
|
||||
|
||||
|
||||
class TTenant:
|
||||
"""
|
||||
An object representing a single test case on a shared pageserver.
|
||||
All operations here are safe practically safe.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
env: NeonEnv,
|
||||
name: str,
|
||||
config: Optional[Dict[str, Any]] = None,
|
||||
):
|
||||
self.id = TenantId.generate()
|
||||
self.timelines = []
|
||||
self.timeline_names = {}
|
||||
self.timeline_ids = {}
|
||||
self.name = name
|
||||
|
||||
# publicly inaccessible stuff, used during shutdown
|
||||
self.__endpoints: List[Endpoint] = []
|
||||
self.__env: NeonEnv = env
|
||||
|
||||
env.neon_cli.create_tenant(
|
||||
tenant_id=self.id,
|
||||
set_default=False,
|
||||
conf=config
|
||||
)
|
||||
|
||||
self.first_timeline_id = env.neon_cli.create_branch(
|
||||
new_branch_name=f"{self.name}:{DEFAULT_BRANCH_NAME}",
|
||||
ancestor_branch_name=DEFAULT_BRANCH_NAME,
|
||||
tenant_id=self.id,
|
||||
)
|
||||
|
||||
self._add_timeline(
|
||||
TTimeline(
|
||||
timeline_id=self.first_timeline_id,
|
||||
tenant=self,
|
||||
name=DEFAULT_BRANCH_NAME,
|
||||
)
|
||||
)
|
||||
|
||||
def _add_timeline(self, timeline: TTimeline):
|
||||
assert timeline.tenant == self
|
||||
assert timeline not in self.timelines
|
||||
assert timeline.id is not None
|
||||
|
||||
self.timelines.append(timeline)
|
||||
self.timeline_ids[timeline.id] = timeline
|
||||
|
||||
if timeline.name is not None:
|
||||
self.timeline_names[timeline.name] = timeline
|
||||
|
||||
def create_timeline(
|
||||
self,
|
||||
new_name: str,
|
||||
parent_name: Optional[str] = None,
|
||||
parent_id: Optional[TimelineId] = None,
|
||||
branch_at: Optional[Lsn] = None,
|
||||
) -> TTimeline:
|
||||
if parent_name is not None:
|
||||
pass
|
||||
elif parent_id is not None:
|
||||
assert parent_name is None
|
||||
parent = self.timeline_ids[parent_id]
|
||||
parent_name = parent.name
|
||||
else:
|
||||
raise LookupError("Timeline creation requires parent by either ID or name")
|
||||
|
||||
assert parent_name is not None
|
||||
|
||||
new_id = self.__env.neon_cli.create_branch(
|
||||
new_branch_name=f"{self.name}:{new_name}",
|
||||
ancestor_branch_name=f"{self.name}:{parent_name}",
|
||||
tenant_id=self.id,
|
||||
ancestor_start_lsn=branch_at,
|
||||
)
|
||||
|
||||
new_tl = TTimeline(
|
||||
timeline_id=new_id,
|
||||
tenant=self,
|
||||
name=new_name,
|
||||
)
|
||||
self._add_timeline(new_tl)
|
||||
|
||||
return new_tl
|
||||
|
||||
@property
|
||||
def default_timeline(self) -> TTimeline:
|
||||
return self.timeline_ids.get(self.first_timeline_id)
|
||||
|
||||
def start_endpoint(self, ep: TEndpoint):
|
||||
if ep not in self.__endpoints:
|
||||
return
|
||||
|
||||
ep = cast(Endpoint, ep)
|
||||
|
||||
if not ep.is_running():
|
||||
ep.start()
|
||||
|
||||
def stop_endpoint(self, ep: TEndpoint, mode: str = "fast"):
|
||||
if ep not in self.__endpoints:
|
||||
return
|
||||
|
||||
ep = cast(Endpoint, ep)
|
||||
|
||||
if ep.is_running():
|
||||
ep.stop(mode=mode)
|
||||
|
||||
def create_endpoint(
|
||||
self,
|
||||
name: str,
|
||||
timeline_id: TimelineId,
|
||||
primary: bool = True,
|
||||
running: bool = False,
|
||||
port: Optional[int] = None,
|
||||
http_port: Optional[int] = None,
|
||||
lsn: Optional[Lsn] = None,
|
||||
config_lines: Optional[List[str]] = None,
|
||||
) -> TEndpoint:
|
||||
endpoint: _TEndpoint
|
||||
|
||||
if port is None:
|
||||
port = self.__env.port_distributor.get_port()
|
||||
|
||||
if http_port is None:
|
||||
http_port = self.__env.port_distributor.get_port()
|
||||
|
||||
if running:
|
||||
endpoint = self.__env.endpoints.create_start(
|
||||
branch_name=self.timeline_ids[timeline_id].name,
|
||||
endpoint_id=f"{self.name}.{name}",
|
||||
tenant_id=self.id,
|
||||
lsn=lsn,
|
||||
hot_standby=not primary,
|
||||
config_lines=config_lines,
|
||||
)
|
||||
else:
|
||||
endpoint = self.__env.endpoints.create(
|
||||
branch_name=self.timeline_ids[timeline_id].name,
|
||||
endpoint_id=f"{self.name}.{name}",
|
||||
tenant_id=self.id,
|
||||
lsn=lsn,
|
||||
hot_standby=not primary,
|
||||
config_lines=config_lines,
|
||||
)
|
||||
|
||||
self.__endpoints.append(endpoint)
|
||||
|
||||
return endpoint
|
||||
|
||||
def shutdown_resources(self):
|
||||
for ep in self.__endpoints:
|
||||
try:
|
||||
ep.stop_and_destroy("fast")
|
||||
except BaseException as e:
|
||||
log.error("Error encountered while shutting down endpoint %s", ep.endpoint_id, exc_info=e)
|
||||
|
||||
def reconfigure(self, endpoint: TEndpoint, lines: List[str], restart: bool):
|
||||
if endpoint not in self.__endpoints:
|
||||
return
|
||||
|
||||
endpoint = cast(_TEndpoint, endpoint)
|
||||
was_running = endpoint.is_running()
|
||||
if restart and was_running:
|
||||
endpoint.stop()
|
||||
|
||||
endpoint.config(lines)
|
||||
|
||||
if restart:
|
||||
endpoint.start()
|
||||
|
||||
def flush_timeline_data(self, timeline: TTimeline) -> Lsn:
|
||||
commit_lsn: Lsn = Lsn(0)
|
||||
# In principle in the absense of failures polling single sk would be enough.
|
||||
for sk in self.__env.safekeepers:
|
||||
cli = sk.http_client()
|
||||
# wait until compute connections are gone
|
||||
wait_until(30, 0.5, partial(are_walreceivers_absent, cli, self.id, timeline.id))
|
||||
commit_lsn = max(cli.get_commit_lsn(self.id, timeline.id), commit_lsn)
|
||||
|
||||
# Note: depending on WAL filtering implementation, probably most shards
|
||||
# won't be able to reach commit_lsn (unless gaps are also ack'ed), so this
|
||||
# is broken in sharded case.
|
||||
shards = tenant_get_shards(env=self.__env, tenant_id=self.id)
|
||||
for tenant_shard_id, pageserver in shards:
|
||||
log.info(
|
||||
f"flush_ep_to_pageserver: waiting for {commit_lsn} on shard {tenant_shard_id} on pageserver {pageserver.id})"
|
||||
)
|
||||
waited = wait_for_last_record_lsn(
|
||||
pageserver.http_client(), tenant_shard_id, timeline.id, commit_lsn
|
||||
)
|
||||
|
||||
assert waited >= commit_lsn
|
||||
|
||||
return commit_lsn
|
||||
|
||||
def checkpoint_timeline(self, timeline: TTimeline, **kwargs):
|
||||
self.__env.pageserver.http_client().timeline_checkpoint(
|
||||
tenant_id=self.id,
|
||||
timeline_id=timeline.id,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def pgdatadir(self, endpoint: TEndpoint):
|
||||
if endpoint not in self.__endpoints:
|
||||
return None
|
||||
|
||||
return cast(Endpoint, endpoint).pgdata_dir
|
||||
|
||||
def check_restored_datadir_content(self, path, endpoint: TEndpoint, *args, **kwargs):
|
||||
if endpoint not in self.__endpoints:
|
||||
return
|
||||
|
||||
check_restored_datadir_content(path, self.__env, cast(Endpoint, endpoint), *args, **kwargs)
|
||||
@@ -22,10 +22,8 @@ if TYPE_CHECKING:
|
||||
def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg):
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("test_logical_replication", "empty")
|
||||
endpoint = env.endpoints.create_start("test_logical_replication")
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
log.info("postgres is running on 'test_logical_replication' branch")
|
||||
pg_bin.run_capture(["pgbench", "-i", "-s10", endpoint.connstr()])
|
||||
|
||||
endpoint.safe_psql("create publication pub1 for table pgbench_accounts, pgbench_history")
|
||||
|
||||
@@ -8,11 +8,10 @@ from fixtures.neon_fixtures import NeonEnv
|
||||
#
|
||||
def test_basebackup_error(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_basebackup_error", "empty")
|
||||
pageserver_http = env.pageserver.http_client()
|
||||
|
||||
# Introduce failpoint
|
||||
pageserver_http.configure_failpoints(("basebackup-before-control-file", "return"))
|
||||
|
||||
with pytest.raises(Exception, match="basebackup-before-control-file"):
|
||||
env.endpoints.create_start("test_basebackup_error")
|
||||
env.endpoints.create_start("main")
|
||||
|
||||
@@ -15,6 +15,7 @@ from fixtures.neon_fixtures import (
|
||||
)
|
||||
from fixtures.pageserver.http import PageserverApiException
|
||||
from fixtures.pageserver.utils import wait_until_tenant_active
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
from fixtures.utils import query_scalar
|
||||
from performance.test_perf_pgbench import get_scales_matrix
|
||||
from requests import RequestException
|
||||
@@ -115,13 +116,10 @@ def test_branching_with_pgbench(
|
||||
# This test checks if the pageserver is able to handle a "unnormalized" starting LSN.
|
||||
#
|
||||
# Related: see discussion in https://github.com/neondatabase/neon/pull/2143#issuecomment-1209092186
|
||||
def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
def test_branching_unnormalized_start_lsn(timeline: TTimeline, pg_bin: PgBin):
|
||||
XLOG_BLCKSZ = 8192
|
||||
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("b0")
|
||||
endpoint0 = env.endpoints.create_start("b0")
|
||||
endpoint0 = timeline.primary
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", endpoint0.connstr()])
|
||||
|
||||
@@ -133,8 +131,8 @@ def test_branching_unnormalized_start_lsn(neon_simple_env: NeonEnv, pg_bin: PgBi
|
||||
start_lsn = Lsn((int(curr_lsn) - XLOG_BLCKSZ) // XLOG_BLCKSZ * XLOG_BLCKSZ)
|
||||
|
||||
log.info(f"Branching b1 from b0 starting at lsn {start_lsn}...")
|
||||
env.neon_cli.create_branch("b1", "b0", ancestor_start_lsn=start_lsn)
|
||||
endpoint1 = env.endpoints.create_start("b1")
|
||||
tl2 = timeline.create_branch("branch", start_lsn)
|
||||
endpoint1 = tl2.primary
|
||||
|
||||
pg_bin.run_capture(["pgbench", "-i", endpoint1.connstr()])
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ from fixtures.utils import query_scalar
|
||||
#
|
||||
def test_clog_truncate(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_clog_truncate", "empty")
|
||||
|
||||
# set aggressive autovacuum to make sure that truncation will happen
|
||||
config = [
|
||||
@@ -24,7 +23,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
|
||||
"autovacuum_freeze_max_age=100000",
|
||||
]
|
||||
|
||||
endpoint = env.endpoints.create_start("test_clog_truncate", config_lines=config)
|
||||
endpoint = env.endpoints.create_start("main", config_lines=config)
|
||||
|
||||
# Install extension containing function needed for test
|
||||
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
|
||||
@@ -58,7 +57,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
|
||||
# create new branch after clog truncation and start a compute node on it
|
||||
log.info(f"create branch at lsn_after_truncation {lsn_after_truncation}")
|
||||
env.neon_cli.create_branch(
|
||||
"test_clog_truncate_new", "test_clog_truncate", ancestor_start_lsn=lsn_after_truncation
|
||||
"test_clog_truncate_new", "main", ancestor_start_lsn=lsn_after_truncation
|
||||
)
|
||||
endpoint2 = env.endpoints.create_start("test_clog_truncate_new")
|
||||
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver
|
||||
from pathlib import Path
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder, flush_ep_to_pageserver, test_output_dir
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
|
||||
|
||||
def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
"shared_buffers='1MB'",
|
||||
],
|
||||
)
|
||||
def do_combocid_op(timeline: TTimeline, op):
|
||||
endpoint = timeline.primary_with_config(config_lines=[
|
||||
"shared_buffers='1MB'",
|
||||
])
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
@@ -43,32 +42,26 @@ def do_combocid_op(neon_env_builder: NeonEnvBuilder, op):
|
||||
assert len(rows) == 500
|
||||
|
||||
cur.execute("rollback")
|
||||
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
|
||||
)
|
||||
timeline.stop_and_flush(endpoint)
|
||||
timeline.checkpoint(compact=False, wait_until_uploaded=True)
|
||||
|
||||
|
||||
def test_combocid_delete(neon_env_builder: NeonEnvBuilder):
|
||||
do_combocid_op(neon_env_builder, "delete from t")
|
||||
def test_combocid_delete(timeline: TTimeline):
|
||||
do_combocid_op(timeline, "delete from t")
|
||||
|
||||
|
||||
def test_combocid_update(neon_env_builder: NeonEnvBuilder):
|
||||
do_combocid_op(neon_env_builder, "update t set val=val+1")
|
||||
def test_combocid_update(timeline: TTimeline):
|
||||
do_combocid_op(timeline, "update t set val=val+1")
|
||||
|
||||
|
||||
def test_combocid_lock(neon_env_builder: NeonEnvBuilder):
|
||||
do_combocid_op(neon_env_builder, "select * from t for update")
|
||||
def test_combocid_lock(timeline: TTimeline):
|
||||
do_combocid_op(timeline, "select * from t for update")
|
||||
|
||||
|
||||
def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start(
|
||||
"main",
|
||||
config_lines=[
|
||||
"shared_buffers='1MB'",
|
||||
],
|
||||
)
|
||||
def test_combocid_multi_insert(timeline: TTimeline, test_output_dir: Path):
|
||||
endpoint = timeline.primary_with_config(config_lines=[
|
||||
"shared_buffers='1MB'",
|
||||
])
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
@@ -77,7 +70,7 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
|
||||
cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
|
||||
cur.execute("create table t(id integer, val integer)")
|
||||
file_path = f"{endpoint.pg_data_dir_path()}/t.csv"
|
||||
file_path = str(test_output_dir / "t.csv")
|
||||
cur.execute(f"insert into t select g, 0 from generate_series(1,{n_records}) g")
|
||||
cur.execute(f"copy t to '{file_path}'")
|
||||
cur.execute("truncate table t")
|
||||
@@ -106,15 +99,12 @@ def test_combocid_multi_insert(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
cur.execute("rollback")
|
||||
|
||||
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
|
||||
)
|
||||
timeline.stop_and_flush(endpoint)
|
||||
timeline.checkpoint(compact=False, wait_until_uploaded=True)
|
||||
|
||||
|
||||
def test_combocid(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
def test_combocid(timeline: TTimeline):
|
||||
endpoint = timeline.primary
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
@@ -147,7 +137,5 @@ def test_combocid(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
cur.execute("rollback")
|
||||
|
||||
flush_ep_to_pageserver(env, endpoint, env.initial_tenant, env.initial_timeline)
|
||||
env.pageserver.http_client().timeline_checkpoint(
|
||||
env.initial_tenant, env.initial_timeline, compact=False, wait_until_uploaded=True
|
||||
)
|
||||
timeline.stop_and_flush(endpoint)
|
||||
timeline.checkpoint(compact=False, wait_until_uploaded=True)
|
||||
|
||||
@@ -4,9 +4,8 @@ from fixtures.neon_fixtures import NeonEnv
|
||||
|
||||
def test_compute_catalog(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_config", "empty")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_config", config_lines=["log_min_messages=debug1"])
|
||||
endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
|
||||
client = endpoint.http_client()
|
||||
|
||||
objects = client.dbs_and_roles()
|
||||
|
||||
@@ -9,10 +9,9 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
||||
#
|
||||
def test_config(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_config", "empty")
|
||||
|
||||
# change config
|
||||
endpoint = env.endpoints.create_start("test_config", config_lines=["log_min_messages=debug1"])
|
||||
endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
|
||||
|
||||
with closing(endpoint.connect()) as conn:
|
||||
with conn.cursor() as cur:
|
||||
|
||||
@@ -5,6 +5,7 @@ import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
@@ -12,20 +13,17 @@ from fixtures.utils import query_scalar
|
||||
# Test CREATE DATABASE when there have been relmapper changes
|
||||
#
|
||||
@pytest.mark.parametrize("strategy", ["file_copy", "wal_log"])
|
||||
def test_createdb(neon_simple_env: NeonEnv, strategy: str):
|
||||
env = neon_simple_env
|
||||
if env.pg_version == PgVersion.V14 and strategy == "wal_log":
|
||||
def test_createdb(timeline: TTimeline, pg_version: PgVersion, strategy: str):
|
||||
if pg_version == PgVersion.V14 and strategy == "wal_log":
|
||||
pytest.skip("wal_log strategy not supported on PostgreSQL 14")
|
||||
|
||||
env.neon_cli.create_branch("test_createdb", "empty")
|
||||
|
||||
endpoint = env.endpoints.create_start("test_createdb")
|
||||
endpoint = timeline.primary
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
# Cause a 'relmapper' change in the original branch
|
||||
cur.execute("VACUUM FULL pg_class")
|
||||
|
||||
if env.pg_version == PgVersion.V14:
|
||||
if pg_version == PgVersion.V14:
|
||||
cur.execute("CREATE DATABASE foodb")
|
||||
else:
|
||||
cur.execute(f"CREATE DATABASE foodb STRATEGY={strategy}")
|
||||
@@ -33,8 +31,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
|
||||
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
|
||||
|
||||
# Create a branch
|
||||
env.neon_cli.create_branch("test_createdb2", "test_createdb", ancestor_start_lsn=lsn)
|
||||
endpoint2 = env.endpoints.create_start("test_createdb2")
|
||||
tl2 = timeline.create_branch("createdb2", lsn=lsn)
|
||||
endpoint2 = tl2.primary
|
||||
|
||||
# Test that you can connect to the new database on both branches
|
||||
for db in (endpoint, endpoint2):
|
||||
@@ -60,10 +58,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
|
||||
#
|
||||
# Test DROP DATABASE
|
||||
#
|
||||
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_dropdb", "empty")
|
||||
endpoint = env.endpoints.create_start("test_dropdb")
|
||||
def test_dropdb(timeline: TTimeline, test_output_dir):
|
||||
endpoint = timeline.primary
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
cur.execute("CREATE DATABASE foodb")
|
||||
@@ -80,32 +76,28 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
|
||||
lsn_after_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
|
||||
|
||||
# Create two branches before and after database drop.
|
||||
env.neon_cli.create_branch(
|
||||
"test_before_dropdb", "test_dropdb", ancestor_start_lsn=lsn_before_drop
|
||||
)
|
||||
endpoint_before = env.endpoints.create_start("test_before_dropdb")
|
||||
tl_before = timeline.create_branch("test_before_dropdb", lsn=lsn_before_drop)
|
||||
endpoint_before = tl_before.primary
|
||||
|
||||
env.neon_cli.create_branch(
|
||||
"test_after_dropdb", "test_dropdb", ancestor_start_lsn=lsn_after_drop
|
||||
)
|
||||
endpoint_after = env.endpoints.create_start("test_after_dropdb")
|
||||
tl_after = timeline.create_branch("test_after_dropdb", lsn=lsn_after_drop)
|
||||
endpoint_after = tl_after.primary
|
||||
|
||||
# Test that database exists on the branch before drop
|
||||
endpoint_before.connect(dbname="foodb").close()
|
||||
|
||||
# Test that database subdir exists on the branch before drop
|
||||
assert endpoint_before.pgdata_dir
|
||||
dbpath = pathlib.Path(endpoint_before.pgdata_dir) / "base" / str(dboid)
|
||||
assert timeline.tenant.pgdatadir(endpoint_before)
|
||||
dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_before)) / "base" / str(dboid)
|
||||
log.info(dbpath)
|
||||
|
||||
assert os.path.isdir(dbpath) is True
|
||||
|
||||
# Test that database subdir doesn't exist on the branch after drop
|
||||
assert endpoint_after.pgdata_dir
|
||||
dbpath = pathlib.Path(endpoint_after.pgdata_dir) / "base" / str(dboid)
|
||||
assert timeline.tenant.pgdatadir(endpoint_after)
|
||||
dbpath = pathlib.Path(timeline.tenant.pgdatadir(endpoint_after)) / "base" / str(dboid)
|
||||
log.info(dbpath)
|
||||
|
||||
assert os.path.isdir(dbpath) is False
|
||||
|
||||
# Check that we restore the content of the datadir correctly
|
||||
check_restored_datadir_content(test_output_dir, env, endpoint)
|
||||
timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint)
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
#
|
||||
# Test CREATE USER to check shared catalog restore
|
||||
#
|
||||
def test_createuser(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_createuser", "empty")
|
||||
endpoint = env.endpoints.create_start("test_createuser")
|
||||
def test_createuser(timeline: TTimeline):
|
||||
endpoint = timeline.primary
|
||||
|
||||
with endpoint.cursor() as cur:
|
||||
# Cause a 'relmapper' change in the original branch
|
||||
@@ -19,8 +18,8 @@ def test_createuser(neon_simple_env: NeonEnv):
|
||||
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
|
||||
|
||||
# Create a branch
|
||||
env.neon_cli.create_branch("test_createuser2", "test_createuser", ancestor_start_lsn=lsn)
|
||||
endpoint2 = env.endpoints.create_start("test_createuser2")
|
||||
branch = timeline.create_branch("test_createuser2", lsn=lsn)
|
||||
endpoint2 = branch.primary
|
||||
|
||||
# Test that you can connect to new branch as a new user
|
||||
assert endpoint2.safe_psql("select current_user", user="testuser") == [("testuser",)]
|
||||
|
||||
@@ -290,9 +290,8 @@ def assert_db_connlimit(endpoint: Any, db_name: str, connlimit: int, msg: str):
|
||||
# Here we test the latter. The first one is tested in test_ddl_forwarding
|
||||
def test_ddl_forwarding_invalid_db(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_ddl_forwarding_invalid_db", "empty")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_ddl_forwarding_invalid_db",
|
||||
"main",
|
||||
# Some non-existent url
|
||||
config_lines=["neon.console_url=http://localhost:9999/unknown/api/v0/roles_and_databases"],
|
||||
)
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import pytest
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import Endpoint
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -10,14 +11,11 @@ from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
"💣", # calls `trigger_segfault` internally
|
||||
],
|
||||
)
|
||||
def test_endpoint_crash(neon_env_builder: NeonEnvBuilder, sql_func: str):
|
||||
def test_endpoint_crash(timeline: TTimeline, sql_func: str):
|
||||
"""
|
||||
Test that triggering crash from neon_test_utils crashes the endpoint
|
||||
"""
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("test_endpoint_crash")
|
||||
endpoint = env.endpoints.create_start("test_endpoint_crash")
|
||||
|
||||
endpoint = timeline.primary
|
||||
endpoint.safe_psql("CREATE EXTENSION neon_test_utils;")
|
||||
with pytest.raises(Exception, match="This probably means the server terminated abnormally"):
|
||||
endpoint.safe_psql(f"SELECT {sql_func}();")
|
||||
|
||||
@@ -10,11 +10,9 @@ def test_explain_with_lfc_stats(neon_simple_env: NeonEnv):
|
||||
cache_dir = Path(env.repo_dir) / "file_cache"
|
||||
cache_dir.mkdir(exist_ok=True)
|
||||
|
||||
branchname = "test_explain_with_lfc_stats"
|
||||
env.neon_cli.create_branch(branchname, "empty")
|
||||
log.info(f"Creating endopint with 1MB shared_buffers and 64 MB LFC for branch {branchname}")
|
||||
log.info("Creating endpoint with 1MB shared_buffers and 64 MB LFC")
|
||||
endpoint = env.endpoints.create_start(
|
||||
branchname,
|
||||
"main",
|
||||
config_lines=[
|
||||
"shared_buffers='1MB'",
|
||||
f"neon.file_cache_path='{cache_dir}/file.cache'",
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
|
||||
|
||||
def test_fsm_truncate(neon_env_builder: NeonEnvBuilder):
|
||||
env = neon_env_builder.init_start()
|
||||
env.neon_cli.create_branch("test_fsm_truncate")
|
||||
endpoint = env.endpoints.create_start("test_fsm_truncate")
|
||||
def test_fsm_truncate(timeline: TTimeline):
|
||||
endpoint = timeline.primary
|
||||
endpoint.safe_psql(
|
||||
"CREATE TABLE t1(key int); CREATE TABLE t2(key int); TRUNCATE TABLE t1; TRUNCATE TABLE t2;"
|
||||
)
|
||||
|
||||
@@ -1,17 +1,15 @@
|
||||
import time
|
||||
|
||||
from fixtures.neon_fixtures import NeonEnv, wait_replica_caughtup
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
|
||||
|
||||
#
|
||||
# Test that redo of XLOG_GIN_VACUUM_PAGE doesn't produce error
|
||||
#
|
||||
def test_gin_redo(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
primary = env.endpoints.create_start(branch_name="main", endpoint_id="primary")
|
||||
time.sleep(1)
|
||||
secondary = env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary")
|
||||
def test_gin_redo(timeline: TTimeline):
|
||||
primary = timeline.primary
|
||||
secondary = timeline.secondary
|
||||
con = primary.connect()
|
||||
cur = con.cursor()
|
||||
cur.execute("create table gin_test_tbl(id integer, i int4[])")
|
||||
|
||||
@@ -14,6 +14,7 @@ from fixtures.neon_fixtures import (
|
||||
tenant_get_shards,
|
||||
wait_replica_caughtup,
|
||||
)
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
from fixtures.utils import wait_until
|
||||
|
||||
|
||||
@@ -221,29 +222,20 @@ def pgbench_accounts_initialized(ep):
|
||||
#
|
||||
# Without hs feedback enabled we'd see 'User query might have needed to see row
|
||||
# versions that must be removed.' errors.
|
||||
def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
env = neon_env_builder.init_start()
|
||||
agressive_vacuum_conf = [
|
||||
def test_hot_standby_feedback(timeline: TTimeline, pg_bin: PgBin):
|
||||
with timeline.primary_with_config(config_lines=[
|
||||
"log_autovacuum_min_duration = 0",
|
||||
"autovacuum_naptime = 10s",
|
||||
"autovacuum_vacuum_threshold = 25",
|
||||
"autovacuum_vacuum_scale_factor = 0.1",
|
||||
"autovacuum_vacuum_cost_delay = -1",
|
||||
]
|
||||
with env.endpoints.create_start(
|
||||
branch_name="main", endpoint_id="primary", config_lines=agressive_vacuum_conf
|
||||
) as primary:
|
||||
]) as primary:
|
||||
# It would be great to have more strict max_standby_streaming_delay=0s here, but then sometimes it fails with
|
||||
# 'User was holding shared buffer pin for too long.'.
|
||||
with env.endpoints.new_replica_start(
|
||||
origin=primary,
|
||||
endpoint_id="secondary",
|
||||
config_lines=[
|
||||
"max_standby_streaming_delay=2s",
|
||||
"neon.protocol_version=2",
|
||||
"hot_standby_feedback=true",
|
||||
],
|
||||
) as secondary:
|
||||
with timeline.secondary_with_config(config_lines=[
|
||||
"max_standby_streaming_delay=2s",
|
||||
"hot_standby_feedback=true",
|
||||
]) as secondary:
|
||||
log.info(
|
||||
f"primary connstr is {primary.connstr()}, secondary connstr {secondary.connstr()}"
|
||||
)
|
||||
@@ -286,20 +278,15 @@ def test_hot_standby_feedback(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
||||
|
||||
# Test race condition between WAL replay and backends performing queries
|
||||
# https://github.com/neondatabase/neon/issues/7791
|
||||
def test_replica_query_race(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
primary_ep = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
endpoint_id="primary",
|
||||
)
|
||||
def test_replica_query_race(timeline: TTimeline):
|
||||
primary_ep = timeline.primary
|
||||
|
||||
with primary_ep.connect() as p_con:
|
||||
with p_con.cursor() as p_cur:
|
||||
p_cur.execute("CREATE EXTENSION neon_test_utils")
|
||||
p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter")
|
||||
|
||||
standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby")
|
||||
standby_ep = timeline.secondary
|
||||
wait_replica_caughtup(primary_ep, standby_ep)
|
||||
|
||||
# In primary, run a lot of UPDATEs on a single page
|
||||
|
||||
@@ -8,23 +8,19 @@ import time
|
||||
import pytest
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, PgBin
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
|
||||
|
||||
#
|
||||
# Test branching, when a transaction is in prepared state
|
||||
#
|
||||
@pytest.mark.timeout(600)
|
||||
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_lfc_resize", "empty")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_lfc_resize",
|
||||
config_lines=[
|
||||
"neon.file_cache_path='file.cache'",
|
||||
"neon.max_file_cache_size=512MB",
|
||||
"neon.file_cache_size_limit=512MB",
|
||||
],
|
||||
)
|
||||
def test_lfc_resize(timeline: TTimeline, pg_bin: PgBin):
|
||||
endpoint = timeline.primary_with_config(config_lines=[
|
||||
"neon.file_cache_path='file.cache'",
|
||||
"neon.max_file_cache_size=512MB",
|
||||
"neon.file_cache_size_limit=512MB",
|
||||
])
|
||||
n_resize = 10
|
||||
scale = 100
|
||||
|
||||
@@ -50,7 +46,7 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
|
||||
|
||||
thread.join()
|
||||
|
||||
lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache"
|
||||
lfc_file_path = timeline.tenant.pgdatadir(endpoint) / "file.cache"
|
||||
lfc_file_size = os.path.getsize(lfc_file_path)
|
||||
res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True)
|
||||
lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0]
|
||||
|
||||
@@ -3,6 +3,7 @@ from pathlib import Path
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
@@ -12,11 +13,9 @@ def test_lfc_working_set_approximation(neon_simple_env: NeonEnv):
|
||||
cache_dir = Path(env.repo_dir) / "file_cache"
|
||||
cache_dir.mkdir(exist_ok=True)
|
||||
|
||||
branchname = "test_approximate_working_set_size"
|
||||
env.neon_cli.create_branch(branchname, "empty")
|
||||
log.info(f"Creating endopint with 1MB shared_buffers and 64 MB LFC for branch {branchname}")
|
||||
log.info("Creating endpoint with 1MB shared_buffers and 64 MB LFC")
|
||||
endpoint = env.endpoints.create_start(
|
||||
branchname,
|
||||
"main",
|
||||
config_lines=[
|
||||
"shared_buffers='1MB'",
|
||||
f"neon.file_cache_path='{cache_dir}/file.cache'",
|
||||
@@ -75,18 +74,14 @@ WITH (fillfactor='100');
|
||||
assert blocks < 10
|
||||
|
||||
|
||||
def test_sliding_working_set_approximation(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
def test_sliding_working_set_approximation(timeline: TTimeline):
|
||||
endpoint = timeline.primary_with_config(config_lines=[
|
||||
"autovacuum = off",
|
||||
"shared_buffers=1MB",
|
||||
"neon.max_file_cache_size=256MB",
|
||||
"neon.file_cache_size_limit=245MB",
|
||||
])
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
branch_name="main",
|
||||
config_lines=[
|
||||
"autovacuum = off",
|
||||
"shared_buffers=1MB",
|
||||
"neon.max_file_cache_size=256MB",
|
||||
"neon.file_cache_size_limit=245MB",
|
||||
],
|
||||
)
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
cur.execute("create extension neon")
|
||||
|
||||
@@ -5,7 +5,7 @@ import threading
|
||||
import time
|
||||
from typing import List
|
||||
|
||||
from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder
|
||||
from fixtures.neon_fixtures import NeonEnvBuilder
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
@@ -15,11 +15,8 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
|
||||
cache_dir = os.path.join(env.repo_dir, "file_cache")
|
||||
os.mkdir(cache_dir)
|
||||
|
||||
env.neon_cli.create_branch("empty", ancestor_branch_name=DEFAULT_BRANCH_NAME)
|
||||
env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_local_file_cache_unlink",
|
||||
"main",
|
||||
config_lines=[
|
||||
"shared_buffers='1MB'",
|
||||
f"neon.file_cache_path='{cache_dir}/file.cache'",
|
||||
|
||||
@@ -36,10 +36,8 @@ def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
env = neon_simple_env
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_logical_replication", "empty")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_logical_replication", config_lines=["log_statement=all"]
|
||||
)
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start("main", config_lines=["log_statement=all"])
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
@@ -185,10 +183,9 @@ def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
|
||||
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("test_logical_replication", "empty")
|
||||
# set low neon.logical_replication_max_snap_files
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_logical_replication",
|
||||
"main",
|
||||
config_lines=["log_statement=all", "neon.logical_replication_max_snap_files=1"],
|
||||
)
|
||||
|
||||
@@ -472,7 +469,7 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):
|
||||
def test_replication_shutdown(neon_simple_env: NeonEnv):
|
||||
# Ensure Postgres can exit without stuck when a replication job is active + neon extension installed
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_replication_shutdown_publisher", "empty")
|
||||
env.neon_cli.create_branch("test_replication_shutdown_publisher", "main")
|
||||
pub = env.endpoints.create("test_replication_shutdown_publisher")
|
||||
|
||||
env.neon_cli.create_branch("test_replication_shutdown_subscriber")
|
||||
|
||||
@@ -9,9 +9,8 @@ if TYPE_CHECKING:
|
||||
|
||||
def test_migrations(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_migrations", "empty")
|
||||
|
||||
endpoint = env.endpoints.create("test_migrations")
|
||||
endpoint = env.endpoints.create("main")
|
||||
endpoint.respec(skip_pg_catalog_updates=False)
|
||||
endpoint.start()
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
from fixtures.shared_fixtures import TTimeline
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
@@ -12,10 +13,8 @@ from fixtures.utils import query_scalar
|
||||
# is enough to verify that the WAL records are handled correctly
|
||||
# in the pageserver.
|
||||
#
|
||||
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_multixact", "empty")
|
||||
endpoint = env.endpoints.create_start("test_multixact")
|
||||
def test_multixact(timeline: TTimeline, test_output_dir):
|
||||
endpoint = timeline.primary
|
||||
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute(
|
||||
@@ -73,8 +72,8 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
assert int(next_multixact_id) > int(next_multixact_id_old)
|
||||
|
||||
# Branch at this point
|
||||
env.neon_cli.create_branch("test_multixact_new", "test_multixact", ancestor_start_lsn=lsn)
|
||||
endpoint_new = env.endpoints.create_start("test_multixact_new")
|
||||
branch = timeline.create_branch("test_multixact_new", lsn=lsn)
|
||||
endpoint_new = branch.primary
|
||||
|
||||
next_multixact_id_new = endpoint_new.safe_psql(
|
||||
"SELECT next_multixact_id FROM pg_control_checkpoint()"
|
||||
@@ -84,4 +83,4 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
|
||||
assert next_multixact_id_new == next_multixact_id
|
||||
|
||||
# Check that we can restore the content of the datadir correctly
|
||||
check_restored_datadir_content(test_output_dir, env, endpoint)
|
||||
timeline.tenant.check_restored_datadir_content(test_output_dir, endpoint)
|
||||
|
||||
@@ -6,7 +6,7 @@ from fixtures.utils import wait_until
|
||||
|
||||
def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_neon_superuser_publisher", "empty")
|
||||
env.neon_cli.create_branch("test_neon_superuser_publisher", "main")
|
||||
pub = env.endpoints.create("test_neon_superuser_publisher")
|
||||
|
||||
env.neon_cli.create_branch("test_neon_superuser_subscriber")
|
||||
|
||||
@@ -41,8 +41,7 @@ async def parallel_load_same_table(endpoint: Endpoint, n_parallel: int):
|
||||
# Load data into one table with COPY TO from 5 parallel connections
|
||||
def test_parallel_copy(neon_simple_env: NeonEnv, n_parallel=5):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_parallel_copy", "empty")
|
||||
endpoint = env.endpoints.create_start("test_parallel_copy")
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
# Create test table
|
||||
conn = endpoint.connect()
|
||||
|
||||
@@ -42,11 +42,9 @@ def test_cancellations(neon_simple_env: NeonEnv):
|
||||
ps_http = ps.http_client()
|
||||
ps_http.is_testing_enabled_or_skip()
|
||||
|
||||
env.neon_cli.create_branch("test_config", "empty")
|
||||
|
||||
# We don't want to have any racy behaviour with autovacuum IOs
|
||||
ep = env.endpoints.create_start(
|
||||
"test_config",
|
||||
"main",
|
||||
config_lines=[
|
||||
"autovacuum = off",
|
||||
"shared_buffers = 128MB",
|
||||
|
||||
@@ -22,8 +22,8 @@ def check_wal_segment(pg_waldump_path: str, segment_path: str, test_output_dir):
|
||||
def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
|
||||
env = neon_simple_env
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.neon_cli.create_branch("test_pg_waldump", "empty")
|
||||
endpoint = env.endpoints.create_start("test_pg_waldump")
|
||||
timeline_id = env.initial_timeline
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
cur = endpoint.connect().cursor()
|
||||
cur.execute(
|
||||
|
||||
@@ -15,12 +15,8 @@ extensions = ["pageinspect", "neon_test_utils", "pg_buffercache"]
|
||||
#
|
||||
def test_read_validation(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_read_validation", "empty")
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_read_validation",
|
||||
)
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
with closing(endpoint.connect()) as con:
|
||||
with con.cursor() as c:
|
||||
for e in extensions:
|
||||
@@ -131,13 +127,9 @@ def test_read_validation(neon_simple_env: NeonEnv):
|
||||
|
||||
def test_read_validation_neg(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_read_validation_neg", "empty")
|
||||
|
||||
env.pageserver.allowed_errors.append(".*invalid LSN\\(0\\) in request.*")
|
||||
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_read_validation_neg",
|
||||
)
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
with closing(endpoint.connect()) as con:
|
||||
with con.cursor() as c:
|
||||
|
||||
@@ -22,8 +22,7 @@ from fixtures.utils import query_scalar
|
||||
#
|
||||
def test_readonly_node(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_readonly_node", "empty")
|
||||
endpoint_main = env.endpoints.create_start("test_readonly_node")
|
||||
endpoint_main = env.endpoints.create_start("main")
|
||||
|
||||
env.pageserver.allowed_errors.extend(
|
||||
[
|
||||
@@ -74,12 +73,12 @@ def test_readonly_node(neon_simple_env: NeonEnv):
|
||||
|
||||
# Create first read-only node at the point where only 100 rows were inserted
|
||||
endpoint_hundred = env.endpoints.create_start(
|
||||
branch_name="test_readonly_node", endpoint_id="ep-readonly_node_hundred", lsn=lsn_a
|
||||
branch_name="main", endpoint_id="ep-readonly_node_hundred", lsn=lsn_a
|
||||
)
|
||||
|
||||
# And another at the point where 200100 rows were inserted
|
||||
endpoint_more = env.endpoints.create_start(
|
||||
branch_name="test_readonly_node", endpoint_id="ep-readonly_node_more", lsn=lsn_b
|
||||
branch_name="main", endpoint_id="ep-readonly_node_more", lsn=lsn_b
|
||||
)
|
||||
|
||||
# On the 'hundred' node, we should see only 100 rows
|
||||
@@ -100,7 +99,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
|
||||
|
||||
# Check creating a node at segment boundary
|
||||
endpoint = env.endpoints.create_start(
|
||||
branch_name="test_readonly_node",
|
||||
branch_name="main",
|
||||
endpoint_id="ep-branch_segment_boundary",
|
||||
lsn=Lsn("0/3000000"),
|
||||
)
|
||||
@@ -112,7 +111,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
|
||||
with pytest.raises(Exception, match="invalid basebackup lsn"):
|
||||
# compute node startup with invalid LSN should fail
|
||||
env.endpoints.create_start(
|
||||
branch_name="test_readonly_node",
|
||||
branch_name="main",
|
||||
endpoint_id="ep-readonly_node_preinitdb",
|
||||
lsn=Lsn("0/42"),
|
||||
)
|
||||
@@ -218,14 +217,10 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
|
||||
# Similar test, but with more data, and we force checkpoints
|
||||
def test_timetravel(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
pageserver_http_client = env.pageserver.http_client()
|
||||
env.neon_cli.create_branch("test_timetravel", "empty")
|
||||
endpoint = env.endpoints.create_start("test_timetravel")
|
||||
|
||||
tenant_id = env.initial_tenant
|
||||
timeline_id = env.initial_timeline
|
||||
client = env.pageserver.http_client()
|
||||
|
||||
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
|
||||
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
lsns = []
|
||||
|
||||
@@ -249,7 +244,7 @@ def test_timetravel(neon_simple_env: NeonEnv):
|
||||
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
|
||||
|
||||
# run checkpoint manually to force a new layer file
|
||||
pageserver_http_client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
client.timeline_checkpoint(tenant_id, timeline_id)
|
||||
|
||||
##### Restart pageserver
|
||||
env.endpoints.stop_all()
|
||||
@@ -258,7 +253,7 @@ def test_timetravel(neon_simple_env: NeonEnv):
|
||||
|
||||
for i, lsn in lsns:
|
||||
endpoint_old = env.endpoints.create_start(
|
||||
branch_name="test_timetravel", endpoint_id=f"ep-old_lsn_{i}", lsn=lsn
|
||||
branch_name="main", endpoint_id=f"ep-old_lsn_{i}", lsn=lsn
|
||||
)
|
||||
with endpoint_old.cursor() as cur:
|
||||
assert query_scalar(cur, f"select count(*) from testtab where iteration={i}") == 100000
|
||||
|
||||
@@ -9,8 +9,7 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
|
||||
# CLOG.
|
||||
def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_subxacts", "empty")
|
||||
endpoint = env.endpoints.create_start("test_subxacts")
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
@@ -68,10 +68,13 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
|
||||
|
||||
# construct pair of branches to validate that pageserver prohibits
|
||||
# deletion of ancestor timelines when they have child branches
|
||||
parent_timeline_id = env.neon_cli.create_branch("test_ancestor_branch_delete_parent", "empty")
|
||||
parent_timeline_id = env.neon_cli.create_branch(
|
||||
new_branch_name="test_ancestor_branch_delete_parent", ancestor_branch_name="main"
|
||||
)
|
||||
|
||||
leaf_timeline_id = env.neon_cli.create_branch(
|
||||
"test_ancestor_branch_delete_branch1", "test_ancestor_branch_delete_parent"
|
||||
new_branch_name="test_ancestor_branch_delete_branch1",
|
||||
ancestor_branch_name="test_ancestor_branch_delete_parent",
|
||||
)
|
||||
|
||||
timeline_path = env.pageserver.timeline_dir(env.initial_tenant, parent_timeline_id)
|
||||
|
||||
@@ -36,7 +36,7 @@ from fixtures.utils import get_timeline_dir_size, wait_until
|
||||
|
||||
def test_timeline_size(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
new_timeline_id = env.neon_cli.create_branch("test_timeline_size", "empty")
|
||||
new_timeline_id = env.neon_cli.create_branch("test_timeline_size", "main")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)
|
||||
@@ -68,7 +68,7 @@ def test_timeline_size(neon_simple_env: NeonEnv):
|
||||
|
||||
def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_createdropdb", "empty")
|
||||
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_createdropdb", "main")
|
||||
|
||||
client = env.pageserver.http_client()
|
||||
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)
|
||||
|
||||
@@ -9,10 +9,7 @@ from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
|
||||
#
|
||||
def test_twophase(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_twophase", "empty")
|
||||
endpoint = env.endpoints.create_start(
|
||||
"test_twophase", config_lines=["max_prepared_transactions=5"]
|
||||
)
|
||||
endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=5"])
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
@@ -56,7 +53,7 @@ def test_twophase(neon_simple_env: NeonEnv):
|
||||
assert len(twophase_files) == 2
|
||||
|
||||
# Create a branch with the transaction in prepared state
|
||||
fork_at_current_lsn(env, endpoint, "test_twophase_prepared", "test_twophase")
|
||||
fork_at_current_lsn(env, endpoint, "test_twophase_prepared", "main")
|
||||
|
||||
# Start compute on the new branch
|
||||
endpoint2 = env.endpoints.create_start(
|
||||
|
||||
@@ -9,8 +9,7 @@ from fixtures.pg_version import PgVersion
|
||||
#
|
||||
def test_unlogged(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
env.neon_cli.create_branch("test_unlogged", "empty")
|
||||
endpoint = env.endpoints.create_start("test_unlogged")
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
conn = endpoint.connect()
|
||||
cur = conn.cursor()
|
||||
@@ -22,7 +21,7 @@ def test_unlogged(neon_simple_env: NeonEnv):
|
||||
cur.execute("INSERT INTO iut (id) values (42);")
|
||||
|
||||
# create another compute to fetch inital empty contents from pageserver
|
||||
fork_at_current_lsn(env, endpoint, "test_unlogged_basebackup", "test_unlogged")
|
||||
fork_at_current_lsn(env, endpoint, "test_unlogged_basebackup", "main")
|
||||
endpoint2 = env.endpoints.create_start("test_unlogged_basebackup")
|
||||
|
||||
conn2 = endpoint2.connect()
|
||||
|
||||
@@ -2,7 +2,7 @@ import time
|
||||
from contextlib import closing
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, fork_at_current_lsn
|
||||
from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
|
||||
from fixtures.utils import query_scalar
|
||||
|
||||
|
||||
@@ -13,8 +13,7 @@ from fixtures.utils import query_scalar
|
||||
def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("test_vm_bit_clear", "empty")
|
||||
endpoint = env.endpoints.create_start("test_vm_bit_clear")
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
@@ -58,7 +57,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
||||
cur.execute("UPDATE vmtest_cold_update2 SET id = 5000, filler=repeat('x', 200) WHERE id = 1")
|
||||
|
||||
# Branch at this point, to test that later
|
||||
fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "test_vm_bit_clear")
|
||||
# fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "main")
|
||||
|
||||
# Clear the buffer cache, to force the VM page to be re-fetched from
|
||||
# the page server
|
||||
@@ -92,6 +91,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
|
||||
# a dirty VM page is evicted. If the VM bit was not correctly cleared by the
|
||||
# earlier WAL record, the full-page image hides the problem. Starting a new
|
||||
# server at the right point-in-time avoids that full-page image.
|
||||
|
||||
endpoint_new = env.endpoints.create_start("test_vm_bit_clear_new")
|
||||
|
||||
pg_new_conn = endpoint_new.connect()
|
||||
|
||||
@@ -23,8 +23,7 @@ run_broken = pytest.mark.skipif(
|
||||
def test_broken(neon_simple_env: NeonEnv, pg_bin):
|
||||
env = neon_simple_env
|
||||
|
||||
env.neon_cli.create_branch("test_broken", "empty")
|
||||
env.endpoints.create_start("test_broken")
|
||||
env.endpoints.create_start("main")
|
||||
log.info("postgres is running")
|
||||
|
||||
log.info("THIS NEXT COMMAND WILL FAIL:")
|
||||
|
||||
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 49d5e576a5...0353a8f5f7
2
vendor/postgres-v16
vendored
2
vendor/postgres-v16
vendored
Submodule vendor/postgres-v16 updated: 6e9a4ff624...49386938eb
@@ -60,7 +60,7 @@ num-bigint = { version = "0.4" }
|
||||
num-integer = { version = "0.1", features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["zstd"] }
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
prost = { version = "0.11" }
|
||||
rand = { version = "0.8", features = ["small_rng"] }
|
||||
regex = { version = "1" }
|
||||
@@ -83,6 +83,7 @@ time = { version = "0.3", features = ["macros", "serde-well-known"] }
|
||||
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
|
||||
tokio-rustls = { version = "0.24" }
|
||||
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
|
||||
toml_edit = { version = "0.22", features = ["serde"] }
|
||||
tonic = { version = "0.9", features = ["tls-roots"] }
|
||||
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "timeout", "util"] }
|
||||
tracing = { version = "0.1", features = ["log"] }
|
||||
@@ -115,7 +116,7 @@ num-bigint = { version = "0.4" }
|
||||
num-integer = { version = "0.1", features = ["i128"] }
|
||||
num-traits = { version = "0.2", features = ["i128", "libm"] }
|
||||
once_cell = { version = "1" }
|
||||
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["zstd"] }
|
||||
parquet = { version = "53", default-features = false, features = ["zstd"] }
|
||||
proc-macro2 = { version = "1" }
|
||||
prost = { version = "0.11" }
|
||||
quote = { version = "1" }
|
||||
@@ -126,6 +127,7 @@ serde = { version = "1", features = ["alloc", "derive"] }
|
||||
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] }
|
||||
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
|
||||
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }
|
||||
toml_edit = { version = "0.22", features = ["serde"] }
|
||||
zstd = { version = "0.13" }
|
||||
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
|
||||
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }
|
||||
|
||||
Reference in New Issue
Block a user