Compare commits


14 Commits

Author  SHA1  Message  Date
Vlad Lazar  b2cb10590e  fixup: deserialize shenanigans  2024-09-12 20:00:06 +01:00
Vlad Lazar  2923fd2a5b  fixup: remove stale import  2024-09-12 19:25:46 +01:00
Vlad Lazar  2a5336b9ab  fixup image deserialization  2024-09-12 19:24:41 +01:00
Christian Schwarz  6f20726610  Merge remote-tracking branch 'origin/hackaneon/lisbon24/superscalar-page_service--problame/evaluate-debouncer' into hackaneon/lisbon24/superscalar-page_service  2024-09-12 17:47:16 +00:00
Christian Schwarz  29f741e1e9  debounce: actually issue vectored get  2024-09-12 17:46:30 +00:00
Vlad Lazar  2b37a40079  Materialize future ios  2024-09-12 18:25:17 +01:00
Vlad Lazar  af2b65a2fb  Rework issuing of IOs on read path  2024-09-12 16:42:35 +01:00
Christian Schwarz  5d194c7824  debounce: bounce if shard or effective request_lsn differ  2024-09-12 14:20:32 +00:00
Christian Schwarz  ac2702afd3  deboucner: move decoding into debounce loop  2024-09-12 10:58:09 +00:00
Christian Schwarz  88fd46d795  sketch interface  2024-09-12 11:35:00 +01:00
Christian Schwarz  2d6763882e  pagebench: fake queue depth of 10  2024-09-12 11:35:00 +01:00
Christian Schwarz  c0c23cde72  debouncer  2024-09-12 11:35:00 +01:00
Christian Schwarz  942bc9544b  fixup  2024-09-11 20:04:39 +00:00
Christian Schwarz  02b7cdb305  HACK: instrument page_service to count nonblocking consecutive getpage requests  2024-09-11 19:25:19 +01:00
88 changed files with 1474 additions and 1139 deletions

View File

@@ -1 +0,0 @@
FROM neondatabase/build-tools:pinned

View File

@@ -1,23 +0,0 @@
// https://containers.dev/implementors/json_reference/
{
"name": "Neon",
"build": {
"context": "..",
"dockerfile": "Dockerfile.devcontainer"
},
"postCreateCommand": {
"build neon": "BUILD_TYPE=debug CARGO_BUILD_FLAGS='--features=testing' mold -run make -s -j`nproc`",
"install python deps": "./scripts/pysync"
},
"customizations": {
"vscode": {
"extensions": [
"charliermarsh.ruff",
"github.vscode-github-actions",
"rust-lang.rust-analyzer"
]
}
}
}

115
Cargo.lock generated
View File

@@ -1189,9 +1189,9 @@ dependencies = [
[[package]]
name = "comfy-table"
version = "7.1.1"
version = "6.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b34115915337defe99b2aff5c2ce6771e5fbc4079f4b506301f5cf394c8452f7"
checksum = "6e7b787b0dc42e8111badfdbe4c3059158ccb2db8780352fa1b01e8ccf45cc4d"
dependencies = [
"crossterm",
"strum",
@@ -1246,7 +1246,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
@@ -1360,8 +1360,8 @@ dependencies = [
"tokio",
"tokio-postgres",
"tokio-util",
"toml",
"toml_edit",
"toml 0.7.4",
"toml_edit 0.19.10",
"tracing",
"url",
"utils",
@@ -1485,22 +1485,25 @@ checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345"
[[package]]
name = "crossterm"
version = "0.27.0"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f476fe445d41c9e991fd07515a6f463074b782242ccf4a5b7b1d1012e70824df"
checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67"
dependencies = [
"bitflags 2.4.1",
"bitflags 1.3.2",
"crossterm_winapi",
"libc",
"mio",
"parking_lot 0.12.1",
"signal-hook",
"signal-hook-mio",
"winapi",
]
[[package]]
name = "crossterm_winapi"
version = "0.9.1"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b"
checksum = "2ae1b35a484aa10e07fe0638d02301c5ad24de82d310ccbd2f3693da5f09bf1c"
dependencies = [
"winapi",
]
@@ -3141,7 +3144,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd01039851e82f8799046eabbb354056283fb265c8ec0996af940f4e85a380ff"
dependencies = [
"serde",
"toml",
"toml 0.8.14",
]
[[package]]
@@ -3657,7 +3660,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"utils",
"workspace_hack",
]
@@ -3744,7 +3747,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"tracing",
"twox-hash",
"url",
@@ -3907,9 +3910,8 @@ dependencies = [
[[package]]
name = "parquet"
version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0fbf928021131daaa57d334ca8e3904fe9ae22f73c56244fc7db9b04eedc3d8"
version = "51.0.0"
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
dependencies = [
"ahash",
"bytes",
@@ -3928,9 +3930,8 @@ dependencies = [
[[package]]
name = "parquet_derive"
version = "53.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86e9fcfae007533a06b580429a3f7e07cb833ec8aa37c041c16563e7918f057e"
version = "51.0.0"
source = "git+https://github.com/apache/arrow-rs?branch=master#2534976a564be3d2d56312dc88fb1b6ed4cef829"
dependencies = [
"parquet",
"proc-macro2",
@@ -4120,7 +4121,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4133,7 +4134,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -4152,7 +4153,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4811,7 +4812,7 @@ dependencies = [
"tokio",
"tokio-stream",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"tracing",
"utils",
]
@@ -5321,7 +5322,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"tracing",
"tracing-subscriber",
"url",
@@ -5730,6 +5731,17 @@ dependencies = [
"signal-hook-registry",
]
[[package]]
name = "signal-hook-mio"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
dependencies = [
"libc",
"mio",
"signal-hook",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@@ -6042,21 +6054,21 @@ checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strum"
version = "0.26.3"
version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
checksum = "063e6045c0e62079840579a7e47a355ae92f60eb74daaf156fb1e84ba164e63f"
[[package]]
name = "strum_macros"
version = "0.26.4"
version = "0.24.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be"
checksum = "1e385be0d24f186b4ce2f9982191e7101bb737312ad61c1f2f984f34bcf85d59"
dependencies = [
"heck 0.5.0",
"heck 0.4.1",
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.52",
"syn 1.0.109",
]
[[package]]
@@ -6397,7 +6409,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=20031d7a9ee1addeae6e0968e3899ae6bf01cee2#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#20031d7a9ee1addeae6e0968e3899ae6bf01cee2"
dependencies = [
"async-trait",
"byteorder",
@@ -6508,6 +6520,18 @@ dependencies = [
"tracing",
]
[[package]]
name = "toml"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6135d499e69981f9ff0ef2167955a5333c35e36f6937d382974566b3d5b94ec"
dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit 0.19.10",
]
[[package]]
name = "toml"
version = "0.8.14"
@@ -6517,7 +6541,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"toml_edit",
"toml_edit 0.22.14",
]
[[package]]
@@ -6529,6 +6553,19 @@ dependencies = [
"serde",
]
[[package]]
name = "toml_edit"
version = "0.19.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2380d56e8670370eee6566b0bfd4265f65b3f432e8c6d85623f728d4fa31f739"
dependencies = [
"indexmap 1.9.3",
"serde",
"serde_spanned",
"toml_datetime",
"winnow 0.4.6",
]
[[package]]
name = "toml_edit"
version = "0.22.14"
@@ -6539,7 +6576,7 @@ dependencies = [
"serde",
"serde_spanned",
"toml_datetime",
"winnow",
"winnow 0.6.13",
]
[[package]]
@@ -6952,7 +6989,7 @@ dependencies = [
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
"toml_edit 0.19.10",
"tracing",
"tracing-error",
"tracing-subscriber",
@@ -7498,6 +7535,15 @@ version = "0.52.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32b752e52a2da0ddfbdbcc6fceadfeede4c939ed16d13e648833a61dfb611ed8"
[[package]]
name = "winnow"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61de7bac303dc551fe038e2b3cef0f571087a47571ea6e79a87692ac99b99699"
dependencies = [
"memchr",
]
[[package]]
name = "winnow"
version = "0.6.13"
@@ -7605,7 +7651,6 @@ dependencies = [
"tokio",
"tokio-rustls 0.24.0",
"tokio-util",
"toml_edit",
"tonic",
"tower",
"tracing",

View File

@@ -73,7 +73,7 @@ camino = "1.1.6"
cfg-if = "1.0.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive"] }
comfy-table = "7.1"
comfy-table = "6.1"
const_format = "0.2"
crc32c = "0.6"
crossbeam-deque = "0.8.5"
@@ -123,8 +123,8 @@ opentelemetry = "0.20.0"
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
opentelemetry-semantic-conventions = "0.12.0"
parking_lot = "0.12"
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet_derive = "53"
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
parquet_derive = "51.0.0"
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
pin-project-lite = "0.2"
procfs = "0.16"
@@ -158,8 +158,8 @@ signal-hook = "0.3"
smallvec = "1.11"
smol_str = { version = "0.2.0", features = ["serde"] }
socket2 = "0.5"
strum = "0.26"
strum_macros = "0.26"
strum = "0.24"
strum_macros = "0.24"
"subtle" = "2.5.0"
svg_fmt = "0.4.3"
sync_wrapper = "0.1.2"
@@ -177,8 +177,8 @@ tokio-rustls = "0.25"
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8"
toml_edit = "0.22"
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}
tower-service = "0.3.2"
tracing = "0.1"
@@ -201,21 +201,10 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
# We want to use the 'neon' branch for these, but there's currently one
# incompatible change on the branch. See:
#
# - PR #8076 which contained changes that depended on the new changes in
# the rust-postgres crate, and
# - PR #8654 which reverted those changes and made the code in proxy incompatible
# with the tip of the 'neon' branch again.
#
# When those proxy changes are re-applied (see PR #8747), we can switch using
# the tip of the 'neon' branch again.
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
## Local libraries
compute_api = { version = "0.1", path = "./libs/compute_api/" }
@@ -252,7 +241,11 @@ tonic-build = "0.9"
[patch.crates-io]
# Needed to get `tokio-postgres-rustls` to depend on our fork.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev = "20031d7a9ee1addeae6e0968e3899ae6bf01cee2" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
# bug fixes for UUID
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master" }
parquet_derive = { git = "https://github.com/apache/arrow-rs", branch = "master" }
################# Binary contents sections

View File

@@ -640,8 +640,6 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
}
Some(("branch", branch_match)) => {
let tenant_id = get_tenant_id(branch_match, env)?;
let new_timeline_id =
parse_timeline_id(branch_match)?.unwrap_or(TimelineId::generate());
let new_branch_name = branch_match
.get_one::<String>("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
@@ -660,6 +658,7 @@ async fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::Local
.map(|lsn_str| Lsn::from_str(lsn_str))
.transpose()
.context("Failed to parse ancestor start Lsn from the request")?;
let new_timeline_id = TimelineId::generate();
let storage_controller = StorageController::from_env(env);
let create_req = TimelineCreateRequest {
new_timeline_id,
@@ -1571,6 +1570,7 @@ fn cli() -> Command {
.value_parser(value_parser!(PathBuf))
.value_name("config")
)
.arg(pg_version_arg.clone())
.arg(force_arg)
)
.subcommand(
@@ -1583,7 +1583,6 @@ fn cli() -> Command {
.subcommand(Command::new("branch")
.about("Create a new timeline, using another timeline as a base, copying its data")
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone())
.arg(branch_name_arg.clone())
.arg(Arg::new("ancestor-branch-name").long("ancestor-branch-name")
.help("Use last Lsn of another timeline (and its data) as base when creating the new timeline. The timeline gets resolved by its branch name.").required(false))

View File

@@ -75,14 +75,14 @@ impl PageServerNode {
}
}
fn pageserver_make_identity_toml(&self, node_id: NodeId) -> toml_edit::DocumentMut {
toml_edit::DocumentMut::from_str(&format!("id={node_id}")).unwrap()
fn pageserver_make_identity_toml(&self, node_id: NodeId) -> toml_edit::Document {
toml_edit::Document::from_str(&format!("id={node_id}")).unwrap()
}
fn pageserver_init_make_toml(
&self,
conf: NeonLocalInitPageserverConf,
) -> anyhow::Result<toml_edit::DocumentMut> {
) -> anyhow::Result<toml_edit::Document> {
assert_eq!(&PageServerConf::from(&conf), &self.conf, "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully");
// TODO(christian): instead of what we do here, create a pageserver_api::config::ConfigToml (PR #7656)
@@ -137,9 +137,9 @@ impl PageServerNode {
// Turn `overrides` into a toml document.
// TODO: above code is legacy code, it should be refactored to use toml_edit directly.
let mut config_toml = toml_edit::DocumentMut::new();
let mut config_toml = toml_edit::Document::new();
for fragment_str in overrides {
let fragment = toml_edit::DocumentMut::from_str(&fragment_str)
let fragment = toml_edit::Document::from_str(&fragment_str)
.expect("all fragments in `overrides` are valid toml documents, this function controls that");
for (key, item) in fragment.iter() {
config_toml.insert(key, item.clone());

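For reference, a minimal sketch of the toml_edit 0.19 API this change reverts to (DocumentMut is the newer name this type gained in later toml_edit releases); the merge-by-key loop mirrors the override handling above, and the function name is hypothetical:

use std::str::FromStr;

// Sketch: parse a TOML fragment and merge its top-level keys into a base
// document, the same pattern the override handling above uses.
fn merge_fragment(base: &mut toml_edit::Document, fragment_str: &str) {
    let fragment = toml_edit::Document::from_str(fragment_str)
        .expect("fragment must be valid TOML");
    for (key, item) in fragment.iter() {
        base.insert(key, item.clone());
    }
}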
View File

@@ -263,6 +263,15 @@ impl Key {
field5: u8::MAX,
field6: u32::MAX,
};
/// A key slightly smaller than [`Key::MAX`], used as the end bound of layer key ranges so that such layers are not confused with L0 layers
pub const NON_L0_MAX: Key = Key {
field1: u8::MAX,
field2: u32::MAX,
field3: u32::MAX,
field4: u32::MAX,
field5: u8::MAX,
field6: u32::MAX - 1,
};
pub fn from_hex(s: &str) -> Result<Self> {
if s.len() != 36 {

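A small illustrative sketch of why the distinct end key helps: if L0 delta layers are recognized by covering the entire key range, then layers written with Key::NON_L0_MAX as their exclusive end never match that predicate. The helper below is hypothetical, not part of this diff:

use std::ops::Range;
use pageserver_api::key::Key;

// Hypothetical predicate, assuming L0 layers are exactly those whose key
// range spans the whole keyspace. A range ending at NON_L0_MAX never matches.
fn covers_full_keyspace(key_range: &Range<Key>) -> bool {
    key_range.start == Key::MIN && key_range.end == Key::MAX
}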
View File

@@ -62,7 +62,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
serde::Serialize,
serde::Deserialize,
strum_macros::Display,
strum_macros::VariantNames,
strum_macros::EnumVariantNames,
strum_macros::AsRefStr,
strum_macros::IntoStaticStr,
)]

View File

@@ -89,19 +89,8 @@ impl PageserverUtilization {
/// If a node is currently hosting more work than it can comfortably handle. This does not indicate that
/// it will fail, but it is a strong signal that more work should not be added unless there is no alternative.
///
/// When a node is overloaded, we may override soft affinity preferences and do things like scheduling
/// into a node in a less desirable AZ, if all the nodes in the preferred AZ are overloaded.
pub fn is_overloaded(score: RawScore) -> bool {
// Why the factor of two? This is unscientific but reflects behavior of real systems:
// - In terms of shard counts, a node's preferred max count is a soft limit intended to keep
// startup and housekeeping jobs nice and responsive. We can go to double this limit if needed
// until some more nodes are deployed.
// - In terms of disk space, the node's utilization heuristic assumes every tenant needs to
// hold its biggest timeline fully on disk, which is tends to be an over estimate when
// some tenants are very idle and have dropped layers from disk. In practice going up to
// double is generally better than giving up and scheduling in a sub-optimal AZ.
score >= 2 * Self::UTILIZATION_FULL
score >= Self::UTILIZATION_FULL
}
pub fn adjust_shard_count_max(&mut self, shard_count: u32) {

View File

@@ -81,16 +81,17 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
pub trait Handler<IO> {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
/// might be not what we want after CopyData streaming, but currently we don't
/// care). It will also flush out the output buffer.
fn process_query(
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> impl Future<Output = Result<(), QueryError>>;
) -> Result<(), QueryError>;
/// Called on startup packet receival, allows to process params.
///

View File

@@ -23,6 +23,7 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {
struct TestHandler {}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
// return single col 'hey' for any query
async fn process_query(

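The test impl above is truncated in this view (it returns a single 'hey' column for any query). A reduced, self-contained variant under the now-async trait, assuming process_query is the only method without a default implementation; the body here just acknowledges the query:

use postgres_backend::{Handler, PostgresBackend, QueryError};
use tokio::io::{AsyncRead, AsyncWrite};

struct NoopHandler;

#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for NoopHandler {
    // Illustrative body: accept every query and let postgres_backend issue
    // ReadyForQuery afterwards, as documented on the trait.
    async fn process_query(
        &mut self,
        _pgb: &mut PostgresBackend<IO>,
        _query_string: &str,
    ) -> Result<(), QueryError> {
        Ok(())
    }
}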
View File

@@ -121,7 +121,6 @@ fn main() -> anyhow::Result<()> {
.allowlist_type("XLogPageHeaderData")
.allowlist_type("XLogLongPageHeaderData")
.allowlist_var("XLOG_PAGE_MAGIC")
.allowlist_var("PG_MAJORVERSION_NUM")
.allowlist_var("PG_CONTROL_FILE_SIZE")
.allowlist_var("PG_CONTROLFILEDATA_OFFSETOF_CRC")
.allowlist_type("PageHeaderData")

View File

@@ -44,9 +44,6 @@ macro_rules! postgres_ffi {
// Re-export some symbols from bindings
pub use bindings::DBState_DB_SHUTDOWNED;
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
pub const ZERO_CHECKPOINT: bytes::Bytes =
bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
}
};
}
@@ -109,107 +106,6 @@ macro_rules! dispatch_pgversion {
};
}
#[macro_export]
macro_rules! enum_pgversion_dispatch {
($name:expr, $typ:ident, $bind:ident, $code:block) => {
enum_pgversion_dispatch!(
name = $name,
bind = $bind,
typ = $typ,
code = $code,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
)
};
(name = $name:expr,
bind = $bind:ident,
typ = $typ:ident,
code = $code:block,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]) => {
match $name {
$(
self::$typ::$variant($bind) => {
use $crate::$md as pgv;
$code
}
),+,
}
};
}
#[macro_export]
macro_rules! enum_pgversion {
{$name:ident, pgv :: $t:ident} => {
enum_pgversion!{
name = $name,
typ = $t,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
}
};
{$name:ident, pgv :: $p:ident :: $t:ident} => {
enum_pgversion!{
name = $name,
path = $p,
typ = $t,
pgversions = [
V14 : v14,
V15 : v15,
V16 : v16,
]
}
};
{name = $name:ident,
typ = $t:ident,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
pub enum $name {
$($variant ( $crate::$md::$t )),+
}
impl self::$name {
pub fn pg_version(&self) -> u32 {
enum_pgversion_dispatch!(self, $name, _ign, {
pgv::bindings::PG_MAJORVERSION_NUM
})
}
}
$(
impl Into<self::$name> for $crate::$md::$t {
fn into(self) -> self::$name {
self::$name::$variant (self)
}
}
)+
};
{name = $name:ident,
path = $p:ident,
typ = $t:ident,
pgversions = [$($variant:ident : $md:ident),+ $(,)?]} => {
pub enum $name {
$($variant ($crate::$md::$p::$t)),+
}
impl $name {
pub fn pg_version(&self) -> u32 {
enum_pgversion_dispatch!(self, $name, _ign, {
pgv::bindings::PG_MAJORVERSION_NUM
})
}
}
$(
impl Into<$name> for $crate::$md::$p::$t {
fn into(self) -> $name {
$name::$variant (self)
}
}
)+
};
}
pub mod pg_constants;
pub mod relfile_utils;

View File

@@ -185,7 +185,7 @@ mod tests {
use super::*;
fn parse(input: &str) -> anyhow::Result<RemoteStorageConfig> {
let toml = input.parse::<toml_edit::DocumentMut>().unwrap();
let toml = input.parse::<toml_edit::Document>().unwrap();
RemoteStorageConfig::from_toml(toml.as_item())
}

View File

@@ -3,9 +3,11 @@ use std::str::FromStr;
use anyhow::Context;
use metrics::{IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
use strum_macros::{EnumString, VariantNames};
use strum_macros::{EnumString, EnumVariantNames};
#[derive(EnumString, strum_macros::Display, VariantNames, Eq, PartialEq, Debug, Clone, Copy)]
#[derive(
EnumString, strum_macros::Display, EnumVariantNames, Eq, PartialEq, Debug, Clone, Copy,
)]
#[strum(serialize_all = "snake_case")]
pub enum LogFormat {
Plain,

View File

@@ -10,7 +10,7 @@ pub fn deserialize_item<T>(item: &toml_edit::Item) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let document: toml_edit::DocumentMut = match item {
let document: toml_edit::Document = match item {
toml_edit::Item::Table(toml) => toml.clone().into(),
toml_edit::Item::Value(toml_edit::Value::InlineTable(toml)) => {
toml.clone().into_table().into()

View File

@@ -142,11 +142,16 @@ impl PagestreamClient {
) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
for i in 0..10 {
let mut req = tokio_stream::once(Ok(req.clone()));
self.copy_both.send_all(&mut req).await?;
}
for i in 0..9 {
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;
}
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next: bytes::Bytes = next.unwrap()?;

View File

@@ -174,7 +174,7 @@ async fn main() -> anyhow::Result<()> {
println!("specified prefix '{}' failed validation", cmd.prefix);
return Ok(());
};
let toml_document = toml_edit::DocumentMut::from_str(&cmd.config_toml_str)?;
let toml_document = toml_edit::Document::from_str(&cmd.config_toml_str)?;
let toml_item = toml_document
.get("remote_storage")
.expect("need remote_storage");

View File

@@ -9,7 +9,7 @@ use metrics::{
use once_cell::sync::Lazy;
use pageserver_api::shard::TenantShardId;
use strum::{EnumCount, VariantNames};
use strum_macros::{IntoStaticStr, VariantNames};
use strum_macros::{EnumVariantNames, IntoStaticStr};
use tracing::warn;
use utils::id::TimelineId;
@@ -27,7 +27,7 @@ const CRITICAL_OP_BUCKETS: &[f64] = &[
];
// Metrics collected on operations on the storage repository.
#[derive(Debug, VariantNames, IntoStaticStr)]
#[derive(Debug, EnumVariantNames, IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum StorageTimeOperation {
#[strum(serialize = "layer flush")]
@@ -1185,6 +1185,7 @@ struct GlobalAndPerTimelineHistogramTimer<'a, 'c> {
ctx: &'c RequestContext,
start: std::time::Instant,
op: SmgrQueryType,
count: usize,
}
impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
@@ -1212,9 +1213,11 @@ impl<'a, 'c> Drop for GlobalAndPerTimelineHistogramTimer<'a, 'c> {
elapsed
}
};
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
for _ in 0..self.count {
self.global_metric.observe(ex_throttled.as_secs_f64());
if let Some(timeline_metric) = self.timeline_metric {
timeline_metric.observe(ex_throttled.as_secs_f64());
}
}
}
}
@@ -1343,6 +1346,14 @@ impl SmgrQueryTimePerTimeline {
&'a self,
op: SmgrQueryType,
ctx: &'c RequestContext,
) -> Option<impl Drop + '_> {
self.start_timer_many(op, 1, ctx)
}
pub(crate) fn start_timer_many<'c: 'a, 'a>(
&'a self,
op: SmgrQueryType,
count: usize,
ctx: &'c RequestContext,
) -> Option<impl Drop + '_> {
let global_metric = &self.global_metrics[op as usize];
let start = Instant::now();
@@ -1376,6 +1387,7 @@ impl SmgrQueryTimePerTimeline {
ctx,
start,
op,
count,
})
}
}
@@ -3170,6 +3182,16 @@ static TOKIO_EXECUTOR_THREAD_COUNT: Lazy<UIntGaugeVec> = Lazy::new(|| {
.unwrap()
});
pub(crate) static CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM: Lazy<Histogram> =
Lazy::new(|| {
register_histogram!(
"pageserver_consecutive_nonblocking_getpage_requests",
"Number of consecutive nonblocking getpage requests",
(0..=256).map(|x| x as f64).collect::<Vec<f64>>(),
)
.unwrap()
});
pub(crate) fn set_tokio_runtime_setup(setup: &str, num_threads: NonZeroUsize) {
static SERIALIZE: std::sync::Mutex<()> = std::sync::Mutex::new(());
let _guard = SERIALIZE.lock().unwrap();

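The count field added to GlobalAndPerTimelineHistogramTimer makes the guard record one histogram sample per page in a batch when it drops. A reduced, self-contained sketch of that pattern, with hypothetical types and the prometheus call replaced by a closure:

use std::time::Instant;

// Sketch: a timer guard that records one observation per batched item on Drop.
// `observe` stands in for the histogram's observe() call.
struct BatchTimer<F: Fn(f64)> {
    start: Instant,
    count: usize,
    observe: F,
}

impl<F: Fn(f64)> Drop for BatchTimer<F> {
    fn drop(&mut self) {
        let secs = self.start.elapsed().as_secs_f64();
        for _ in 0..self.count {
            (self.observe)(secs);
        }
    }
}

fn main() {
    let _timer = BatchTimer {
        start: Instant::now(),
        count: 3, // e.g. pages.len()
        observe: |secs| println!("observed {secs}"),
    };
    // ... batched work happens here; dropping the guard records 3 samples.
}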
View File

@@ -5,14 +5,14 @@ use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::FutureExt;
use once_cell::sync::OnceCell;
use pageserver_api::models::TenantState;
use once_cell::sync::{Lazy, OnceCell};
use pageserver_api::models::{self, TenantState};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamDbSizeRequest, PagestreamDbSizeResponse,
PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest,
PagestreamNblocksResponse, PagestreamProtocolVersion,
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest,
PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse,
PagestreamProtocolVersion,
};
use pageserver_api::shard::TenantShardId;
use postgres_backend::{is_expected_io_error, AuthType, PostgresBackend, QueryError};
@@ -43,7 +43,7 @@ use crate::basebackup;
use crate::basebackup::BasebackupError;
use crate::config::PageServerConf;
use crate::context::{DownloadBehavior, RequestContext};
use crate::metrics;
use crate::metrics::{self, CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM};
use crate::metrics::{ComputeCommandKind, COMPUTE_COMMANDS_COUNTERS, LIVE_CONNECTIONS};
use crate::pgdatadir_mapping::Version;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
@@ -58,7 +58,7 @@ use crate::tenant::GetTimelineError;
use crate::tenant::PageReconstructError;
use crate::tenant::Timeline;
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::SlruKind;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::BLCKSZ;
@@ -577,124 +577,317 @@ impl PageServerHandler {
}
}
loop {
// read request bytes (it's exactly 1 PagestreamFeMessage per CopyData)
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => { msg }
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break, // client disconnected
};
let mut batched = None;
'outer: loop {
enum DebouncedFeMessage {
Exists(models::PagestreamExistsRequest),
Nblocks(models::PagestreamNblocksRequest),
GetPage {
span: Span,
shard: timeline::handle::Handle<TenantManagerTypes>,
effective_request_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
},
DbSize(models::PagestreamDbSizeRequest),
GetSlruSegment(models::PagestreamGetSlruSegmentRequest),
RespondError(Span, PageStreamError),
}
let mut debounce: Option<std::time::Instant> = None;
// return or `?` on protocol error
// `break EXPR` to stop batching. The EXPR will be the first message in the next batch.
let next_batched: Option<DebouncedFeMessage> = loop {
static BOUNCE_TIMEOUT: Lazy<Duration> = Lazy::new(|| {
utils::env::var::<humantime::Duration, _>("NEON_PAGESERVER_DEBOUNCE")
.unwrap()
.into()
});
let sleep_fut = if let Some(started_at) = debounce {
futures::future::Either::Left(tokio::time::sleep_until(
(started_at + *BOUNCE_TIMEOUT).into(),
))
} else {
futures::future::Either::Right(futures::future::pending())
};
let msg = tokio::select! {
biased;
_ = self.cancel.cancelled() => {
return Err(QueryError::Shutdown)
}
msg = pgb.read_message() => {
msg
}
_ = sleep_fut => {
assert!(batched.is_some());
break None;
}
};
let copy_data_bytes = match msg? {
Some(FeMessage::CopyData(bytes)) => bytes,
Some(FeMessage::Terminate) => break 'outer,
Some(m) => {
return Err(QueryError::Other(anyhow::anyhow!(
"unexpected message: {m:?} during COPY"
)));
}
None => break 'outer, // client disconnected
};
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
trace!("query: {copy_data_bytes:?}");
fail::fail_point!("ps::handle-pagerequest-message");
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
// parse request
let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?;
let this_msg = match neon_fe_msg {
PagestreamFeMessage::Exists(msg) => DebouncedFeMessage::Exists(msg),
PagestreamFeMessage::Nblocks(msg) => DebouncedFeMessage::Nblocks(msg),
PagestreamFeMessage::DbSize(msg) => DebouncedFeMessage::DbSize(msg),
PagestreamFeMessage::GetSlruSegment(msg) => {
DebouncedFeMessage::GetSlruSegment(msg)
}
PagestreamFeMessage::GetPage(PagestreamGetPageRequest {
request_lsn,
not_modified_since,
rel,
blkno,
}) => {
let span = tracing::info_span!("handle_get_page_at_lsn_request_batched", %tenant_id, %timeline_id, shard_id = tracing::field::Empty, req_lsn = %request_lsn, batch_size = tracing::field::Empty);
let key = rel_block_to_key(rel, blkno);
let shard = match self
.timeline_handles
.get(tenant_id, timeline_id, ShardSelector::Page(key))
.instrument(span.clone())
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(
GetActiveTenantError::NotFound(_),
)) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
break Some(DebouncedFeMessage::RespondError(
span,
PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
),
));
}
Err(e) => break Some(DebouncedFeMessage::RespondError(span, e.into())),
};
let effective_request_lsn = match Self::wait_or_get_last_lsn(
&shard,
request_lsn,
not_modified_since,
&shard.get_latest_gc_cutoff_lsn(),
&ctx,
)
// TODO: if we actually need to wait for lsn here, it delays the entire batch which doesn't need to wait
.await
{
Ok(lsn) => lsn,
Err(e) => {
break Some(DebouncedFeMessage::RespondError(span, e));
}
};
DebouncedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages: smallvec::smallvec![(rel, blkno)],
}
}
};
// check if we can debounce
match (&mut batched, this_msg) {
(None, this_msg) => {
batched = Some(this_msg);
}
(
Some(DebouncedFeMessage::GetPage {
span: _,
shard: accum_shard,
pages: accum_pages,
effective_request_lsn: accum_lsn,
}),
DebouncedFeMessage::GetPage {
span: _,
shard: this_shard,
pages: this_pages,
effective_request_lsn: this_lsn,
},
) if async {
assert_eq!(this_pages.len(), 1);
if accum_pages.len() >= Timeline::MAX_GET_VECTORED_KEYS as usize {
assert_eq!(accum_pages.len(), Timeline::MAX_GET_VECTORED_KEYS as usize);
return false;
}
if (accum_shard.tenant_shard_id, accum_shard.timeline_id)
!= (this_shard.tenant_shard_id, this_shard.timeline_id)
{
// TODO: we _could_ batch & execute each shard separately (and in parallel).
// But the current logic for keeping responses in order does not support that.
return false;
}
// the vectored get currently only supports a single LSN, so, bounce as soon
// as the effective request_lsn changes
return *accum_lsn == this_lsn;
}
.await =>
{
// ok to batch
accum_pages.extend(this_pages);
}
(Some(_), this_msg) => {
// by default, don't continue batching
break Some(this_msg);
}
}
// debounce bookkeeping: open the window at the first batched message and stop batching once it has elapsed
let started_at = debounce.get_or_insert_with(Instant::now);
if started_at.elapsed() > *BOUNCE_TIMEOUT {
break None;
}
};
// invoke handler function
let (handler_result, span) = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let (handler_results, span): (
smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]>,
_,
) = match batched.take().expect("loop above ensures this") {
DebouncedFeMessage::Exists(req) => {
fail::fail_point!("ps::handle-pagerequest-message::exists");
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_get_rel_exists_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await
],
span,
)
}
PagestreamFeMessage::Nblocks(req) => {
DebouncedFeMessage::Nblocks(req) => {
fail::fail_point!("ps::handle-pagerequest-message::nblocks");
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.request_lsn);
(
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_get_nblocks_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
],
span,
)
}
PagestreamFeMessage::GetPage(req) => {
DebouncedFeMessage::GetPage {
span,
shard,
effective_request_lsn,
pages,
} => {
CONSECUTIVE_NONBLOCKING_GETPAGE_REQUESTS_HISTOGRAM.observe(pages.len() as f64);
span.record("batch_size", pages.len() as u64);
fail::fail_point!("ps::handle-pagerequest-message::getpage");
// shard_id is filled in by the handler
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.request_lsn);
(
self.handle_get_page_at_lsn_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
{
let npages = pages.len();
let res = self
.handle_get_page_at_lsn_request_batched(
&shard,
effective_request_lsn,
pages,
&ctx,
)
.instrument(span.clone())
.await;
assert_eq!(res.len(), npages);
res
},
span,
)
}
PagestreamFeMessage::DbSize(req) => {
DebouncedFeMessage::DbSize(req) => {
fail::fail_point!("ps::handle-pagerequest-message::dbsize");
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.request_lsn);
(
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await,
smallvec::smallvec![
self.handle_db_size_request(tenant_id, timeline_id, &req, &ctx)
.instrument(span.clone())
.await
],
span,
)
}
PagestreamFeMessage::GetSlruSegment(req) => {
DebouncedFeMessage::GetSlruSegment(req) => {
fail::fail_point!("ps::handle-pagerequest-message::slrusegment");
let span = tracing::info_span!("handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.request_lsn);
(
self.handle_get_slru_segment_request(tenant_id, timeline_id, &req, &ctx)
smallvec::smallvec![
self.handle_get_slru_segment_request(
tenant_id,
timeline_id,
&req,
&ctx
)
.instrument(span.clone())
.await,
.await
],
span,
)
}
DebouncedFeMessage::RespondError(span, e) => {
// We've already decided to respond with an error, so we don't need to
// call the handler.
(smallvec::smallvec![Err(e)], span)
}
};
// Map handler result to protocol behavior.
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
}
},
Ok(response_msg) => response_msg,
};
for handler_result in handler_results {
let response_msg = match handler_result {
Err(e) => match &e {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
// shutdown, then do not send the error to the client. Instead just drop the
// connection.
span.in_scope(|| info!("dropping connection due to shutdown"));
return Err(QueryError::Shutdown);
}
PageStreamError::Reconnect(reason) => {
span.in_scope(|| info!("handler requested reconnect: {reason}"));
return Err(QueryError::Reconnect);
}
PageStreamError::Read(_)
| PageStreamError::LsnTimeout(_)
| PageStreamError::NotFound(_)
| PageStreamError::BadRequest(_) => {
// print the all details to the log with {:#}, but for the client the
// error message is enough. Do not log if shutting down, as the anyhow::Error
// here includes cancellation which is not an error.
let full = utils::error::report_compact_sources(&e);
span.in_scope(|| {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
}
},
Ok(response_msg) => response_msg,
};
// marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
// marshal & transmit response message
pgb.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?;
}
tokio::select! {
biased;
_ = self.cancel.cancelled() => {
@@ -706,6 +899,9 @@ impl PageServerHandler {
res?;
}
}
assert!(batched.is_none(), "we take() earlier");
batched = next_batched;
}
Ok(())
}
@@ -949,60 +1145,30 @@ impl PageServerHandler {
}))
}
#[instrument(skip_all, fields(shard_id))]
async fn handle_get_page_at_lsn_request(
#[instrument(skip_all)]
async fn handle_get_page_at_lsn_request_batched(
&mut self,
tenant_id: TenantId,
timeline_id: TimelineId,
req: &PagestreamGetPageRequest,
timeline: &Timeline,
effective_lsn: Lsn,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
ctx: &RequestContext,
) -> Result<PagestreamBeMessage, PageStreamError> {
let timeline = match self
.timeline_handles
.get(
tenant_id,
timeline_id,
ShardSelector::Page(rel_block_to_key(req.rel, req.blkno)),
)
.await
{
Ok(tl) => tl,
Err(GetActiveTimelineError::Tenant(GetActiveTenantError::NotFound(_))) => {
// We already know this tenant exists in general, because we resolved it at
// start of connection. Getting a NotFound here indicates that the shard containing
// the requested page is not present on this node: the client's knowledge of shard->pageserver
// mapping is out of date.
//
// Closing the connection by returning ``::Reconnect` has the side effect of rate-limiting above message, via
// client's reconnect backoff, as well as hopefully prompting the client to load its updated configuration
// and talk to a different pageserver.
return Err(PageStreamError::Reconnect(
"getpage@lsn request routed to wrong shard".into(),
));
}
Err(e) => return Err(e.into()),
};
let _timer = timeline
.query_metrics
.start_timer(metrics::SmgrQueryType::GetPageAtLsn, ctx);
let latest_gc_cutoff_lsn = timeline.get_latest_gc_cutoff_lsn();
let lsn = Self::wait_or_get_last_lsn(
&timeline,
req.request_lsn,
req.not_modified_since,
&latest_gc_cutoff_lsn,
) -> smallvec::SmallVec<[Result<PagestreamBeMessage, PageStreamError>; 1]> {
debug_assert_current_span_has_tenant_and_timeline_id();
let _timer = timeline.query_metrics.start_timer_many(
metrics::SmgrQueryType::GetPageAtLsn,
pages.len(),
ctx,
)
.await?;
);
let page = timeline
.get_rel_page_at_lsn(req.rel, req.blkno, Version::Lsn(lsn), ctx)
.await?;
let pages = timeline
.get_rel_page_at_lsn_batched(pages, Version::Lsn(effective_lsn), ctx)
.await;
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page,
smallvec::SmallVec::from_iter(pages.into_iter().map(|page| {
page.map(|page| {
PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { page })
})
.map_err(PageStreamError::Read)
}))
}
@@ -1199,6 +1365,7 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,
@@ -1498,3 +1665,10 @@ fn set_tracing_field_shard_id(timeline: &Timeline) {
);
debug_assert_current_span_has_tenant_and_timeline_id();
}
struct WaitedForLsn(Lsn);
impl From<WaitedForLsn> for Lsn {
fn from(WaitedForLsn(lsn): WaitedForLsn) -> Self {
lsn
}
}

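The batching control flow in handle_pagerequests above is intricate. As a reduced, self-contained sketch of the same idea (hypothetical names, a single request type, and an mpsc channel standing in for the Postgres CopyData stream), the loop keeps reading until the batch is full, the peer goes away, or the debounce window opened by the first buffered request expires:

use std::time::{Duration, Instant};
use futures::future::{self, Either};
use tokio::sync::mpsc;

// Simplified stand-in for a decoded getpage request.
struct Req;

// Sketch of the debounce loop: accumulate up to `max_batch` requests, but wait
// at most `debounce` after the first request of the batch arrived.
async fn read_batch(
    rx: &mut mpsc::Receiver<Req>,
    max_batch: usize,
    debounce: Duration,
) -> Vec<Req> {
    let mut batch = Vec::new();
    let mut window_started_at: Option<Instant> = None;
    loop {
        let sleep_fut = match window_started_at {
            Some(started_at) => {
                Either::Left(tokio::time::sleep_until((started_at + debounce).into()))
            }
            None => Either::Right(future::pending::<()>()),
        };
        tokio::select! {
            biased;
            maybe_req = rx.recv() => {
                match maybe_req {
                    Some(req) => {
                        batch.push(req);
                        if batch.len() >= max_batch {
                            return batch; // batch is full, stop debouncing
                        }
                        // open the debounce window at the first buffered request
                        window_started_at.get_or_insert_with(Instant::now);
                    }
                    None => return batch, // peer disconnected
                }
            }
            _ = sleep_fut => return batch, // debounce window elapsed
        }
    }
}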
View File

@@ -9,12 +9,17 @@
use super::tenant::{PageReconstructError, Timeline};
use crate::context::RequestContext;
use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::span::{
debug_assert_current_span_has_tenant_and_timeline_id,
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id,
};
use crate::tenant::timeline::GetVectoredError;
use crate::walrecord::NeonWalRecord;
use crate::{aux_file, repository::*};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use itertools::Itertools;
use pageserver_api::key::{
dbdir_key_range, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key,
relmap_file_key, repl_origin_key, repl_origin_key_range, slru_block_to_key, slru_dir_to_key,
@@ -28,7 +33,7 @@ use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, RepOriginId, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
use std::collections::{hash_map, HashMap, HashSet};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
@@ -191,26 +196,184 @@ impl Timeline {
version: Version<'_>,
ctx: &RequestContext,
) -> Result<Bytes, PageReconstructError> {
if tag.relnode == 0 {
return Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
));
}
let pages = smallvec::smallvec![(tag, blknum)];
let res = self.get_rel_page_at_lsn_batched(pages, version, ctx).await;
assert_eq!(res.len(), 1);
res.into_iter().next().unwrap()
}
let nblocks = self.get_rel_size(tag, version, ctx).await?;
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
return Ok(ZERO_PAGE.clone());
/// Like [`get_rel_page_at_lsn`], but returns a batch of pages.
pub(crate) async fn get_rel_page_at_lsn_batched(
&self,
pages: smallvec::SmallVec<[(RelTag, BlockNumber); 1]>,
version: Version<'_>,
ctx: &RequestContext,
) -> smallvec::SmallVec<[Result<Bytes, PageReconstructError>; 1]> {
debug_assert_current_span_has_tenant_and_timeline_id();
let request_lsn = match version {
Version::Lsn(lsn) => lsn,
Version::Modified(_) => panic!("unsupported"),
};
enum KeyState {
NeedsVectoredGet,
Done(Result<Bytes, PageReconstructError>),
}
let mut key_states = BTreeMap::new();
let mut vectored_gets: smallvec::SmallVec<[_; 1]> =
smallvec::SmallVec::with_capacity(pages.len());
for (response_order, (tag, blknum)) in pages.into_iter().enumerate() {
let key = rel_block_to_key(tag, blknum);
use std::collections::btree_map::Entry;
let key_state_slot = match key_states.entry((key, response_order)) {
Entry::Occupied(_entry) => unreachable!(
"enumerate makes keys unique, even if batch contains same key twice"
),
Entry::Vacant(entry) => entry,
};
let key = rel_block_to_key(tag, blknum);
version.get(self, key, ctx).await
if tag.relnode == 0 {
key_state_slot.insert(KeyState::Done(Err(PageReconstructError::Other(
RelationError::InvalidRelnode.into(),
))));
continue;
}
let nblocks = match self.get_rel_size(tag, version, ctx).await {
Ok(nblocks) => nblocks,
Err(err) => {
key_state_slot.insert(KeyState::Done(Err(err)));
continue;
}
};
if blknum >= nblocks {
debug!(
"read beyond EOF at {} blk {} at {}, size is {}: returning all-zeros page",
tag,
blknum,
version.get_lsn(),
nblocks
);
key_state_slot.insert(KeyState::Done(Ok(ZERO_PAGE.clone())));
continue;
}
vectored_gets.push(key);
key_state_slot.insert(KeyState::NeedsVectoredGet);
}
// turn vectored_gets into a keyspace
let keyspace = {
// add_key requires monotonicity
vectored_gets.sort_unstable();
let mut acc = KeySpaceAccum::new();
for key in vectored_gets
.into_iter()
// in fact it requires strong monotonicity
.dedup()
{
acc.add_key(key);
}
acc.to_keyspace()
};
match self.get_vectored(keyspace, request_lsn, ctx).await {
Ok(results) => {
for (key, res) in results {
if let Err(err) = &res {
warn!(%key, ?err, "a key inside get_vectored failed with a per-key error");
}
let mut interests = key_states.range_mut((key, 0)..(key.next(), 0)).peekable();
let first_interest = interests.next().unwrap();
let next_interest = interests.peek().is_some();
if !next_interest {
match first_interest.1 {
KeyState::NeedsVectoredGet => {
*first_interest.1 = KeyState::Done(res);
}
KeyState::Done(_) => unreachable!(),
}
continue;
} else {
for ((_, _), state) in [first_interest].into_iter().chain(interests) {
match state {
KeyState::NeedsVectoredGet => {
*state = KeyState::Done(match &res {
Ok(buf) => Ok(buf.clone()),
// this `match` is working around the fact that we cannot Clone the PageReconstructError
Err(err) => Err(match err {
PageReconstructError::Cancelled => {
PageReconstructError::Cancelled
}
x @ PageReconstructError::Other(_) |
x @ PageReconstructError::AncestorLsnTimeout(_) |
x @ PageReconstructError::WalRedo(_) |
x @ PageReconstructError::MissingKey(_) => {
PageReconstructError::Other(anyhow::anyhow!("there was more than one request for this key in the batch, error logged once: {x:?}"))
},
}),
});
}
KeyState::Done(_) => unreachable!(),
}
}
}
}
}
Err(err) => {
warn!(?err, "get_vectored failed with a global error, mapping that error to per-key failure");
// this cannot really happen because get_vectored only errors globally on invalid LSN or too large batch size
for ((_, _), state) in key_states.iter_mut() {
// this whole `match` is a lot like `From<GetVectoredError> for PageReconstructError`
// but without taking ownership of the GetVectoredError
match &err {
GetVectoredError::Cancelled => {
*state = KeyState::Done(Err(PageReconstructError::Cancelled));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::MissingKey(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more of the requested keys were missing: {err:?}"))));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::GetReadyAncestorError(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(anyhow::anyhow!("whole vectored get request failed because one or more key required ancestor that wasn't ready: {err:?}"))));
}
// TODO: restructure get_vectored API to make this error per-key
GetVectoredError::Other(err) => {
*state = KeyState::Done(Err(PageReconstructError::Other(
anyhow::anyhow!("whole vectored get request failed: {err:?}"),
)));
}
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::InvalidLsn(e) => {
*state =
KeyState::Done(Err(anyhow::anyhow!("invalid LSN: {e:?}").into()));
}
// NB: this should never happen in practice because we limit MAX_GET_VECTORED_KEYS
// TODO: we can prevent this error class by moving this check into the type system
GetVectoredError::Oversized(err) => {
*state = KeyState::Done(Err(anyhow::anyhow!(
"batching oversized: {err:?}"
)
.into()));
}
}
}
}
};
// get the results into the order in which they were requested
let mut return_order: smallvec::SmallVec<[_; Timeline::MAX_GET_VECTORED_KEYS as usize]> =
smallvec::SmallVec::with_capacity(key_states.len());
return_order.extend(key_states.keys().map(|(key, idx)| (*key, *idx)));
return_order.sort_unstable_by_key(|(_, idx)| *idx);
let mut res = smallvec::SmallVec::with_capacity(key_states.len());
res.extend(return_order.into_iter().map(|key_states_key| {
match key_states.remove(&key_states_key).unwrap() {
KeyState::Done(res) => res,
KeyState::NeedsVectoredGet => unreachable!(),
}
}));
res
}
// Get size of a database in blocks
@@ -1205,13 +1368,6 @@ impl<'a> DatadirModification<'a> {
img: Bytes,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
}
self.put(rel_block_to_key(rel, blknum), Value::Image(img));
Ok(())
}
@@ -1223,34 +1379,14 @@ impl<'a> DatadirModification<'a> {
blknum: BlockNumber,
img: Bytes,
) -> anyhow::Result<()> {
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver at {}",
key
);
}
self.put(key, Value::Image(img));
self.put(slru_block_to_key(kind, segno, blknum), Value::Image(img));
Ok(())
}
pub(crate) fn put_rel_page_image_zero(
&mut self,
rel: RelTag,
blknum: BlockNumber,
) -> anyhow::Result<()> {
anyhow::ensure!(rel.relnode != 0, RelationError::InvalidRelnode);
let key = rel_block_to_key(rel, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
}
self.pending_zero_data_pages.insert(key.to_compact());
pub(crate) fn put_rel_page_image_zero(&mut self, rel: RelTag, blknum: BlockNumber) {
self.pending_zero_data_pages
.insert(rel_block_to_key(rel, blknum).to_compact());
self.pending_bytes += ZERO_PAGE.len();
Ok(())
}
pub(crate) fn put_slru_page_image_zero(
@@ -1258,18 +1394,10 @@ impl<'a> DatadirModification<'a> {
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
) -> anyhow::Result<()> {
let key = slru_block_to_key(kind, segno, blknum);
if !key.is_valid_key_on_write_path() {
anyhow::bail!(
"the request contains data not supported by pageserver: {} @ {}",
key,
self.lsn
);
}
self.pending_zero_data_pages.insert(key.to_compact());
) {
self.pending_zero_data_pages
.insert(slru_block_to_key(kind, segno, blknum).to_compact());
self.pending_bytes += ZERO_PAGE.len();
Ok(())
}
/// Call this at the end of each WAL record.

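The bookkeeping in get_rel_page_at_lsn_batched keys a BTreeMap by (Key, request index) so that duplicate keys within one batch stay distinct while the deduplicated vectored lookup can still find every interested slot by range, and responses can be re-sorted into request order afterwards. A reduced sketch of the same pattern over plain integers (hypothetical, not from this diff):

use std::collections::BTreeMap;

// Sketch: fan a batch of possibly-duplicate keys out to one deduplicated
// lookup, then fan the results back in request order.
fn fan_out_fan_in(keys: &[u32], lookup: impl Fn(u32) -> String) -> Vec<String> {
    // (key, original position) keeps duplicates distinct and range lookups easy.
    let mut slots: BTreeMap<(u32, usize), Option<String>> = keys
        .iter()
        .copied()
        .enumerate()
        .map(|(idx, key)| ((key, idx), None))
        .collect();

    // Deduplicated, sorted set of keys to actually fetch.
    let mut unique: Vec<u32> = keys.to_vec();
    unique.sort_unstable();
    unique.dedup();

    for key in unique {
        let value = lookup(key);
        // Every (key, idx) slot for this key gets a clone of the single result.
        for (_, slot) in slots.range_mut((key, 0)..=(key, usize::MAX)) {
            *slot = Some(value.clone());
        }
    }

    // Re-sort by original position to restore request order.
    let mut ordered: Vec<(usize, String)> = slots
        .into_iter()
        .map(|((_, idx), value)| (idx, value.expect("every slot was filled")))
        .collect();
    ordered.sort_unstable_by_key(|(idx, _)| *idx);
    ordered.into_iter().map(|(_, value)| value).collect()
}

For example, fan_out_fan_in(&[3, 1, 3], |k| k.to_string()) yields ["3", "1", "3"], with the lookup for key 3 performed only once.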
View File

@@ -73,6 +73,21 @@ impl ValueBytes {
Ok(raw[8] == 1)
}
pub(crate) fn is_image(raw: &[u8]) -> Result<bool, InvalidInput> {
if raw.len() < 12 {
return Err(InvalidInput::TooShortValue);
}
let value_discriminator = &raw[0..4];
if value_discriminator == [0, 0, 0, 0] {
// Value::Image always initializes
return Ok(true);
}
Ok(false)
}
}
#[cfg(test)]

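A small test-style sketch of the assumption behind is_image, if placed alongside the existing tests in this module; it presumes BeSer encodes the Value enum's variant index as a 4-byte big-endian prefix (Image being variant 0), which is also what the is_will_init check above relies on:

#[test]
fn value_image_discriminant_sketch() {
    use bytes::Bytes;
    use utils::bin_ser::BeSer;

    // Hedged assumption: Value::Image serializes with a [0, 0, 0, 0] prefix.
    let raw = Value::Image(Bytes::from_static(b"some page bytes")).ser().unwrap();
    assert_eq!(&raw[0..4], &[0u8, 0, 0, 0]);
    assert!(ValueBytes::is_image(&raw).unwrap());
}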
View File

@@ -7091,13 +7091,13 @@ mod tests {
vec![
// Image layer at GC horizon
PersistentLayerKey {
key_range: Key::MIN..Key::MAX,
key_range: Key::MIN..Key::NON_L0_MAX,
lsn_range: Lsn(0x30)..Lsn(0x31),
is_delta: false
},
// The delta layer below the horizon
// The delta layer covers the full range (with the layer key hack to avoid being recognized as L0)
PersistentLayerKey {
key_range: get_key(3)..get_key(4),
key_range: Key::MIN..Key::NON_L0_MAX,
lsn_range: Lsn(0x30)..Lsn(0x48),
is_delta: true
},

View File

@@ -452,8 +452,7 @@ impl TryFrom<toml_edit::Item> for TenantConfOpt {
.map_err(|e| anyhow::anyhow!("{}: {}", e.path(), e.inner().message()));
}
toml_edit::Item::Table(table) => {
let deserializer =
toml_edit::de::Deserializer::from(toml_edit::DocumentMut::from(table));
let deserializer = toml_edit::de::Deserializer::new(table.into());
return serde_path_to_error::deserialize(deserializer)
.map_err(|e| anyhow::anyhow!("{}: {}", e.path(), e.inner().message()));
}

View File

@@ -8,15 +8,17 @@ mod layer_desc;
mod layer_name;
pub mod merge_iterator;
use tokio::sync::{self};
use utils::bin_ser::BeSer;
pub mod split_writer;
use crate::context::{AccessStatsBehavior, RequestContext};
use crate::repository::Value;
use crate::repository::{Value, ValueBytes};
use crate::walrecord::NeonWalRecord;
use bytes::Bytes;
use pageserver_api::key::Key;
use pageserver_api::key::{Key, DBDIR_KEY};
use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum};
use std::cmp::{Ordering, Reverse};
use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap};
use std::ops::Range;
@@ -79,12 +81,18 @@ pub(crate) enum ValueReconstructSituation {
}
/// Reconstruct data accumulated for a single key during a vectored get
#[derive(Debug, Default, Clone)]
#[derive(Debug, Default)]
pub(crate) struct VectoredValueReconstructState {
pub(crate) records: Vec<(Lsn, NeonWalRecord)>,
pub(crate) img: Option<(Lsn, Bytes)>,
pub(crate) records: Vec<(
Lsn,
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
)>,
pub(crate) img: Option<(
Lsn,
tokio::sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
)>,
situation: ValueReconstructSituation,
pub(crate) situation: ValueReconstructSituation,
}
impl VectoredValueReconstructState {
@@ -93,16 +101,57 @@ impl VectoredValueReconstructState {
}
}
impl From<VectoredValueReconstructState> for ValueReconstructState {
fn from(mut state: VectoredValueReconstructState) -> Self {
// walredo expects the records to be descending in terms of Lsn
state.records.sort_by_key(|(lsn, _)| Reverse(*lsn));
pub(crate) async fn convert(
_key: Key,
from: VectoredValueReconstructState,
) -> Result<ValueReconstructState, PageReconstructError> {
let mut to = ValueReconstructState::default();
ValueReconstructState {
records: state.records,
img: state.img,
for (lsn, fut) in from.records {
match fut.await {
Ok(res) => match res {
Ok(bytes) => {
let value = Value::des(&bytes)
.map_err(|err| PageReconstructError::Other(err.into()))?;
match value {
Value::WalRecord(rec) => {
to.records.push((lsn, rec));
},
Value::Image(img) => {
assert!(to.img.is_none());
to.img = Some((lsn, img));
}
}
}
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
},
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
}
}
if to.img.is_none() {
let (lsn, fut) = from.img.expect("Need an image");
match fut.await {
Ok(res) => match res {
Ok(bytes) => {
to.img = Some((lsn, bytes));
}
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
},
Err(err) => {
return Err(PageReconstructError::Other(err.into()));
}
}
}
Ok(to)
}
/// Bag of data accumulated during a vectored get..
@@ -200,7 +249,8 @@ impl ValuesReconstructState {
&mut self,
key: &Key,
lsn: Lsn,
value: Value,
completes: bool,
value: sync::oneshot::Receiver<Result<Bytes, std::io::Error>>,
) -> ValueReconstructSituation {
let state = self
.keys
@@ -208,31 +258,14 @@ impl ValuesReconstructState {
.or_insert(Ok(VectoredValueReconstructState::default()));
if let Ok(state) = state {
let key_done = match state.situation {
match state.situation {
ValueReconstructSituation::Complete => unreachable!(),
ValueReconstructSituation::Continue => match value {
Value::Image(img) => {
state.img = Some((lsn, img));
true
}
Value::WalRecord(rec) => {
debug_assert!(
Some(lsn) > state.get_cached_lsn(),
"Attempt to collect a record below cached LSN for walredo: {} < {}",
lsn,
state
.get_cached_lsn()
.expect("Assertion can only fire if a cached lsn is present")
);
ValueReconstructSituation::Continue => {
state.records.push((lsn, value));
}
}
let will_init = rec.will_init();
state.records.push((lsn, rec));
will_init
}
},
};
if key_done && state.situation == ValueReconstructSituation::Continue {
if completes && state.situation == ValueReconstructSituation::Continue {
state.situation = ValueReconstructSituation::Complete;
self.keys_done.add_key(*key);
}
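
Taken together, the VectoredValueReconstructState and update_key changes above form the consuming half of a oneshot-based read pipeline: values arrive later as channel results rather than as materialized bytes. A self-contained sketch of that consuming side, with simplified types standing in for Value and PageReconstructError (names below are illustrative only):

use tokio::sync::oneshot;

type IoResult = Result<Vec<u8>, std::io::Error>;

// One pending record: its LSN plus a channel that will yield the bytes later.
struct PendingRecord {
    lsn: u64,
    rx: oneshot::Receiver<IoResult>,
}

async fn materialize(records: Vec<PendingRecord>) -> Result<Vec<(u64, Vec<u8>)>, String> {
    let mut out = Vec::with_capacity(records.len());
    for PendingRecord { lsn, rx } in records {
        // Outer error: the producing task dropped its sender.
        // Inner error: the vectored read itself failed.
        match rx.await {
            Ok(Ok(bytes)) => out.push((lsn, bytes)),
            Ok(Err(err)) => return Err(format!("read failed at lsn {lsn}: {err}")),
            Err(_) => return Err(format!("producer dropped sender for lsn {lsn}")),
        }
    }
    Ok(out)
}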

View File

@@ -42,13 +42,12 @@ use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead,
VectoredReadCoalesceMode, VectoredReadPlanner,
};
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::{FullSlice, IoBufExt};
use crate::virtual_file::{self, VirtualFile};
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
@@ -58,14 +57,14 @@ use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::OnceCell;
use tokio::sync::{self, OnceCell};
use tokio_epoll_uring::IoBuf;
use tracing::*;
@@ -224,7 +223,7 @@ pub struct DeltaLayerInner {
index_start_blk: u32,
index_root_blk: u32,
file: VirtualFile,
file: Arc<VirtualFile>,
file_id: FileId,
layer_key_range: Range<Key>,
@@ -788,9 +787,11 @@ impl DeltaLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
.await
.context("open layer file")?;
let file = Arc::new(
VirtualFile::open(path, ctx)
.await
.context("open layer file")?,
);
let file_id = page_cache::next_file_id();
@@ -980,77 +981,59 @@ impl DeltaLayerInner {
reconstruct_state: &mut ValuesReconstructState,
ctx: &RequestContext,
) {
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
let mut ignore_key_with_err = None;
let max_vectored_read_bytes = self
.max_vectored_read_bytes
.expect("Layer is loaded with max vectored bytes config")
.0
.into();
let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
let mut buf = Some(BytesMut::with_capacity(buf_size));
// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
// track when a key is done.
for read in reads.into_iter().rev() {
let res = vectored_blob_reader
.read_blobs(&read, buf.take().expect("Should have a buffer"), ctx)
.await;
let blobs_buf = match res {
Ok(blobs_buf) => blobs_buf,
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::Other(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
kind
)),
);
}
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
buf = Some(BytesMut::with_capacity(buf_size));
continue;
}
};
for meta in blobs_buf.blobs.iter().rev() {
if Some(meta.meta.key) == ignore_key_with_err {
continue;
}
let value = Value::des(&blobs_buf.buf[meta.start..meta.end]);
let value = match value {
Ok(v) => v,
Err(e) => {
reconstruct_state.on_key_error(
meta.meta.key,
PageReconstructError::Other(anyhow!(e).context(format!(
"Failed to deserialize blob from virtual file {}",
self.file.path,
))),
);
ignore_key_with_err = Some(meta.meta.key);
continue;
}
};
// Invariant: once a key reaches [`ValueReconstructSituation::Complete`]
// state, no further updates shall be made to it. The call below will
// panic if the invariant is violated.
reconstruct_state.update_key(&meta.meta.key, meta.meta.lsn, value);
let mut senders: HashMap<
(Key, Lsn),
sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
> = Default::default();
for (_, blob_meta) in read.blobs_at.as_slice() {
let (tx, rx) = sync::oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(
&blob_meta.key,
blob_meta.lsn,
blob_meta.will_init,
rx,
);
}
buf = Some(blobs_buf.buf);
let read_from = self.file.clone();
let read_ctx = ctx.attached_child();
tokio::task::spawn(async move {
let vectored_blob_reader = VectoredBlobReader::new(&read_from);
let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
match res {
Ok(blobs_buf) => {
for meta in blobs_buf.blobs.iter().rev() {
let buf = &blobs_buf.buf[meta.start..meta.end];
let sender = senders
.remove(&(meta.meta.key, meta.meta.lsn))
.expect("sender must exist");
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
}
assert!(senders.is_empty());
}
Err(err) => {
for (_, sender) in senders {
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
}
}
}
});
}
}
@@ -1190,7 +1173,14 @@ impl DeltaLayerInner {
let actionable = if let Some((key, lsn, start_offset)) = prev.take() {
let end_offset = offset;
Some((BlobMeta { key, lsn }, start_offset..end_offset))
Some((
BlobMeta {
key,
lsn,
will_init: false,
},
start_offset..end_offset,
))
} else {
None
};
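
The producing half of the same pipeline, as in the spawned read above, follows a register-then-spawn shape: one oneshot sender per (key, lsn) is recorded before the task starts, and the task fans the read result (or the error) out to every registered sender. A hedged sketch with toy types in place of VirtualFile and the vectored reader:

use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::oneshot;

type IoResult = Result<Vec<u8>, std::io::Error>;

// Must be called from within a tokio runtime, like the code above.
fn issue_read(
    file: Arc<Vec<u8>>,            // stand-in for Arc<VirtualFile>
    wanted: Vec<(u32, u64)>,       // (key, lsn) pairs to resolve
) -> HashMap<(u32, u64), oneshot::Receiver<IoResult>> {
    let mut senders = HashMap::new();
    let mut receivers = HashMap::new();
    for id in wanted {
        let (tx, rx) = oneshot::channel();
        senders.insert(id, tx);
        receivers.insert(id, rx);
    }
    tokio::task::spawn(async move {
        // Pretend "read": one pass over the shared buffer for the whole batch.
        let res: IoResult = Ok(file.as_ref().clone());
        match res {
            Ok(bytes) => {
                for (_, tx) in senders.drain() {
                    let _ = tx.send(Ok(bytes.clone()));
                }
            }
            Err(err) => {
                for (_, tx) in senders.drain() {
                    let _ = tx.send(Err(std::io::Error::new(err.kind(), "read failed")));
                }
            }
        }
    });
    receivers
}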

View File

@@ -21,7 +21,7 @@
//!
//! Every image layer file consists of three parts: "summary",
//! "index", and "values". The summary is a fixed size header at the
//! beginning of the file, and it contains basic information about the
//! beginningof the file, and it contains basic information about the
//! layer, and offsets to the other parts. The "index" is a B-tree,
//! mapping from Key to an offset in the "values" part. The
//! actual page images are stored in the "values" part.
@@ -38,11 +38,11 @@ use crate::tenant::timeline::GetVectoredError;
use crate::tenant::vectored_blob_io::{
BlobFlag, StreamingVectoredReadPlanner, VectoredBlobReader, VectoredRead, VectoredReadPlanner,
};
use crate::tenant::{PageReconstructError, Timeline};
use crate::tenant::Timeline;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::virtual_file::{self, VirtualFile};
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{anyhow, bail, ensure, Context, Result};
use anyhow::{bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use hex;
@@ -52,13 +52,14 @@ use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fs::File;
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::OnceCell;
use tokio_stream::StreamExt;
use tracing::*;
@@ -163,7 +164,7 @@ pub struct ImageLayerInner {
key_range: Range<Key>,
lsn: Lsn,
file: VirtualFile,
file: Arc<VirtualFile>,
file_id: FileId,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
@@ -390,9 +391,11 @@ impl ImageLayerInner {
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
.await
.context("open layer file")?;
let file = Arc::new(
VirtualFile::open(path, ctx)
.await
.context("open layer file")?,
);
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader
@@ -579,8 +582,16 @@ impl ImageLayerInner {
.0
.into();
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
for read in reads.into_iter() {
let mut senders: HashMap<(Key, Lsn), oneshot::Sender<Result<Bytes, std::io::Error>>> =
Default::default();
for (_, blob_meta) in read.blobs_at.as_slice() {
let (tx, rx) = oneshot::channel();
senders.insert((blob_meta.key, blob_meta.lsn), tx);
reconstruct_state.update_key(&blob_meta.key, blob_meta.lsn, true, rx);
}
let buf_size = read.size();
if buf_size > max_vectored_read_bytes {
@@ -599,36 +610,33 @@ impl ImageLayerInner {
);
}
let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf, ctx).await;
let read_from = self.file.clone();
let read_ctx = ctx.attached_child();
tokio::task::spawn(async move {
let buf = BytesMut::with_capacity(buf_size);
let vectored_blob_reader = VectoredBlobReader::new(&*read_from);
let res = vectored_blob_reader.read_blobs(&read, buf, &read_ctx).await;
match res {
Ok(blobs_buf) => {
let frozen_buf = blobs_buf.buf.freeze();
match res {
Ok(blobs_buf) => {
for meta in blobs_buf.blobs.iter().rev() {
let buf = &blobs_buf.buf[meta.start..meta.end];
let sender = senders
.remove(&(meta.meta.key, meta.meta.lsn))
.expect("sender must exist");
let _ = sender.send(Ok(Bytes::copy_from_slice(buf)));
}
for meta in blobs_buf.blobs.iter() {
let img_buf = frozen_buf.slice(meta.start..meta.end);
reconstruct_state.update_key(
&meta.meta.key,
self.lsn,
Value::Image(img_buf),
);
assert!(senders.is_empty());
}
Err(err) => {
for (_, sender) in senders {
let _ = sender
.send(Err(std::io::Error::new(err.kind(), "vec read failed")));
}
}
}
Err(err) => {
let kind = err.kind();
for (_, blob_meta) in read.blobs_at.as_slice() {
reconstruct_state.on_key_error(
blob_meta.key,
PageReconstructError::from(anyhow!(
"Failed to read blobs from virtual file {}: {}",
self.file.path,
kind
)),
);
}
}
};
});
}
}
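
A side note on the buffer handling visible in this reader: slicing a frozen BytesMut shares a single allocation across blobs, whereas Bytes::copy_from_slice gives each blob its own copy, which is what lets a spawned task hand results to independent receivers without tying them to the read buffer. A small illustration of the two approaches (function names are made up):

use bytes::{Bytes, BytesMut};
use std::ops::Range;

fn split_shared(buf: BytesMut, ranges: &[Range<usize>]) -> Vec<Bytes> {
    // freeze() turns the mutable buffer into immutable Bytes; slice() then
    // produces cheap views that all reference the same allocation.
    let frozen = buf.freeze();
    ranges.iter().map(|r| frozen.slice(r.clone())).collect()
}

fn split_owned(buf: &[u8], ranges: &[Range<usize>]) -> Vec<Bytes> {
    // copy_from_slice detaches each blob from the read buffer entirely.
    ranges.iter().map(|r| Bytes::copy_from_slice(&buf[r.clone()])).collect()
}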

View File

@@ -10,10 +10,9 @@ use crate::context::{PageContentKind, RequestContext, RequestContextBuilder};
use crate::repository::{Key, Value};
use crate::tenant::ephemeral_file::EphemeralFile;
use crate::tenant::timeline::GetVectoredError;
use crate::tenant::PageReconstructError;
use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt;
use crate::{l0_flush, page_cache};
use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result};
use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::CompactKey;
@@ -35,9 +34,7 @@ use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::atomic::{AtomicU64, AtomicUsize};
use tokio::sync::RwLock;
use super::{
DeltaLayerWriter, PersistentLayerDesc, ValueReconstructSituation, ValuesReconstructState,
};
use super::{DeltaLayerWriter, PersistentLayerDesc, ValuesReconstructState};
pub(crate) mod vectored_dio_read;
@@ -87,7 +84,7 @@ pub struct InMemoryLayerInner {
/// The values are stored in a serialized format in this file.
/// Each serialized Value is preceded by a 'u32' length field.
/// PerSeg::page_versions map stores offsets into this file.
file: EphemeralFile,
file: Arc<tokio::sync::RwLock<EphemeralFile>>,
resource_units: GlobalResourceUnits,
}
@@ -381,7 +378,11 @@ impl InMemoryLayer {
}
pub(crate) fn try_len(&self) -> Option<u64> {
self.inner.try_read().map(|i| i.file.len()).ok()
self.inner
.try_read()
.map(|i| i.file.try_read().map(|i| i.len()).ok())
.ok()
.flatten()
}
pub(crate) fn assert_writable(&self) {
@@ -432,6 +433,10 @@ impl InMemoryLayer {
read: vectored_dio_read::LogicalRead<Vec<u8>>,
}
let mut reads: HashMap<Key, Vec<ValueRead>> = HashMap::new();
let mut senders: HashMap<
(Key, Lsn),
tokio::sync::oneshot::Sender<Result<Bytes, std::io::Error>>,
> = Default::default();
for range in keyspace.ranges.iter() {
for (key, vec_map) in inner
@@ -459,6 +464,11 @@ impl InMemoryLayer {
Vec::with_capacity(len as usize),
),
});
let (tx, rx) = tokio::sync::oneshot::channel();
senders.insert((key, *entry_lsn), tx);
reconstruct_state.update_key(&key, *entry_lsn, will_init, rx);
if will_init {
break;
}
@@ -466,46 +476,42 @@ impl InMemoryLayer {
}
}
// Execute the reads.
let read_from = inner.file.clone();
let read_ctx = ctx.attached_child();
tokio::task::spawn(async move {
let locked = read_from.read().await;
let f = vectored_dio_read::execute(
&*locked,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&read_ctx,
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
let f = vectored_dio_read::execute(
&inner.file,
reads
.iter()
.flat_map(|(_, value_reads)| value_reads.iter().map(|v| &v.read)),
&ctx,
);
send_future::SendFuture::send(f) // https://github.com/rust-lang/rust/issues/96865
.await;
// Process results into the reconstruct state
'next_key: for (key, value_reads) in reads {
for ValueRead { entry_lsn, read } in value_reads {
match read.into_result().expect("we run execute() above") {
Err(e) => {
reconstruct_state.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
}
Ok(value_buf) => {
let value = Value::des(&value_buf);
if let Err(e) = value {
reconstruct_state
.on_key_error(key, PageReconstructError::from(anyhow!(e)));
continue 'next_key;
for (key, value_reads) in reads {
for ValueRead { entry_lsn, read } in value_reads {
let sender = senders
.remove(&(key, entry_lsn))
.expect("sender must exist");
match read.into_result().expect("we run execute() above") {
Err(e) => {
let sender = senders
.remove(&(key, entry_lsn))
.expect("sender must exist");
let _ = sender
.send(Err(std::io::Error::new(e.kind(), "dio vec read failed")));
}
let key_situation =
reconstruct_state.update_key(&key, entry_lsn, value.unwrap());
if key_situation == ValueReconstructSituation::Complete {
// TODO: metric to see if we fetched more values than necessary
continue 'next_key;
Ok(value_buf) => {
let _ = sender.send(Ok(value_buf.into()));
}
// process the next value in the next iteration of the loop
}
}
}
}
assert!(senders.is_empty());
});
reconstruct_state.on_lsn_advanced(&keyspace, self.start_lsn);
@@ -600,7 +606,8 @@ impl InMemoryLayer {
/// Get layer size.
pub async fn size(&self) -> Result<u64> {
let inner = self.inner.read().await;
Ok(inner.file.len())
let locked = inner.file.try_read().expect("no contention");
Ok(locked.len())
}
/// Create a new, empty, in-memory layer
@@ -614,9 +621,10 @@ impl InMemoryLayer {
) -> Result<InMemoryLayer> {
trace!("initializing new empty InMemoryLayer for writing on timeline {timeline_id} at {start_lsn}");
let file =
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?;
let key = InMemoryLayerFileId(file.page_cache_file_id());
let file = Arc::new(tokio::sync::RwLock::new(
EphemeralFile::create(conf, tenant_shard_id, timeline_id, gate_guard, ctx).await?,
));
let key = InMemoryLayerFileId(file.read().await.page_cache_file_id());
Ok(InMemoryLayer {
file_id: key,
@@ -648,7 +656,7 @@ impl InMemoryLayer {
let mut inner = self.inner.write().await;
self.assert_writable();
let base_offset = inner.file.len();
let base_offset = inner.file.read().await.len();
let SerializedBatch {
raw,
@@ -672,8 +680,13 @@ impl InMemoryLayer {
}
// Write the batch to the file
inner.file.write_raw(&raw, ctx).await?;
let new_size = inner.file.len();
// FIXME: can't borrow arc
let new_size = {
let mut locked = inner.file.write().await;
locked.write_raw(&raw, ctx).await?;
locked.len()
};
let expected_new_len = base_offset
.checked_add(raw.len().into_u64())
// write_raw would error if we were to overflow u64.
@@ -713,7 +726,7 @@ impl InMemoryLayer {
pub(crate) async fn tick(&self) -> Option<u64> {
let mut inner = self.inner.write().await;
let size = inner.file.len();
let size = inner.file.read().await.len();
inner.resource_units.publish_size(size)
}
@@ -809,7 +822,7 @@ impl InMemoryLayer {
match l0_flush_global_state {
l0_flush::Inner::Direct { .. } => {
let file_contents: Vec<u8> = inner.file.load_to_vec(ctx).await?;
let file_contents: Vec<u8> = inner.file.read().await.load_to_vec(ctx).await?;
let file_contents = Bytes::from(file_contents);
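
With the file wrapped in Arc<tokio::sync::RwLock<EphemeralFile>> as above, a non-blocking length probe has to try two locks and give up if either is contended, while writers hold the file lock across a whole append. A small sketch of that locking shape, with a Vec<u8> standing in for EphemeralFile:

use std::sync::Arc;
use tokio::sync::RwLock;

struct Inner {
    file: Arc<RwLock<Vec<u8>>>, // stand-in for Arc<RwLock<EphemeralFile>>
}

struct Layer {
    inner: RwLock<Inner>,
}

impl Layer {
    // Returns None instead of blocking if either lock is currently held.
    fn try_len(&self) -> Option<usize> {
        self.inner
            .try_read()
            .ok()
            .and_then(|inner| inner.file.try_read().ok().map(|f| f.len()))
    }

    // Writers keep the file lock for the whole append so len() stays consistent.
    async fn append(&self, bytes: &[u8]) -> usize {
        let inner = self.inner.write().await;
        let mut file = inner.file.write().await;
        file.extend_from_slice(bytes);
        file.len()
    }
}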

View File

@@ -107,6 +107,8 @@ async fn smoke_test() {
.expect("tenant harness writes the control file")
};
let img_before = (img_before.0, img_before.1.await.unwrap().unwrap());
let img_after = (img_after.0, img_after.1.await.unwrap().unwrap());
assert_eq!(img_before, img_after);
// evict_and_wait can timeout, but it doesn't cancel the evicting itself

View File

@@ -188,7 +188,7 @@ impl SplitImageLayerWriter {
.await
}
/// This function will be deprecated with #8841.
/// When split writer fails, the caller should call this function and handle partially generated layers.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, ImageLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
@@ -204,7 +204,7 @@ impl SplitImageLayerWriter {
/// will split them into multiple files based on size.
#[must_use]
pub struct SplitDeltaLayerWriter {
inner: Option<(Key, DeltaLayerWriter)>,
inner: DeltaLayerWriter,
target_layer_size: u64,
generated_layers: Vec<SplitWriterResult>,
conf: &'static PageServerConf,
@@ -212,6 +212,7 @@ pub struct SplitDeltaLayerWriter {
tenant_shard_id: TenantShardId,
lsn_range: Range<Lsn>,
last_key_written: Key,
start_key: Key,
}
impl SplitDeltaLayerWriter {
@@ -219,18 +220,29 @@ impl SplitDeltaLayerWriter {
conf: &'static PageServerConf,
timeline_id: TimelineId,
tenant_shard_id: TenantShardId,
start_key: Key,
lsn_range: Range<Lsn>,
target_layer_size: u64,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
Ok(Self {
target_layer_size,
inner: None,
inner: DeltaLayerWriter::new(
conf,
timeline_id,
tenant_shard_id,
start_key,
lsn_range.clone(),
ctx,
)
.await?,
generated_layers: Vec::new(),
conf,
timeline_id,
tenant_shard_id,
lsn_range,
last_key_written: Key::MIN,
start_key,
})
}
@@ -253,26 +265,9 @@ impl SplitDeltaLayerWriter {
//
// Also, keep all updates of a single key in a single file. TODO: split them using the legacy compaction
// strategy. https://github.com/neondatabase/neon/issues/8837
if self.inner.is_none() {
self.inner = Some((
key,
DeltaLayerWriter::new(
self.conf,
self.timeline_id,
self.tenant_shard_id,
key,
self.lsn_range.clone(),
ctx,
)
.await?,
));
}
let (_, inner) = self.inner.as_mut().unwrap();
let addition_size_estimation = KEY_SIZE as u64 + 8 /* LSN u64 size */ + 80 /* value size estimation */;
if inner.num_keys() >= 1
&& inner.estimated_size() + addition_size_estimation >= self.target_layer_size
if self.inner.num_keys() >= 1
&& self.inner.estimated_size() + addition_size_estimation >= self.target_layer_size
{
if key != self.last_key_written {
let next_delta_writer = DeltaLayerWriter::new(
@@ -284,13 +279,13 @@ impl SplitDeltaLayerWriter {
ctx,
)
.await?;
let (start_key, prev_delta_writer) =
std::mem::replace(&mut self.inner, Some((key, next_delta_writer))).unwrap();
let prev_delta_writer = std::mem::replace(&mut self.inner, next_delta_writer);
let layer_key = PersistentLayerKey {
key_range: start_key..key,
key_range: self.start_key..key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
self.start_key = key;
if discard(&layer_key).await {
drop(prev_delta_writer);
self.generated_layers
@@ -301,18 +296,17 @@ impl SplitDeltaLayerWriter {
self.generated_layers
.push(SplitWriterResult::Produced(delta_layer));
}
} else if inner.estimated_size() >= S3_UPLOAD_LIMIT {
} else if self.inner.estimated_size() >= S3_UPLOAD_LIMIT {
// We have to produce a very large file b/c a key is updated too often.
anyhow::bail!(
"a single key is updated too often: key={}, estimated_size={}, and the layer file cannot be produced",
key,
inner.estimated_size()
self.inner.estimated_size()
);
}
}
self.last_key_written = key;
let (_, inner) = self.inner.as_mut().unwrap();
inner.put_value(key, lsn, val, ctx).await
self.inner.put_value(key, lsn, val, ctx).await
}
pub async fn put_value(
@@ -331,6 +325,7 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
discard: D,
) -> anyhow::Result<Vec<SplitWriterResult>>
where
@@ -342,15 +337,11 @@ impl SplitDeltaLayerWriter {
inner,
..
} = self;
let Some((start_key, inner)) = inner else {
return Ok(generated_layers);
};
if inner.num_keys() == 0 {
return Ok(generated_layers);
}
let end_key = self.last_key_written.next();
let layer_key = PersistentLayerKey {
key_range: start_key..end_key,
key_range: self.start_key..end_key,
lsn_range: self.lsn_range.clone(),
is_delta: true,
};
@@ -369,14 +360,15 @@ impl SplitDeltaLayerWriter {
self,
tline: &Arc<Timeline>,
ctx: &RequestContext,
end_key: Key,
) -> anyhow::Result<Vec<SplitWriterResult>> {
self.finish_with_discard_fn(tline, ctx, |_| async { false })
self.finish_with_discard_fn(tline, ctx, end_key, |_| async { false })
.await
}
/// This function will be deprecated with #8841.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, Option<DeltaLayerWriter>)> {
Ok((self.generated_layers, self.inner.map(|x| x.1)))
/// When split writer fails, the caller should call this function and handle partially generated layers.
pub(crate) fn take(self) -> anyhow::Result<(Vec<SplitWriterResult>, DeltaLayerWriter)> {
Ok((self.generated_layers, self.inner))
}
}
@@ -440,8 +432,10 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -466,22 +460,11 @@ mod tests {
)
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
assert_eq!(layers.len(), 1);
assert_eq!(
layers
.into_iter()
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
}
#[tokio::test]
@@ -518,8 +501,10 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -548,7 +533,10 @@ mod tests {
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
if discard {
for layer in image_layers {
layer.into_discarded_layer();
@@ -567,14 +555,6 @@ mod tests {
.collect_vec();
assert_eq!(image_layers.len(), N / 512 + 1);
assert_eq!(delta_layers.len(), N / 512 + 1);
assert_eq!(
delta_layers.first().unwrap().layer_desc().key_range.start,
get_key(0)
);
assert_eq!(
delta_layers.last().unwrap().layer_desc().key_range.end,
get_key(N as u32)
);
for idx in 0..image_layers.len() {
assert_ne!(image_layers[idx].layer_desc().key_range.start, Key::MIN);
assert_ne!(image_layers[idx].layer_desc().key_range.end, Key::MAX);
@@ -622,8 +602,10 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x18)..Lsn(0x20),
4 * 1024,
&ctx,
)
.await
.unwrap();
@@ -662,35 +644,11 @@ mod tests {
)
.await
.unwrap();
let layers = delta_writer.finish(&tline, &ctx).await.unwrap();
let layers = delta_writer
.finish(&tline, &ctx, get_key(10))
.await
.unwrap();
assert_eq!(layers.len(), 2);
let mut layers_iter = layers.into_iter();
assert_eq!(
layers_iter
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
assert_eq!(
layers_iter
.next()
.unwrap()
.into_resident_layer()
.layer_desc()
.key(),
PersistentLayerKey {
key_range: get_key(1)..get_key(2),
lsn_range: Lsn(0x18)..Lsn(0x20),
is_delta: true
}
);
}
#[tokio::test]
@@ -710,8 +668,10 @@ mod tests {
tenant.conf,
tline.timeline_id,
tenant.tenant_shard_id,
get_key(0),
Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
4 * 1024 * 1024,
&ctx,
)
.await
.unwrap();
@@ -729,20 +689,10 @@ mod tests {
.await
.unwrap();
}
let delta_layers = delta_writer.finish(&tline, &ctx).await.unwrap();
let delta_layers = delta_writer
.finish(&tline, &ctx, get_key(N as u32))
.await
.unwrap();
assert_eq!(delta_layers.len(), 1);
let delta_layer = delta_layers
.into_iter()
.next()
.unwrap()
.into_resident_layer();
assert_eq!(
delta_layer.layer_desc().key(),
PersistentLayerKey {
key_range: get_key(0)..get_key(1),
lsn_range: Lsn(0x10)..Lsn(N as u64 * 16 + 0x10),
is_delta: true
}
);
}
}
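
Behind the constructor and finish() signature changes, SplitDeltaLayerWriter keeps the same rolling rule: only start a new layer at a key boundary, and only once the estimated size would cross the target, emitting the finished range start_key..key. A loose, size-only sketch of that rule with a Vec<u8> in place of DeltaLayerWriter (field names are illustrative):

use std::ops::Range;

struct SplitWriter {
    target_size: usize,
    start_key: u64,
    last_key: u64,
    current: Vec<u8>,                     // stand-in for DeltaLayerWriter
    produced: Vec<(Range<u64>, Vec<u8>)>, // finished (key range, layer) pairs
}

impl SplitWriter {
    fn put(&mut self, key: u64, value: &[u8]) {
        // Roll only at a key boundary so all updates of one key stay together.
        if !self.current.is_empty()
            && self.current.len() + value.len() >= self.target_size
            && key != self.last_key
        {
            let finished = std::mem::take(&mut self.current);
            self.produced.push((self.start_key..key, finished));
            self.start_key = key;
        }
        self.last_key = key;
        self.current.extend_from_slice(value);
    }

    fn finish(mut self, end_key: u64) -> Vec<(Range<u64>, Vec<u8>)> {
        if !self.current.is_empty() {
            let finished = std::mem::take(&mut self.current);
            self.produced.push((self.start_key..end_key, finished));
        }
        self.produced
    }
}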

View File

@@ -18,6 +18,7 @@ use camino::Utf8Path;
use chrono::{DateTime, Utc};
use enumset::EnumSet;
use fail::fail_point;
use futures::{stream::FuturesUnordered, StreamExt};
use handle::ShardTimelineId;
use once_cell::sync::Lazy;
use pageserver_api::{
@@ -68,7 +69,9 @@ use crate::{
tenant::{
layer_map::{LayerMap, SearchResult},
metadata::TimelineMetadata,
storage_layer::{inmemory_layer::IndexEntry, PersistentLayerDesc},
storage_layer::{
convert, inmemory_layer::IndexEntry, PersistentLayerDesc, ValueReconstructSituation,
},
},
walredo,
};
@@ -1129,22 +1132,38 @@ impl Timeline {
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
.for_get_kind(get_kind)
.start_timer();
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
let layers_visited = reconstruct_state.get_layers_visited();
let futs = FuturesUnordered::new();
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
match res {
Err(err) => {
results.insert(key, Err(err));
}
Ok(state) => {
let state = ValueReconstructState::from(state);
futs.push({
let walredo_self = self.myself.upgrade().expect("&self method holds the arc");
async move {
let state = res.expect("Read path is infallible");
assert!(matches!(
state.situation,
ValueReconstructSituation::Complete
));
let reconstruct_res = self.reconstruct_value(key, lsn, state).await;
results.insert(key, reconstruct_res);
let converted = match convert(key, state).await {
Ok(ok) => ok,
Err(err) => {
return (key, Err(err));
}
};
(
key,
walredo_self.reconstruct_value(key, lsn, converted).await,
)
}
}
});
}
let results = futs
.collect::<BTreeMap<Key, Result<Bytes, PageReconstructError>>>()
.await;
reconstruct_timer.stop_and_record();
// For aux file keys (v1 or v2) the vectored read path does not return an error
@@ -5496,30 +5515,30 @@ impl Timeline {
#[cfg(test)]
pub(crate) async fn inspect_image_layers(
self: &Arc<Timeline>,
lsn: Lsn,
ctx: &RequestContext,
_lsn: Lsn,
_ctx: &RequestContext,
) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new();
let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
let mut reconstruct_data = ValuesReconstructState::default();
layer
.get_values_reconstruct_data(
KeySpace::single(Key::MIN..Key::MAX),
lsn..Lsn(lsn.0 + 1),
&mut reconstruct_data,
ctx,
)
.await?;
for (k, v) in reconstruct_data.keys {
all_data.push((k, v?.img.unwrap().1));
}
}
}
all_data.sort();
Ok(all_data)
// let mut all_data = Vec::new();
// let guard = self.layers.read().await;
// for layer in guard.layer_map()?.iter_historic_layers() {
// if !layer.is_delta() && layer.image_layer_lsn() == lsn {
// let layer = guard.get_from_desc(&layer);
// let mut reconstruct_data = ValuesReconstructState::default();
// layer
// .get_values_reconstruct_data(
// KeySpace::single(Key::MIN..Key::MAX),
// lsn..Lsn(lsn.0 + 1),
// &mut reconstruct_data,
// ctx,
// )
// .await?;
// for (k, v) in reconstruct_data.keys {
// all_data.push((k, v?.img.unwrap().1));
// }
// }
// }
// all_data.sort();
Ok(Vec::new())
}
/// Get all historic layer descriptors in the layer map
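
One side of the get_vectored hunk above builds one future per key: each resolves to a (key, result) pair, FuturesUnordered drives them concurrently, and collect() reassembles the ordered map. The same pattern in isolation, with a made-up fetch_page standing in for the per-key reconstruction:

use std::collections::BTreeMap;
use futures::{stream::FuturesUnordered, StreamExt};

async fn fetch_page(key: u32) -> Result<Vec<u8>, String> {
    Ok(vec![key as u8]) // placeholder for walredo/reconstruct_value
}

async fn reconstruct_all(keys: Vec<u32>) -> BTreeMap<u32, Result<Vec<u8>, String>> {
    let futs = FuturesUnordered::new();
    for key in keys {
        futs.push(async move { (key, fetch_page(key).await) });
    }
    // Futures complete in any order; BTreeMap restores key order on collect.
    futs.collect::<BTreeMap<_, _>>().await
}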

View File

@@ -1809,6 +1809,7 @@ impl Timeline {
.unwrap();
// We don't want any of the produced layers to cover the full key range (i.e., MIN..MAX) b/c it will then be recognized
// as an L0 layer.
let hack_end_key = Key::NON_L0_MAX;
let mut delta_layers = Vec::new();
let mut image_layers = Vec::new();
let mut downloaded_layers = Vec::new();
@@ -1854,8 +1855,10 @@ impl Timeline {
self.conf,
self.timeline_id,
self.tenant_shard_id,
Key::MIN,
lowest_retain_lsn..end_lsn,
self.get_compaction_target_size(),
ctx,
)
.await?;
@@ -1962,7 +1965,7 @@ impl Timeline {
let produced_image_layers = if let Some(writer) = image_layer_writer {
if !dry_run {
writer
.finish_with_discard_fn(self, ctx, Key::MAX, discard)
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
.await?
} else {
let (layers, _) = writer.take()?;
@@ -1975,7 +1978,7 @@ impl Timeline {
let produced_delta_layers = if !dry_run {
delta_layer_writer
.finish_with_discard_fn(self, ctx, discard)
.finish_with_discard_fn(self, ctx, hack_end_key, discard)
.await?
} else {
let (layers, _) = delta_layer_writer.take()?;

View File

@@ -33,6 +33,7 @@ use crate::virtual_file::{self, VirtualFile};
pub struct BlobMeta {
pub key: Key,
pub lsn: Lsn,
pub will_init: bool,
}
/// Blob offsets into [`VectoredBlobsBuf::buf`]
@@ -355,7 +356,8 @@ pub enum BlobFlag {
/// * Iterate over the collected blobs and coalesce them into reads at the end
pub struct VectoredReadPlanner {
// Track all the blob offsets. Start offsets must be ordered.
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64)>>,
// Note: last bool is will_init
blobs: BTreeMap<Key, Vec<(Lsn, u64, u64, bool)>>,
// Arguments for previous blob passed into [`VectoredReadPlanner::handle`]
prev: Option<(Key, Lsn, u64, BlobFlag)>,
@@ -420,12 +422,12 @@ impl VectoredReadPlanner {
match flag {
BlobFlag::None => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.push((lsn, start_offset, end_offset));
blobs_for_key.push((lsn, start_offset, end_offset, false));
}
BlobFlag::ReplaceAll => {
let blobs_for_key = self.blobs.entry(key).or_default();
blobs_for_key.clear();
blobs_for_key.push((lsn, start_offset, end_offset));
blobs_for_key.push((lsn, start_offset, end_offset, true));
}
BlobFlag::Ignore => {}
}
@@ -436,11 +438,17 @@ impl VectoredReadPlanner {
let mut reads = Vec::new();
for (key, blobs_for_key) in self.blobs {
for (lsn, start_offset, end_offset) in blobs_for_key {
for (lsn, start_offset, end_offset, will_init) in blobs_for_key {
let extended = match &mut current_read_builder {
Some(read_builder) => {
read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn })
}
Some(read_builder) => read_builder.extend(
start_offset,
end_offset,
BlobMeta {
key,
lsn,
will_init,
},
),
None => VectoredReadExtended::No,
};
@@ -448,7 +456,11 @@ impl VectoredReadPlanner {
let next_read_builder = VectoredReadBuilder::new(
start_offset,
end_offset,
BlobMeta { key, lsn },
BlobMeta {
key,
lsn,
will_init,
},
self.max_read_size,
self.mode,
);
@@ -665,10 +677,19 @@ impl StreamingVectoredReadPlanner {
start_offset: u64,
end_offset: u64,
is_last_blob_in_read: bool,
// destination: oneshot::Sender<Result<Bytes, std::io::Error>>,
) -> Option<VectoredRead> {
match &mut self.read_builder {
Some(read_builder) => {
let extended = read_builder.extend(start_offset, end_offset, BlobMeta { key, lsn });
let extended = read_builder.extend(
start_offset,
end_offset,
BlobMeta {
key,
lsn,
will_init: false,
},
);
assert_eq!(extended, VectoredReadExtended::Yes);
}
None => {
@@ -676,7 +697,11 @@ impl StreamingVectoredReadPlanner {
Some(VectoredReadBuilder::new_streaming(
start_offset,
end_offset,
BlobMeta { key, lsn },
BlobMeta {
key,
lsn,
will_init: false,
},
self.mode,
))
};
@@ -1008,6 +1033,7 @@ mod tests {
let meta = BlobMeta {
key: Key::MIN,
lsn: Lsn(0),
will_init: false,
};
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {

View File

@@ -25,7 +25,9 @@ use std::time::Duration;
use std::time::SystemTime;
use pageserver_api::shard::ShardIdentity;
use postgres_ffi::{dispatch_pgversion, enum_pgversion, enum_pgversion_dispatch, TimestampTz};
use postgres_ffi::v14::nonrelfile_utils::clogpage_precedes;
use postgres_ffi::v14::nonrelfile_utils::slru_may_delete_clogsegment;
use postgres_ffi::TimestampTz;
use postgres_ffi::{fsm_logical_to_physical, page_is_new, page_set_lsn};
use anyhow::{bail, Context, Result};
@@ -46,31 +48,16 @@ use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{BlockNumber, RelTag, SlruKind};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
use postgres_ffi::TransactionId;
use postgres_ffi::BLCKSZ;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
enum_pgversion! {CheckPoint, pgv::CheckPoint}
impl CheckPoint {
fn encode(&self) -> Result<Bytes, SerializeError> {
enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.encode() })
}
fn update_next_xid(&mut self, xid: u32) -> bool {
enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.update_next_xid(xid) })
}
pub fn update_next_multixid(&mut self, multi_xid: u32, multi_offset: u32) -> bool {
enum_pgversion_dispatch!(self, CheckPoint, cp, {
cp.update_next_multixid(multi_xid, multi_offset)
})
}
}
pub struct WalIngest {
shard: ShardIdentity,
pg_version: u32,
checkpoint: CheckPoint,
checkpoint_modified: bool,
warn_ingest_lag: WarnIngestLag,
@@ -91,16 +78,12 @@ impl WalIngest {
// Fetch the latest checkpoint into memory, so that we can compare with it
// quickly in `ingest_record` and update it when it changes.
let checkpoint_bytes = timeline.get_checkpoint(startpoint, ctx).await?;
let pgversion = timeline.pg_version;
let checkpoint = dispatch_pgversion!(pgversion, {
let checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
<pgv::CheckPoint as Into<CheckPoint>>::into(checkpoint)
});
let checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!("CheckPoint.nextXid = {}", checkpoint.nextXid.value);
Ok(WalIngest {
shard: *timeline.get_shard_identity(),
pg_version: timeline.pg_version,
checkpoint,
checkpoint_modified: false,
warn_ingest_lag: WarnIngestLag {
@@ -134,7 +117,7 @@ impl WalIngest {
modification.set_lsn(lsn)?;
if decoded.is_dbase_create_copy(pg_version) {
if decoded.is_dbase_create_copy(self.pg_version) {
// Records of this type should always be preceded by a commit(), as they
// rely on reading data pages back from the Timeline.
assert!(!modification.has_dirty_data_pages());
@@ -354,67 +337,70 @@ impl WalIngest {
pg_constants::RM_XLOG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if cp.nextOid != next_oid {
cp.nextOid = next_oid;
self.checkpoint_modified = true;
}
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; pgv::xlog_utils::SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = pgv::CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
cp.oldestXid
);
if (cp.oldestXid.wrapping_sub(xlog_checkpoint.oldestXid) as i32) < 0 {
cp.oldestXid = xlog_checkpoint.oldestXid;
}
trace!(
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
xlog_checkpoint.oldestActiveXid,
cp.oldestActiveXid
);
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = cp.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
cp.oldestActiveXid = oldest_active_xid;
} else {
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
// have some trace of the checkpoint records in the layer files at the same
// LSNs.
if info == pg_constants::XLOG_NEXTOID {
let next_oid = buf.get_u32_le();
if self.checkpoint.nextOid != next_oid {
self.checkpoint.nextOid = next_oid;
self.checkpoint_modified = true;
}
});
} else if info == pg_constants::XLOG_CHECKPOINT_ONLINE
|| info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut checkpoint_bytes = [0u8; SIZEOF_CHECKPOINT];
buf.copy_to_slice(&mut checkpoint_bytes);
let xlog_checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
trace!(
"xlog_checkpoint.oldestXid={}, checkpoint.oldestXid={}",
xlog_checkpoint.oldestXid,
self.checkpoint.oldestXid
);
if (self
.checkpoint
.oldestXid
.wrapping_sub(xlog_checkpoint.oldestXid) as i32)
< 0
{
self.checkpoint.oldestXid = xlog_checkpoint.oldestXid;
}
trace!(
"xlog_checkpoint.oldestActiveXid={}, checkpoint.oldestActiveXid={}",
xlog_checkpoint.oldestActiveXid,
self.checkpoint.oldestActiveXid
);
// A shutdown checkpoint has `oldestActiveXid == InvalidTransactionid`,
// because at shutdown, all in-progress transactions will implicitly
// end. Postgres startup code knows that, and allows hot standby to start
// immediately from a shutdown checkpoint.
//
// In Neon, Postgres hot standby startup always behaves as if starting from
// an online checkpoint. It needs a valid `oldestActiveXid` value, so
// instead of overwriting self.checkpoint.oldestActiveXid with
// InvalidTransactionid from the checkpoint WAL record, update it to a
// proper value, knowing that there are no in-progress transactions at this
// point, except for prepared transactions.
//
// See also the neon code changes in the InitWalRecovery() function.
if xlog_checkpoint.oldestActiveXid == pg_constants::INVALID_TRANSACTION_ID
&& info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN
{
let mut oldest_active_xid = self.checkpoint.nextXid.value as u32;
for xid in modification.tline.list_twophase_files(lsn, ctx).await? {
if (xid.wrapping_sub(oldest_active_xid) as i32) < 0 {
oldest_active_xid = xid;
}
}
self.checkpoint.oldestActiveXid = oldest_active_xid;
} else {
self.checkpoint.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
// have some trace of the checkpoint records in the layer files at the same
// LSNs.
self.checkpoint_modified = true;
}
}
pg_constants::RM_LOGICALMSG_ID => {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
@@ -438,11 +424,7 @@ impl WalIngest {
let info = decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK;
if info == pg_constants::XLOG_RUNNING_XACTS {
let xlrec = crate::walrecord::XlRunningXacts::decode(&mut buf);
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestActiveXid = xlrec.oldest_running_xid;
});
self.checkpoint.oldestActiveXid = xlrec.oldest_running_xid;
self.checkpoint_modified = true;
}
}
@@ -557,7 +539,7 @@ impl WalIngest {
&& blk.has_image
&& decoded.xl_rmid == pg_constants::RM_XLOG_ID
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, modification.tline.pg_version)
// do not materialize null pages because they will most likely soon be replaced with real data
@@ -1222,7 +1204,7 @@ impl WalIngest {
if rec.blkno % pg_constants::SLOTS_PER_FSM_PAGE != 0 {
// Tail of last remaining FSM page has to be zeroed.
// We are not precise here and instead of digging in FSM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, fsm_physical_page_no)?;
modification.put_rel_page_image_zero(rel, fsm_physical_page_no);
fsm_physical_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1244,7 +1226,7 @@ impl WalIngest {
if rec.blkno % pg_constants::VM_HEAPBLOCKS_PER_PAGE != 0 {
// Tail of last remaining vm page has to be zeroed.
// We are not precise here and instead of digging in VM bitmap format just clear the whole page.
modification.put_rel_page_image_zero(rel, vm_page_no)?;
modification.put_rel_page_image_zero(rel, vm_page_no);
vm_page_no += 1;
}
let nblocks = get_relsize(modification, rel, ctx).await?;
@@ -1260,17 +1242,12 @@ impl WalIngest {
fn warn_on_ingest_lag(
&mut self,
conf: &crate::config::PageServerConf,
wal_timestamp: TimestampTz,
wal_timestmap: TimestampTz,
) {
debug_assert_current_span_has_tenant_and_timeline_id();
let now = SystemTime::now();
let rate_limits = &mut self.warn_ingest_lag;
let ts = enum_pgversion_dispatch!(&self.checkpoint, CheckPoint, _cp, {
pgv::xlog_utils::try_from_pg_timestamp(wal_timestamp)
});
match ts {
match try_from_pg_timestamp(wal_timestmap) {
Ok(ts) => {
match now.duration_since(ts) {
Ok(lag) => {
@@ -1280,7 +1257,7 @@ impl WalIngest {
warn!(%rate_limit_stats, %lag, "ingesting record with timestamp lagging more than wait_lsn_timeout");
})
}
}
},
Err(e) => {
let delta_t = e.duration();
// determined by prod victoriametrics query: 1000 * (timestamp(node_time_seconds{neon_service="pageserver"}) - node_time_seconds)
@@ -1294,6 +1271,7 @@ impl WalIngest {
}
}
};
}
Err(error) => {
rate_limits.timestamp_invalid_msg_ratelimit.call2(|rate_limit_stats| {
@@ -1401,17 +1379,14 @@ impl WalIngest {
// truncated, but a checkpoint record with the updated values isn't written until
// later. In Neon, a server can start at any LSN, not just on a checkpoint record,
// so we keep the oldestXid and oldestXidDB up-to-date.
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestXid = xlrec.oldest_xid;
cp.oldestXidDB = xlrec.oldest_xid_db;
});
self.checkpoint.oldestXid = xlrec.oldest_xid;
self.checkpoint.oldestXidDB = xlrec.oldest_xid_db;
self.checkpoint_modified = true;
// TODO Treat AdvanceOldestClogXid() or write a comment why we don't need it
let latest_page_number =
enum_pgversion_dispatch!(self.checkpoint, CheckPoint, cp, { cp.nextXid.value }) as u32
/ pg_constants::CLOG_XACTS_PER_PAGE;
self.checkpoint.nextXid.value as u32 / pg_constants::CLOG_XACTS_PER_PAGE;
// Now delete all segments containing pages between xlrec.pageno
// and latest_page_number.
@@ -1419,9 +1394,7 @@ impl WalIngest {
// First, make an important safety check:
// the current endpoint page must not be eligible for removal.
// See SimpleLruTruncate() in slru.c
if dispatch_pgversion!(modification.tline.pg_version, {
pgv::nonrelfile_utils::clogpage_precedes(latest_page_number, xlrec.pageno)
}) {
if clogpage_precedes(latest_page_number, xlrec.pageno) {
info!("could not truncate directory pg_xact apparent wraparound");
return Ok(());
}
@@ -1438,12 +1411,7 @@ impl WalIngest {
.await?
{
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, xlrec.pageno)
});
if may_delete {
if slru_may_delete_clogsegment(segpage, xlrec.pageno) {
modification
.drop_slru_segment(SlruKind::Clog, segno, ctx)
.await?;
@@ -1562,23 +1530,14 @@ impl WalIngest {
xlrec: &XlMultiXactTruncate,
ctx: &RequestContext,
) -> Result<()> {
let (maxsegment, startsegment, endsegment) =
enum_pgversion_dispatch!(&mut self.checkpoint, CheckPoint, cp, {
cp.oldestMulti = xlrec.end_trunc_off;
cp.oldestMultiDB = xlrec.oldest_multi_db;
let maxsegment: i32 = pgv::nonrelfile_utils::mx_offset_to_member_segment(
pg_constants::MAX_MULTIXACT_OFFSET,
);
let startsegment: i32 =
pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.start_trunc_memb);
let endsegment: i32 =
pgv::nonrelfile_utils::mx_offset_to_member_segment(xlrec.end_trunc_memb);
(maxsegment, startsegment, endsegment)
});
self.checkpoint.oldestMulti = xlrec.end_trunc_off;
self.checkpoint.oldestMultiDB = xlrec.oldest_multi_db;
self.checkpoint_modified = true;
// PerformMembersTruncation
let maxsegment: i32 = mx_offset_to_member_segment(pg_constants::MAX_MULTIXACT_OFFSET);
let startsegment: i32 = mx_offset_to_member_segment(xlrec.start_trunc_memb);
let endsegment: i32 = mx_offset_to_member_segment(xlrec.end_trunc_memb);
let mut segment: i32 = startsegment;
// Delete all the segments except the last one. The last segment can still
@@ -1737,7 +1696,7 @@ impl WalIngest {
continue;
}
modification.put_rel_page_image_zero(rel, gap_blknum)?;
modification.put_rel_page_image_zero(rel, gap_blknum);
}
}
Ok(())
@@ -1803,7 +1762,7 @@ impl WalIngest {
// fill the gap with zeros
for gap_blknum in old_nblocks..blknum {
modification.put_slru_page_image_zero(kind, segno, gap_blknum)?;
modification.put_slru_page_image_zero(kind, segno, gap_blknum);
}
}
Ok(())
@@ -1852,23 +1811,11 @@ mod tests {
// TODO
}
#[tokio::test]
async fn test_zeroed_checkpoint_decodes_correctly() -> Result<()> {
for i in 14..=16 {
dispatch_pgversion!(i, {
pgv::CheckPoint::decode(&pgv::ZERO_CHECKPOINT)?;
});
}
Ok(())
}
static ZERO_CHECKPOINT: Bytes = Bytes::from_static(&[0u8; SIZEOF_CHECKPOINT]);
async fn init_walingest_test(tline: &Timeline, ctx: &RequestContext) -> Result<WalIngest> {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(dispatch_pgversion!(
tline.pg_version,
pgv::ZERO_CHECKPOINT.clone()
))?;
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit(ctx).await?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
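
For readers unfamiliar with the dispatch_pgversion!/enum_pgversion_dispatch! macros appearing in this hunk, a hedged sketch of the shape they expand to: one enum variant per supported Postgres major version, and a match that evaluates the same body against whichever version-specific type is inside. Module layout and field names below are illustrative, not the real postgres_ffi:

mod v14 { pub struct CheckPoint { pub next_xid: u64 } }
mod v15 { pub struct CheckPoint { pub next_xid: u64 } }
mod v16 { pub struct CheckPoint { pub next_xid: u64 } }

// Roughly what an enum_pgversion!{CheckPoint, pgv::CheckPoint} declaration yields.
enum CheckPoint {
    V14(v14::CheckPoint),
    V15(v15::CheckPoint),
    V16(v16::CheckPoint),
}

impl CheckPoint {
    // Roughly what an enum_pgversion_dispatch!(self, CheckPoint, cp, { cp.next_xid })
    // call site boils down to.
    fn next_xid(&self) -> u64 {
        match self {
            CheckPoint::V14(cp) => cp.next_xid,
            CheckPoint::V15(cp) => cp.next_xid,
            CheckPoint::V16(cp) => cp.next_xid,
        }
    }
}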

View File

@@ -311,9 +311,7 @@ async fn auth_quirks(
let (allowed_ips, maybe_secret) = api.get_allowed_ips_and_secret(ctx, &info).await?;
// check allowed list
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
@@ -605,7 +603,6 @@ mod tests {
rate_limiter_enabled: true,
rate_limiter: AuthRateLimiter::new(&RateBucketInfo::DEFAULT_AUTH_SET),
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
});
async fn read_message(r: &mut (impl AsyncRead + Unpin), b: &mut BytesMut) -> PgMessage {

View File

@@ -538,17 +538,4 @@ mod tests {
));
Ok(())
}
#[test]
fn test_connection_blocker() {
fn check(v: serde_json::Value) -> bool {
let peer_addr = IpAddr::from([127, 0, 0, 1]);
let ip_list: Vec<IpPattern> = serde_json::from_value(v).unwrap();
check_peer_addr_is_in_list(&peer_addr, &ip_list)
}
assert!(check(json!([])));
assert!(check(json!(["127.0.0.1"])));
assert!(!check(json!(["255.255.255.255"])));
}
}

View File

@@ -224,7 +224,6 @@ fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig
rate_limiter_enabled: false,
rate_limiter: BucketRateLimiter::new(vec![]),
rate_limit_ip_subnet: 64,
ip_allowlist_check_enabled: true,
},
require_client_ip: false,
handshake_timeout: Duration::from_secs(10),

View File

@@ -224,10 +224,6 @@ struct ProxyCliArgs {
/// Whether to retry the wake_compute request
#[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)]
wake_compute_retry: String,
/// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
#[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
is_private_access_proxy: bool,
}
#[derive(clap::Args, Clone, Copy, Debug)]
@@ -686,7 +682,6 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
rate_limiter_enabled: args.auth_rate_limit_enabled,
rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
ip_allowlist_check_enabled: !args.is_private_access_proxy,
};
let config = Box::leak(Box::new(ProxyConfig {

View File

@@ -242,6 +242,6 @@ mod tests {
#[test]
fn test() {
let s = "{\"branch_created\":null,\"endpoint_created\":{\"endpoint_id\":\"ep-rapid-thunder-w0qqw2q9\"},\"project_created\":null,\"type\":\"endpoint_created\"}";
serde_json::from_str::<ControlPlaneEventKey>(s).unwrap();
let _: ControlPlaneEventKey = serde_json::from_str(s).unwrap();
}
}

View File

@@ -64,7 +64,6 @@ pub struct AuthenticationConfig {
pub rate_limiter_enabled: bool,
pub rate_limiter: AuthRateLimiter,
pub rate_limit_ip_subnet: u8,
pub ip_allowlist_check_enabled: bool,
}
impl TlsConfig {

View File

@@ -395,7 +395,7 @@ mod tests {
}
}
});
serde_json::from_str::<KickSession<'_>>(&json.to_string())?;
let _: KickSession<'_> = serde_json::from_str(&json.to_string())?;
Ok(())
}
@@ -403,7 +403,7 @@ mod tests {
#[test]
fn parse_db_info() -> anyhow::Result<()> {
// with password
serde_json::from_value::<DatabaseInfo>(json!({
let _: DatabaseInfo = serde_json::from_value(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
@@ -413,7 +413,7 @@ mod tests {
}))?;
// without password
serde_json::from_value::<DatabaseInfo>(json!({
let _: DatabaseInfo = serde_json::from_value(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
@@ -422,7 +422,7 @@ mod tests {
}))?;
// new field (forward compatibility)
serde_json::from_value::<DatabaseInfo>(json!({
let _: DatabaseInfo = serde_json::from_value(json!({
"host": "localhost",
"port": 5432,
"dbname": "postgres",
@@ -441,7 +441,7 @@ mod tests {
"address": "0.0.0.0",
"aux": dummy_aux(),
});
serde_json::from_str::<WakeCompute>(&json.to_string())?;
let _: WakeCompute = serde_json::from_str(&json.to_string())?;
Ok(())
}
@@ -451,18 +451,18 @@ mod tests {
let json = json!({
"role_secret": "secret",
});
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
let json = json!({
"role_secret": "secret",
"allowed_ips": ["8.8.8.8"],
});
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
let json = json!({
"role_secret": "secret",
"allowed_ips": ["8.8.8.8"],
"project_id": "project",
});
serde_json::from_str::<GetRoleSecret>(&json.to_string())?;
let _: GetRoleSecret = serde_json::from_str(&json.to_string())?;
Ok(())
}

View File

@@ -78,7 +78,7 @@ pub(crate) type ComputeReady = DatabaseInfo;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
async fn process_query(
&mut self,

View File

@@ -6,7 +6,7 @@ use pq_proto::StartupMessageParams;
use smol_str::SmolStr;
use std::net::IpAddr;
use tokio::sync::mpsc;
use tracing::{debug, field::display, info, info_span, Span};
use tracing::{field::display, info, info_span, Span};
use try_lock::TryLock;
use uuid::Uuid;
@@ -362,9 +362,7 @@ impl RequestMonitoringInner {
});
}
if let Some(tx) = self.sender.take() {
tx.send(RequestData::from(&*self))
.inspect_err(|e| debug!("tx send failed: {e}"))
.ok();
let _: Result<(), _> = tx.send(RequestData::from(&*self));
}
}
@@ -373,9 +371,7 @@ impl RequestMonitoringInner {
// Here we log the length of the session.
self.disconnect_timestamp = Some(Utc::now());
if let Some(tx) = self.disconnect_sender.take() {
tx.send(RequestData::from(&*self))
.inspect_err(|e| debug!("tx send failed: {e}"))
.ok();
let _: Result<(), _> = tx.send(RequestData::from(&*self));
}
}
}

View File

@@ -290,7 +290,7 @@ async fn worker_inner(
}
if !w.flushed_row_groups().is_empty() {
let _rtchk: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
let _: Writer<BytesMut> = upload_parquet(w, len, &storage).await?;
}
Ok(())
@@ -598,15 +598,15 @@ mod tests {
assert_eq!(
file_stats,
[
(1312632, 3, 6000),
(1312621, 3, 6000),
(1312680, 3, 6000),
(1312637, 3, 6000),
(1312773, 3, 6000),
(1312610, 3, 6000),
(1312404, 3, 6000),
(1312639, 3, 6000),
(437848, 1, 2000)
(1315874, 3, 6000),
(1315867, 3, 6000),
(1315927, 3, 6000),
(1315884, 3, 6000),
(1316014, 3, 6000),
(1315856, 3, 6000),
(1315648, 3, 6000),
(1315884, 3, 6000),
(438913, 1, 2000)
]
);
@@ -638,11 +638,11 @@ mod tests {
assert_eq!(
file_stats,
[
(1203465, 5, 10000),
(1203189, 5, 10000),
(1203490, 5, 10000),
(1203475, 5, 10000),
(1203729, 5, 10000)
(1208861, 5, 10000),
(1208592, 5, 10000),
(1208885, 5, 10000),
(1208873, 5, 10000),
(1209128, 5, 10000)
]
);
@@ -667,15 +667,15 @@ mod tests {
assert_eq!(
file_stats,
[
(1312632, 3, 6000),
(1312621, 3, 6000),
(1312680, 3, 6000),
(1312637, 3, 6000),
(1312773, 3, 6000),
(1312610, 3, 6000),
(1312404, 3, 6000),
(1312639, 3, 6000),
(437848, 1, 2000)
(1315874, 3, 6000),
(1315867, 3, 6000),
(1315927, 3, 6000),
(1315884, 3, 6000),
(1316014, 3, 6000),
(1315856, 3, 6000),
(1315648, 3, 6000),
(1315884, 3, 6000),
(438913, 1, 2000)
]
);
@@ -712,7 +712,7 @@ mod tests {
// files are smaller than the size threshold, but they took too long to fill so were flushed early
assert_eq!(
file_stats,
[(657696, 2, 3001), (657410, 2, 3000), (657206, 2, 2999)]
[(659836, 2, 3001), (659550, 2, 3000), (659346, 2, 2999)]
);
tmpdir.close().unwrap();

View File

@@ -3,7 +3,7 @@
#![deny(
deprecated,
future_incompatible,
let_underscore,
// TODO: consider let_underscore
nonstandard_style,
rust_2024_compatibility
)]
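
The hunks in this compare that touch discarded send results flip between two idioms, shown here on a toy oneshot channel: the explicitly typed let _: form documents the type being dropped, while .ok() converts the Result to an Option and ignores it. Both are deliberate, visible discards rather than silently unused results:

use tokio::sync::oneshot;

fn notify_typed(tx: oneshot::Sender<u32>) {
    // Explicitly typed discard: the dropped Result is visible at the call site.
    let _: Result<(), u32> = tx.send(42);
}

fn notify_ok(tx: oneshot::Sender<u32>) {
    // Alternative: turn the Result into an Option and drop that instead.
    tx.send(42).ok();
}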

View File

@@ -268,7 +268,7 @@ async fn keepalive_is_inherited() -> anyhow::Result<()> {
anyhow::Ok(keepalive)
});
TcpStream::connect(("127.0.0.1", port)).await?;
let _ = TcpStream::connect(("127.0.0.1", port)).await?;
assert!(t.await??, "keepalive should be inherited");
Ok(())

View File

@@ -6,7 +6,7 @@ use redis::{
ConnectionInfo, IntoConnectionInfo, RedisConnectionInfo, RedisResult,
};
use tokio::task::JoinHandle;
use tracing::{debug, error, info};
use tracing::{error, info};
use super::elasticache::CredentialsProvider;
@@ -109,10 +109,7 @@ impl ConnectionWithCredentialsProvider {
let credentials_provider = credentials_provider.clone();
let con2 = con.clone();
let f = tokio::spawn(async move {
Self::keep_connection(con2, credentials_provider)
.await
.inspect_err(|e| debug!("keep_connection failed: {e}"))
.ok();
let _ = Self::keep_connection(con2, credentials_provider).await;
});
self.refresh_token_task = Some(f);
}

View File

@@ -50,9 +50,7 @@ impl PoolingBackend {
.as_ref()
.map(|()| user_info.clone());
let (allowed_ips, maybe_secret) = backend.get_allowed_ips_and_secret(ctx).await?;
if config.ip_allowlist_check_enabled
&& !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips)
{
if !check_peer_addr_is_in_list(&ctx.peer_addr(), &allowed_ips) {
return Err(AuthError::ip_address_not_allowed(ctx.peer_addr()));
}
if !self

View File

@@ -12,7 +12,6 @@ use std::{io, task};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_rustls::server::TlsStream;
use tracing::debug;
/// Stream wrapper which implements libpq's protocol.
///
@@ -139,10 +138,9 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
);
// already error case, ignore client IO error
self.write_message(&BeMessage::ErrorResponse(msg, None))
.await
.inspect_err(|e| debug!("write_message failed: {e}"))
.ok();
let _: Result<_, std::io::Error> = self
.write_message(&BeMessage::ErrorResponse(msg, None))
.await;
Err(ReportedError {
source: anyhow::anyhow!(msg),
@@ -166,10 +164,9 @@ impl<S: AsyncWrite + Unpin> PqStream<S> {
);
// already error case, ignore client IO error
self.write_message(&BeMessage::ErrorResponse(&msg, None))
.await
.inspect_err(|e| debug!("write_message failed: {e}"))
.ok();
let _: Result<_, std::io::Error> = self
.write_message(&BeMessage::ErrorResponse(&msg, None))
.await;
Err(ReportedError {
source: anyhow::anyhow!(error),

View File

@@ -57,7 +57,7 @@ mod tests {
fn bad_url() {
let url = "test:foobar";
url.parse::<url::Url>().expect("unexpected parsing failure");
url.parse::<ApiUrl>().expect_err("should not parse");
let _ = url.parse::<ApiUrl>().expect_err("should not parse");
}
#[test]

View File

@@ -2,7 +2,6 @@
//! protocol commands.
use anyhow::Context;
use std::future::Future;
use std::str::{self, FromStr};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
@@ -96,6 +95,7 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
for SafekeeperPostgresHandler
{
@@ -197,51 +197,49 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
Ok(())
}
fn process_query(
async fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> impl Future<Output = Result<(), QueryError>> {
Box::pin(async move {
if query_string
.to_ascii_lowercase()
.starts_with("set datestyle to ")
{
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
return Ok(());
) -> Result<(), QueryError> {
if query_string
.to_ascii_lowercase()
.starts_with("set datestyle to ")
{
// important for debug because psycopg2 executes "SET datestyle TO 'ISO'" on connect
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
return Ok(());
}
let cmd = parse_cmd(query_string)?;
let cmd_str = cmd_to_string(&cmd);
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
info!("got query {:?}", query_string);
let tenant_id = self.tenant_id.context("tenantid is required")?;
let timeline_id = self.timeline_id.context("timelineid is required")?;
self.check_permission(Some(tenant_id))?;
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
match cmd {
SafekeeperPostgresCommand::StartWalPush => {
self.handle_start_wal_push(pgb)
.instrument(info_span!("WAL receiver"))
.await
}
let cmd = parse_cmd(query_string)?;
let cmd_str = cmd_to_string(&cmd);
let _guard = PG_QUERIES_GAUGE.with_label_values(&[cmd_str]).guard();
info!("got query {:?}", query_string);
let tenant_id = self.tenant_id.context("tenantid is required")?;
let timeline_id = self.timeline_id.context("timelineid is required")?;
self.check_permission(Some(tenant_id))?;
self.ttid = TenantTimelineId::new(tenant_id, timeline_id);
match cmd {
SafekeeperPostgresCommand::StartWalPush => {
self.handle_start_wal_push(pgb)
.instrument(info_span!("WAL receiver"))
.await
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
.instrument(info_span!("WAL sender"))
.await
}
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd).await
}
SafekeeperPostgresCommand::StartReplication { start_lsn, term } => {
self.handle_start_replication(pgb, start_lsn, term)
.instrument(info_span!("WAL sender"))
.await
}
})
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd).await
}
}
}
}

View File

@@ -18,7 +18,8 @@ Prerequisites:
Regression tests are in the 'regress' directory. They can be run in
parallel to minimize total runtime. Most regression tests set up their
environment with their own pageservers and safekeepers.
environment with their own pageservers and safekeepers (but see
`TEST_SHARED_FIXTURES`).
'pg_clients' contains tests for connecting with various client
libraries. Each client test uses a Dockerfile that pulls an image that
@@ -73,6 +74,7 @@ This is used to construct full path to the postgres binaries.
Format is a 2-digit major version number, e.g. `DEFAULT_PG_VERSION=16`
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.
`RUST_LOG`: logging configuration to pass into Neon CLI
Useful parameters and commands:
@@ -257,8 +259,11 @@ compute Postgres nodes. The connections between them can be configured to use JW
authentication tokens, and some other configuration options can be tweaked too.
The easiest way to get access to a Neon Environment is by using the `neon_simple_env`
fixture. For convenience, there is a branch called `main` in environments created with
'neon_simple_env', ready to be used in the test.
fixture. The 'simple' env may be shared across multiple tests, so don't shut down the nodes
or make other destructive changes in that environment. Also don't assume that
there are no tenants or branches or data in the cluster. For convenience, there is a
branch called `empty`, though. The convention is to create a test-specific branch of
that and load any test data there, instead of the 'main' branch.
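
As a concrete illustration of that convention, here is a minimal sketch of a test built on the fixture API used throughout this diff (`neon_simple_env`, `create_branch`, `create_start`); the branch, table, and test names are illustrative only:

from fixtures.neon_fixtures import NeonEnv


def test_my_feature(neon_simple_env: NeonEnv):
    env = neon_simple_env
    # Branch off 'empty' instead of touching 'main', so a shared environment
    # stays clean for the other tests that reuse it.
    env.neon_cli.create_branch("test_my_feature", "empty")
    endpoint = env.endpoints.create_start("test_my_feature")
    with endpoint.cursor() as cur:
        cur.execute("CREATE TABLE t (id int)")
        cur.execute("INSERT INTO t VALUES (1)")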
For more complicated cases, you can build a custom Neon Environment, with the `neon_env`
fixture:

View File

@@ -57,6 +57,7 @@ from _pytest.fixtures import FixtureRequest
from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import cursor as PgCursor
from psycopg2.extensions import make_dsn, parse_dsn
from typing_extensions import Literal
from urllib3.util.retry import Retry
from fixtures import overlayfs
@@ -220,6 +221,33 @@ def neon_api(neon_api_key: str, neon_api_base_url: str) -> NeonAPI:
return NeonAPI(neon_api_key, neon_api_base_url)
def shareable_scope(fixture_name: str, config: Config) -> Literal["session", "function"]:
"""Return either session of function scope, depending on TEST_SHARED_FIXTURES envvar.
This function can be used as a scope like this:
@pytest.fixture(scope=shareable_scope)
def myfixture(...)
...
"""
scope: Literal["session", "function"]
if os.environ.get("TEST_SHARED_FIXTURES") is None:
# Create the environment in the per-test output directory
scope = "function"
elif (
os.environ.get("BUILD_TYPE") is not None
and os.environ.get("DEFAULT_PG_VERSION") is not None
):
scope = "session"
else:
pytest.fail(
"Shared environment(TEST_SHARED_FIXTURES) requires BUILD_TYPE and DEFAULT_PG_VERSION to be set",
pytrace=False,
)
return scope
@pytest.fixture(scope="session")
def worker_port_num():
return (32768 - BASE_PORT) // int(os.environ.get("PYTEST_XDIST_WORKER_COUNT", "1"))
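
As a usage note, pytest accepts a callable for `scope=`, which is what makes the `shareable_scope` helper above work; a minimal sketch of a fixture adopting it follows (the fixture body is purely illustrative):

@pytest.fixture(scope=shareable_scope)
def expensive_shared_resource():
    # Runs once per test, or once per session when TEST_SHARED_FIXTURES is set.
    resource = {"ready": True}  # stand-in for an expensive setup step
    yield resource
    # Teardown runs when the chosen scope ends.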
@@ -553,6 +581,10 @@ class NeonEnvBuilder:
self.env = NeonEnv(self)
return self.env
def start(self):
assert self.env is not None, "environment is not initialized yet, call init() first"
self.env.start()
def init_start(
self,
initial_tenant_conf: Optional[Dict[str, Any]] = None,
@@ -568,7 +600,7 @@ class NeonEnvBuilder:
Configuring pageserver with remote storage is now the default. There will be a warning if pageserver is created without one.
"""
env = self.init_configs(default_remote_storage_if_missing=default_remote_storage_if_missing)
env.start()
self.start()
# Prepare the default branch to start the postgres on later.
# Pageserver itself does not create tenants and timelines, until started first and asked via HTTP API.
@@ -1069,6 +1101,9 @@ class NeonEnv:
self.pg_distrib_dir = config.pg_distrib_dir
self.endpoint_counter = 0
self.storage_controller_config = config.storage_controller_config
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
self.initial_tenant = config.initial_tenant
self.initial_timeline = config.initial_timeline
@@ -1396,8 +1431,8 @@ class NeonEnv:
return "ep-" + str(self.endpoint_counter)
@pytest.fixture(scope="function")
def neon_simple_env(
@pytest.fixture(scope=shareable_scope)
def _shared_simple_env(
request: FixtureRequest,
pytestconfig: Config,
port_distributor: PortDistributor,
@@ -1415,13 +1450,19 @@ def neon_simple_env(
pageserver_io_buffer_alignment: Optional[int],
) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with no authentication and no safekeepers.
# Internal fixture backing the `neon_simple_env` fixture. If TEST_SHARED_FIXTURES
is set, this is shared by all tests using `neon_simple_env`.
This fixture will use RemoteStorageKind.LOCAL_FS with pageserver.
"""
# Create the environment in the per-test output directory
repo_dir = get_test_repo_dir(request, top_output_dir)
if os.environ.get("TEST_SHARED_FIXTURES") is None:
# Create the environment in the per-test output directory
repo_dir = get_test_repo_dir(request, top_output_dir)
else:
# We're running shared fixtures. Share a single directory.
repo_dir = top_output_dir / "shared_repo"
shutil.rmtree(repo_dir, ignore_errors=True)
with NeonEnvBuilder(
top_output_dir=top_output_dir,
@@ -1443,9 +1484,27 @@ def neon_simple_env(
) as builder:
env = builder.init_start()
# For convenience in tests, create a branch from the freshly-initialized cluster.
env.neon_cli.create_branch("empty", ancestor_branch_name=DEFAULT_BRANCH_NAME)
yield env
@pytest.fixture(scope="function")
def neon_simple_env(_shared_simple_env: NeonEnv) -> Iterator[NeonEnv]:
"""
Simple Neon environment, with no authentication and no safekeepers.
If the TEST_SHARED_FIXTURES environment variable is set, we reuse the same
environment for all tests that use 'neon_simple_env', keeping the
pageserver and safekeepers running. Any compute nodes are stopped after
each test, however.
"""
yield _shared_simple_env
_shared_simple_env.endpoints.stop_all()
@pytest.fixture(scope="function")
def neon_env_builder(
pytestconfig: Config,
@@ -1514,6 +1573,14 @@ class PageserverPort:
http: int
CREATE_TIMELINE_ID_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg]
r"^Created timeline '(?P<timeline_id>[^']+)'", re.MULTILINE
)
TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg]
r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
)
class AbstractNeonCli(abc.ABC):
"""
A typed wrapper around an arbitrary Neon CLI tool.
@@ -1742,9 +1809,6 @@ class NeonCli(AbstractNeonCli):
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
) -> TimelineId:
if timeline_id is None:
timeline_id = TimelineId.generate()
cmd = [
"timeline",
"create",
@@ -1752,16 +1816,23 @@ class NeonCli(AbstractNeonCli):
new_branch_name,
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--timeline-id",
str(timeline_id),
"--pg-version",
self.env.pg_version,
]
if timeline_id is not None:
cmd.extend(["--timeline-id", str(timeline_id)])
res = self.raw_cli(cmd)
res.check_returncode()
return timeline_id
matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
created_timeline_id = None
if matches is not None:
created_timeline_id = matches.group("timeline_id")
return TimelineId(str(created_timeline_id))
def create_branch(
self,
@@ -1769,17 +1840,12 @@ class NeonCli(AbstractNeonCli):
ancestor_branch_name: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
ancestor_start_lsn: Optional[Lsn] = None,
new_timeline_id: Optional[TimelineId] = None,
) -> TimelineId:
if new_timeline_id is None:
new_timeline_id = TimelineId.generate()
cmd = [
"timeline",
"branch",
"--branch-name",
new_branch_name,
"--timeline-id",
str(new_timeline_id),
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
]
@@ -1791,7 +1857,16 @@ class NeonCli(AbstractNeonCli):
res = self.raw_cli(cmd)
res.check_returncode()
return TimelineId(str(new_timeline_id))
matches = CREATE_TIMELINE_ID_EXTRACTOR.search(res.stdout)
created_timeline_id = None
if matches is not None:
created_timeline_id = matches.group("timeline_id")
if created_timeline_id is None:
raise Exception("could not find timeline id after `neon timeline create` invocation")
else:
return TimelineId(str(created_timeline_id))
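
For reference, a minimal, self-contained sketch of the stdout parsing these CLI wrappers now rely on, using the module-level regex shown earlier; the sample output line borrows a timeline id that appears elsewhere in this diff and is illustrative only:

import re

CREATE_TIMELINE_ID_EXTRACTOR = re.compile(
    r"^Created timeline '(?P<timeline_id>[^']+)'", re.MULTILINE
)

# Illustrative output of a `neon timeline create` / `neon timeline branch` call.
stdout = "Created timeline 'b49f7954224a0ad25cc0013ea107b54b'\n"

matches = CREATE_TIMELINE_ID_EXTRACTOR.search(stdout)
if matches is None:
    raise Exception("could not find timeline id after `neon timeline create` invocation")
print(matches.group("timeline_id"))  # b49f7954224a0ad25cc0013ea107b54b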
def list_timelines(self, tenant_id: Optional[TenantId] = None) -> List[Tuple[str, TimelineId]]:
"""
@@ -1800,9 +1875,6 @@ class NeonCli(AbstractNeonCli):
# main [b49f7954224a0ad25cc0013ea107b54b]
# ┣━ @0/16B5A50: test_cli_branch_list_main [20f98c79111b9015d84452258b7d5540]
TIMELINE_DATA_EXTRACTOR: re.Pattern = re.compile( # type: ignore[type-arg]
r"\s?(?P<branch_name>[^\s]+)\s\[(?P<timeline_id>[^\]]+)\]", re.MULTILINE
)
res = self.raw_cli(
["timeline", "list", "--tenant-id", str(tenant_id or self.env.initial_tenant)]
)
@@ -4826,7 +4898,14 @@ SMALL_DB_FILE_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
# This is autouse, so the test output directory always gets created, even
# if a test doesn't put anything there.
# if a test doesn't put anything there. It also solves a problem with the
# neon_simple_env fixture: if TEST_SHARED_FIXTURES is not set, it
# creates the repo in the test output directory. But it cannot depend on
# 'test_output_dir' fixture, because when TEST_SHARED_FIXTURES is set,
# it has 'session' scope and cannot access fixtures with 'function'
# scope. So it uses the get_test_output_dir() function to get the path, and
# this fixture ensures that the directory exists. That works because
# 'autouse' fixtures are run before other fixtures.
#
# NB: we request the overlay dir fixture so the fixture does its cleanups
@pytest.fixture(scope="function", autouse=True)

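Based on the comment above, a minimal sketch of what such an autouse fixture can look like; the fixture name and the exact signature of `get_test_output_dir()` are assumptions, not the real definitions in `neon_fixtures.py`:

@pytest.fixture(scope="function", autouse=True)
def ensure_test_output_dir(request, top_output_dir):
    # Use the plain get_test_output_dir() function rather than the function-scoped
    # 'test_output_dir' fixture, so session-scoped fixtures can share the same path.
    outdir = get_test_output_dir(request, top_output_dir)  # assumed signature
    outdir.mkdir(parents=True, exist_ok=True)
    yield outdir
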
View File

@@ -22,8 +22,10 @@ if TYPE_CHECKING:
def test_logical_replication(neon_simple_env: NeonEnv, pg_bin: PgBin, vanilla_pg):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
env.neon_cli.create_branch("test_logical_replication", "empty")
endpoint = env.endpoints.create_start("test_logical_replication")
log.info("postgres is running on 'test_logical_replication' branch")
pg_bin.run_capture(["pgbench", "-i", "-s10", endpoint.connstr()])
endpoint.safe_psql("create publication pub1 for table pgbench_accounts, pgbench_history")

View File

@@ -84,7 +84,7 @@ def test_storage_controller_many_tenants(
compute_reconfigure_listener.register_on_notify(lambda body: time.sleep(0.01))
env = neon_env_builder.init_configs()
env.start()
neon_env_builder.start()
# We will intentionally stress reconciler concurrency, which triggers a warning when lots
# of shards are hitting the delayed path.

View File

@@ -8,10 +8,11 @@ from fixtures.neon_fixtures import NeonEnv
#
def test_basebackup_error(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_basebackup_error", "empty")
pageserver_http = env.pageserver.http_client()
# Introduce failpoint
pageserver_http.configure_failpoints(("basebackup-before-control-file", "return"))
with pytest.raises(Exception, match="basebackup-before-control-file"):
env.endpoints.create_start("main")
env.endpoints.create_start("test_basebackup_error")

View File

@@ -11,6 +11,7 @@ from fixtures.utils import query_scalar
#
def test_clog_truncate(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_clog_truncate", "empty")
# set aggressive autovacuum to make sure that truncation will happen
config = [
@@ -23,7 +24,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
"autovacuum_freeze_max_age=100000",
]
endpoint = env.endpoints.create_start("main", config_lines=config)
endpoint = env.endpoints.create_start("test_clog_truncate", config_lines=config)
# Install extension containing function needed for test
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")
@@ -57,7 +58,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
# create new branch after clog truncation and start a compute node on it
log.info(f"create branch at lsn_after_truncation {lsn_after_truncation}")
env.neon_cli.create_branch(
"test_clog_truncate_new", "main", ancestor_start_lsn=lsn_after_truncation
"test_clog_truncate_new", "test_clog_truncate", ancestor_start_lsn=lsn_after_truncation
)
endpoint2 = env.endpoints.create_start("test_clog_truncate_new")

View File

@@ -178,7 +178,7 @@ def test_backward_compatibility(
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.from_repo_dir(compatibility_snapshot_dir / "repo")
env.pageserver.allowed_errors.append(ingest_lag_log_line)
env.start()
neon_env_builder.start()
check_neon_works(
env,
@@ -265,7 +265,7 @@ def test_forward_compatibility(
# does not include logs from previous runs
assert not env.pageserver.log_contains("git-env:" + prev_pageserver_version)
env.start()
neon_env_builder.start()
# ensure the specified pageserver is running
assert env.pageserver.log_contains("git-env:" + prev_pageserver_version)

View File

@@ -4,8 +4,9 @@ from fixtures.neon_fixtures import NeonEnv
def test_compute_catalog(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_config", "empty")
endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
endpoint = env.endpoints.create_start("test_config", config_lines=["log_min_messages=debug1"])
client = endpoint.http_client()
objects = client.dbs_and_roles()

View File

@@ -9,9 +9,10 @@ from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder
#
def test_config(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_config", "empty")
# change config
endpoint = env.endpoints.create_start("main", config_lines=["log_min_messages=debug1"])
endpoint = env.endpoints.create_start("test_config", config_lines=["log_min_messages=debug1"])
with closing(endpoint.connect()) as conn:
with conn.cursor() as cur:

View File

@@ -17,7 +17,9 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
if env.pg_version == PgVersion.V14 and strategy == "wal_log":
pytest.skip("wal_log strategy not supported on PostgreSQL 14")
endpoint = env.endpoints.create_start("main")
env.neon_cli.create_branch("test_createdb", "empty")
endpoint = env.endpoints.create_start("test_createdb")
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
@@ -31,7 +33,7 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create a branch
env.neon_cli.create_branch("test_createdb2", "main", ancestor_start_lsn=lsn)
env.neon_cli.create_branch("test_createdb2", "test_createdb", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createdb2")
# Test that you can connect to the new database on both branches
@@ -60,7 +62,8 @@ def test_createdb(neon_simple_env: NeonEnv, strategy: str):
#
def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
env.neon_cli.create_branch("test_dropdb", "empty")
endpoint = env.endpoints.create_start("test_dropdb")
with endpoint.cursor() as cur:
cur.execute("CREATE DATABASE foodb")
@@ -77,10 +80,14 @@ def test_dropdb(neon_simple_env: NeonEnv, test_output_dir):
lsn_after_drop = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create two branches before and after database drop.
env.neon_cli.create_branch("test_before_dropdb", "main", ancestor_start_lsn=lsn_before_drop)
env.neon_cli.create_branch(
"test_before_dropdb", "test_dropdb", ancestor_start_lsn=lsn_before_drop
)
endpoint_before = env.endpoints.create_start("test_before_dropdb")
env.neon_cli.create_branch("test_after_dropdb", "main", ancestor_start_lsn=lsn_after_drop)
env.neon_cli.create_branch(
"test_after_dropdb", "test_dropdb", ancestor_start_lsn=lsn_after_drop
)
endpoint_after = env.endpoints.create_start("test_after_dropdb")
# Test that database exists on the branch before drop

View File

@@ -7,7 +7,8 @@ from fixtures.utils import query_scalar
#
def test_createuser(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
env.neon_cli.create_branch("test_createuser", "empty")
endpoint = env.endpoints.create_start("test_createuser")
with endpoint.cursor() as cur:
# Cause a 'relmapper' change in the original branch
@@ -18,7 +19,7 @@ def test_createuser(neon_simple_env: NeonEnv):
lsn = query_scalar(cur, "SELECT pg_current_wal_insert_lsn()")
# Create a branch
env.neon_cli.create_branch("test_createuser2", "main", ancestor_start_lsn=lsn)
env.neon_cli.create_branch("test_createuser2", "test_createuser", ancestor_start_lsn=lsn)
endpoint2 = env.endpoints.create_start("test_createuser2")
# Test that you can connect to new branch as a new user

View File

@@ -290,8 +290,9 @@ def assert_db_connlimit(endpoint: Any, db_name: str, connlimit: int, msg: str):
# Here we test the latter. The first one is tested in test_ddl_forwarding
def test_ddl_forwarding_invalid_db(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_ddl_forwarding_invalid_db", "empty")
endpoint = env.endpoints.create_start(
"main",
"test_ddl_forwarding_invalid_db",
# Some non-existent url
config_lines=["neon.console_url=http://localhost:9999/unknown/api/v0/roles_and_databases"],
)

View File

@@ -10,9 +10,11 @@ def test_explain_with_lfc_stats(neon_simple_env: NeonEnv):
cache_dir = Path(env.repo_dir) / "file_cache"
cache_dir.mkdir(exist_ok=True)
log.info("Creating endpoint with 1MB shared_buffers and 64 MB LFC")
branchname = "test_explain_with_lfc_stats"
env.neon_cli.create_branch(branchname, "empty")
log.info(f"Creating endopint with 1MB shared_buffers and 64 MB LFC for branch {branchname}")
endpoint = env.endpoints.create_start(
"main",
branchname,
config_lines=[
"shared_buffers='1MB'",
f"neon.file_cache_path='{cache_dir}/file.cache'",

View File

@@ -16,8 +16,9 @@ from fixtures.neon_fixtures import NeonEnv, PgBin
@pytest.mark.timeout(600)
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
env.neon_cli.create_branch("test_lfc_resize", "empty")
endpoint = env.endpoints.create_start(
"main",
"test_lfc_resize",
config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=512MB",

View File

@@ -12,9 +12,11 @@ def test_lfc_working_set_approximation(neon_simple_env: NeonEnv):
cache_dir = Path(env.repo_dir) / "file_cache"
cache_dir.mkdir(exist_ok=True)
log.info("Creating endpoint with 1MB shared_buffers and 64 MB LFC")
branchname = "test_approximate_working_set_size"
env.neon_cli.create_branch(branchname, "empty")
log.info(f"Creating endopint with 1MB shared_buffers and 64 MB LFC for branch {branchname}")
endpoint = env.endpoints.create_start(
"main",
branchname,
config_lines=[
"shared_buffers='1MB'",
f"neon.file_cache_path='{cache_dir}/file.cache'",

View File

@@ -5,7 +5,7 @@ import threading
import time
from typing import List
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.neon_fixtures import DEFAULT_BRANCH_NAME, NeonEnvBuilder
from fixtures.utils import query_scalar
@@ -15,8 +15,11 @@ def test_local_file_cache_unlink(neon_env_builder: NeonEnvBuilder):
cache_dir = os.path.join(env.repo_dir, "file_cache")
os.mkdir(cache_dir)
env.neon_cli.create_branch("empty", ancestor_branch_name=DEFAULT_BRANCH_NAME)
env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")
endpoint = env.endpoints.create_start(
"main",
"test_local_file_cache_unlink",
config_lines=[
"shared_buffers='1MB'",
f"neon.file_cache_path='{cache_dir}/file.cache'",

View File

@@ -36,8 +36,10 @@ def test_logical_replication(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main", config_lines=["log_statement=all"])
timeline_id = env.neon_cli.create_branch("test_logical_replication", "empty")
endpoint = env.endpoints.create_start(
"test_logical_replication", config_lines=["log_statement=all"]
)
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
@@ -183,9 +185,10 @@ def test_obsolete_slot_drop(neon_simple_env: NeonEnv, vanilla_pg):
env = neon_simple_env
env.neon_cli.create_branch("test_logical_replication", "empty")
# set low neon.logical_replication_max_snap_files
endpoint = env.endpoints.create_start(
"main",
"test_logical_replication",
config_lines=["log_statement=all", "neon.logical_replication_max_snap_files=1"],
)
@@ -469,7 +472,7 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):
def test_replication_shutdown(neon_simple_env: NeonEnv):
# Ensure Postgres can exit without stuck when a replication job is active + neon extension installed
env = neon_simple_env
env.neon_cli.create_branch("test_replication_shutdown_publisher", "main")
env.neon_cli.create_branch("test_replication_shutdown_publisher", "empty")
pub = env.endpoints.create("test_replication_shutdown_publisher")
env.neon_cli.create_branch("test_replication_shutdown_subscriber")

View File

@@ -9,8 +9,9 @@ if TYPE_CHECKING:
def test_migrations(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_migrations", "empty")
endpoint = env.endpoints.create("main")
endpoint = env.endpoints.create("test_migrations")
endpoint.respec(skip_pg_catalog_updates=False)
endpoint.start()

View File

@@ -14,7 +14,8 @@ from fixtures.utils import query_scalar
#
def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
env.neon_cli.create_branch("test_multixact", "empty")
endpoint = env.endpoints.create_start("test_multixact")
cur = endpoint.connect().cursor()
cur.execute(
@@ -72,9 +73,7 @@ def test_multixact(neon_simple_env: NeonEnv, test_output_dir):
assert int(next_multixact_id) > int(next_multixact_id_old)
# Branch at this point
env.neon_cli.create_branch(
"test_multixact_new", ancestor_branch_name="main", ancestor_start_lsn=lsn
)
env.neon_cli.create_branch("test_multixact_new", "test_multixact", ancestor_start_lsn=lsn)
endpoint_new = env.endpoints.create_start("test_multixact_new")
next_multixact_id_new = endpoint_new.safe_psql(

View File

@@ -6,7 +6,7 @@ from fixtures.utils import wait_until
def test_neon_superuser(neon_simple_env: NeonEnv, pg_version: PgVersion):
env = neon_simple_env
env.neon_cli.create_branch("test_neon_superuser_publisher", "main")
env.neon_cli.create_branch("test_neon_superuser_publisher", "empty")
pub = env.endpoints.create("test_neon_superuser_publisher")
env.neon_cli.create_branch("test_neon_superuser_subscriber")

View File

@@ -41,7 +41,8 @@ async def parallel_load_same_table(endpoint: Endpoint, n_parallel: int):
# Load data into one table with COPY TO from 5 parallel connections
def test_parallel_copy(neon_simple_env: NeonEnv, n_parallel=5):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
env.neon_cli.create_branch("test_parallel_copy", "empty")
endpoint = env.endpoints.create_start("test_parallel_copy")
# Create test table
conn = endpoint.connect()

View File

@@ -42,9 +42,11 @@ def test_cancellations(neon_simple_env: NeonEnv):
ps_http = ps.http_client()
ps_http.is_testing_enabled_or_skip()
env.neon_cli.create_branch("test_config", "empty")
# We don't want to have any racy behaviour with autovacuum IOs
ep = env.endpoints.create_start(
"main",
"test_config",
config_lines=[
"autovacuum = off",
"shared_buffers = 128MB",

View File

@@ -22,8 +22,8 @@ def check_wal_segment(pg_waldump_path: str, segment_path: str, test_output_dir):
def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
env = neon_simple_env
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
endpoint = env.endpoints.create_start("main")
timeline_id = env.neon_cli.create_branch("test_pg_waldump", "empty")
endpoint = env.endpoints.create_start("test_pg_waldump")
cur = endpoint.connect().cursor()
cur.execute(

View File

@@ -15,8 +15,12 @@ extensions = ["pageinspect", "neon_test_utils", "pg_buffercache"]
#
def test_read_validation(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_read_validation", "empty")
endpoint = env.endpoints.create_start(
"test_read_validation",
)
endpoint = env.endpoints.create_start("main")
with closing(endpoint.connect()) as con:
with con.cursor() as c:
for e in extensions:
@@ -127,9 +131,13 @@ def test_read_validation(neon_simple_env: NeonEnv):
def test_read_validation_neg(neon_simple_env: NeonEnv):
env = neon_simple_env
env.neon_cli.create_branch("test_read_validation_neg", "empty")
env.pageserver.allowed_errors.append(".*invalid LSN\\(0\\) in request.*")
endpoint = env.endpoints.create_start("main")
endpoint = env.endpoints.create_start(
"test_read_validation_neg",
)
with closing(endpoint.connect()) as con:
with con.cursor() as c:

View File

@@ -22,7 +22,8 @@ from fixtures.utils import query_scalar
#
def test_readonly_node(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint_main = env.endpoints.create_start("main")
env.neon_cli.create_branch("test_readonly_node", "empty")
endpoint_main = env.endpoints.create_start("test_readonly_node")
env.pageserver.allowed_errors.extend(
[
@@ -73,12 +74,12 @@ def test_readonly_node(neon_simple_env: NeonEnv):
# Create first read-only node at the point where only 100 rows were inserted
endpoint_hundred = env.endpoints.create_start(
branch_name="main", endpoint_id="ep-readonly_node_hundred", lsn=lsn_a
branch_name="test_readonly_node", endpoint_id="ep-readonly_node_hundred", lsn=lsn_a
)
# And another at the point where 200100 rows were inserted
endpoint_more = env.endpoints.create_start(
branch_name="main", endpoint_id="ep-readonly_node_more", lsn=lsn_b
branch_name="test_readonly_node", endpoint_id="ep-readonly_node_more", lsn=lsn_b
)
# On the 'hundred' node, we should see only 100 rows
@@ -99,7 +100,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
# Check creating a node at segment boundary
endpoint = env.endpoints.create_start(
branch_name="main",
branch_name="test_readonly_node",
endpoint_id="ep-branch_segment_boundary",
lsn=Lsn("0/3000000"),
)
@@ -111,7 +112,7 @@ def test_readonly_node(neon_simple_env: NeonEnv):
with pytest.raises(Exception, match="invalid basebackup lsn"):
# compute node startup with invalid LSN should fail
env.endpoints.create_start(
branch_name="main",
branch_name="test_readonly_node",
endpoint_id="ep-readonly_node_preinitdb",
lsn=Lsn("0/42"),
)
@@ -217,10 +218,14 @@ def test_readonly_node_gc(neon_env_builder: NeonEnvBuilder):
# Similar test, but with more data, and we force checkpoints
def test_timetravel(neon_simple_env: NeonEnv):
env = neon_simple_env
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
pageserver_http_client = env.pageserver.http_client()
env.neon_cli.create_branch("test_timetravel", "empty")
endpoint = env.endpoints.create_start("test_timetravel")
client = env.pageserver.http_client()
endpoint = env.endpoints.create_start("main")
tenant_id = endpoint.safe_psql("show neon.tenant_id")[0][0]
timeline_id = endpoint.safe_psql("show neon.timeline_id")[0][0]
lsns = []
@@ -244,7 +249,7 @@ def test_timetravel(neon_simple_env: NeonEnv):
wait_for_last_record_lsn(client, tenant_id, timeline_id, current_lsn)
# run checkpoint manually to force a new layer file
client.timeline_checkpoint(tenant_id, timeline_id)
pageserver_http_client.timeline_checkpoint(tenant_id, timeline_id)
##### Restart pageserver
env.endpoints.stop_all()
@@ -253,7 +258,7 @@ def test_timetravel(neon_simple_env: NeonEnv):
for i, lsn in lsns:
endpoint_old = env.endpoints.create_start(
branch_name="main", endpoint_id=f"ep-old_lsn_{i}", lsn=lsn
branch_name="test_timetravel", endpoint_id=f"ep-old_lsn_{i}", lsn=lsn
)
with endpoint_old.cursor() as cur:
assert query_scalar(cur, f"select count(*) from testtab where iteration={i}") == 100000

View File

@@ -374,7 +374,7 @@ def test_sharding_split_smoke(
non_default_tenant_config = {"gc_horizon": 77 * 1024 * 1024}
env = neon_env_builder.init_configs(True)
env.start()
neon_env_builder.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
env.neon_cli.create_tenant(
@@ -1436,7 +1436,7 @@ def test_sharding_unlogged_relation(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_configs()
env.start()
neon_env_builder.start()
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
@@ -1475,7 +1475,7 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
"""
env = neon_env_builder.init_configs()
env.start()
neon_env_builder.start()
tenants = []
n_tenants = 8

View File

@@ -9,7 +9,8 @@ from fixtures.neon_fixtures import NeonEnv, check_restored_datadir_content
# CLOG.
def test_subxacts(neon_simple_env: NeonEnv, test_output_dir):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
env.neon_cli.create_branch("test_subxacts", "empty")
endpoint = env.endpoints.create_start("test_subxacts")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()

View File

@@ -68,13 +68,10 @@ def test_timeline_delete(neon_simple_env: NeonEnv):
# construct pair of branches to validate that pageserver prohibits
# deletion of ancestor timelines when they have child branches
parent_timeline_id = env.neon_cli.create_branch(
new_branch_name="test_ancestor_branch_delete_parent", ancestor_branch_name="main"
)
parent_timeline_id = env.neon_cli.create_branch("test_ancestor_branch_delete_parent", "empty")
leaf_timeline_id = env.neon_cli.create_branch(
new_branch_name="test_ancestor_branch_delete_branch1",
ancestor_branch_name="test_ancestor_branch_delete_parent",
"test_ancestor_branch_delete_branch1", "test_ancestor_branch_delete_parent"
)
timeline_path = env.pageserver.timeline_dir(env.initial_tenant, parent_timeline_id)

View File

@@ -36,7 +36,7 @@ from fixtures.utils import get_timeline_dir_size, wait_until
def test_timeline_size(neon_simple_env: NeonEnv):
env = neon_simple_env
new_timeline_id = env.neon_cli.create_branch("test_timeline_size", "main")
new_timeline_id = env.neon_cli.create_branch("test_timeline_size", "empty")
client = env.pageserver.http_client()
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)
@@ -68,7 +68,7 @@ def test_timeline_size(neon_simple_env: NeonEnv):
def test_timeline_size_createdropdb(neon_simple_env: NeonEnv):
env = neon_simple_env
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_createdropdb", "main")
new_timeline_id = env.neon_cli.create_branch("test_timeline_size_createdropdb", "empty")
client = env.pageserver.http_client()
client.timeline_wait_logical_size(env.initial_tenant, new_timeline_id)

View File

@@ -9,7 +9,10 @@ from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
#
def test_twophase(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main", config_lines=["max_prepared_transactions=5"])
env.neon_cli.create_branch("test_twophase", "empty")
endpoint = env.endpoints.create_start(
"test_twophase", config_lines=["max_prepared_transactions=5"]
)
conn = endpoint.connect()
cur = conn.cursor()
@@ -53,7 +56,7 @@ def test_twophase(neon_simple_env: NeonEnv):
assert len(twophase_files) == 2
# Create a branch with the transaction in prepared state
fork_at_current_lsn(env, endpoint, "test_twophase_prepared", "main")
fork_at_current_lsn(env, endpoint, "test_twophase_prepared", "test_twophase")
# Start compute on the new branch
endpoint2 = env.endpoints.create_start(

View File

@@ -9,7 +9,8 @@ from fixtures.pg_version import PgVersion
#
def test_unlogged(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
env.neon_cli.create_branch("test_unlogged", "empty")
endpoint = env.endpoints.create_start("test_unlogged")
conn = endpoint.connect()
cur = conn.cursor()
@@ -21,7 +22,7 @@ def test_unlogged(neon_simple_env: NeonEnv):
cur.execute("INSERT INTO iut (id) values (42);")
# create another compute to fetch initial empty contents from pageserver
fork_at_current_lsn(env, endpoint, "test_unlogged_basebackup", "main")
fork_at_current_lsn(env, endpoint, "test_unlogged_basebackup", "test_unlogged")
endpoint2 = env.endpoints.create_start("test_unlogged_basebackup")
conn2 = endpoint2.connect()

View File

@@ -13,7 +13,8 @@ from fixtures.utils import query_scalar
def test_vm_bit_clear(neon_simple_env: NeonEnv):
env = neon_simple_env
endpoint = env.endpoints.create_start("main")
env.neon_cli.create_branch("test_vm_bit_clear", "empty")
endpoint = env.endpoints.create_start("test_vm_bit_clear")
pg_conn = endpoint.connect()
cur = pg_conn.cursor()
@@ -57,7 +58,7 @@ def test_vm_bit_clear(neon_simple_env: NeonEnv):
cur.execute("UPDATE vmtest_cold_update2 SET id = 5000, filler=repeat('x', 200) WHERE id = 1")
# Branch at this point, to test that later
fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "main")
fork_at_current_lsn(env, endpoint, "test_vm_bit_clear_new", "test_vm_bit_clear")
# Clear the buffer cache, to force the VM page to be re-fetched from
# the page server

View File

@@ -23,7 +23,8 @@ run_broken = pytest.mark.skipif(
def test_broken(neon_simple_env: NeonEnv, pg_bin):
env = neon_simple_env
env.endpoints.create_start("main")
env.neon_cli.create_branch("test_broken", "empty")
env.endpoints.create_start("test_broken")
log.info("postgres is running")
log.info("THIS NEXT COMMAND WILL FAIL:")

View File

@@ -1,11 +1,11 @@
{
"v16": [
"16.4",
"0baa7346dfd42d61912eeca554c9bb0a190f0a1e"
"6e9a4ff6249ac02b8175054b7b3f7dfb198be48b"
],
"v15": [
"15.8",
"6f6d77fb5960602fcd3fd130aca9f99ecb1619c9"
"49d5e576a56e4cc59cd6a6a0791b2324b9fa675e"
],
"v14": [
"14.13",

View File

@@ -60,7 +60,7 @@ num-bigint = { version = "0.4" }
num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["zstd"] }
prost = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
@@ -83,7 +83,6 @@ time = { version = "0.3", features = ["macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_edit = { version = "0.22", features = ["serde"] }
tonic = { version = "0.9", features = ["tls-roots"] }
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "timeout", "util"] }
tracing = { version = "0.1", features = ["log"] }
@@ -116,7 +115,7 @@ num-bigint = { version = "0.4" }
num-integer = { version = "0.1", features = ["i128"] }
num-traits = { version = "0.2", features = ["i128", "libm"] }
once_cell = { version = "1" }
parquet = { version = "53", default-features = false, features = ["zstd"] }
parquet = { git = "https://github.com/apache/arrow-rs", branch = "master", default-features = false, features = ["zstd"] }
proc-macro2 = { version = "1" }
prost = { version = "0.11" }
quote = { version = "1" }
@@ -127,7 +126,6 @@ serde = { version = "1", features = ["alloc", "derive"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }
time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] }
toml_edit = { version = "0.22", features = ["serde"] }
zstd = { version = "0.13" }
zstd-safe = { version = "7", default-features = false, features = ["arrays", "legacy", "std", "zdict_builder"] }
zstd-sys = { version = "2", default-features = false, features = ["legacy", "std", "zdict_builder"] }