Compare commits

..

1 Commits

Author SHA1 Message Date
piercypixel
4f4a96ea25 Skip custom extensions in fast import 2025-06-03 14:18:53 +00:00
197 changed files with 2408 additions and 5188 deletions

8
.gitmodules vendored
View File

@@ -1,16 +1,16 @@
[submodule "vendor/postgres-v14"]
path = vendor/postgres-v14
url = https://github.com/neondatabase/postgres.git
branch = REL_14_STABLE_neon
branch = 28934-pg-dump-schema-no-create-v14
[submodule "vendor/postgres-v15"]
path = vendor/postgres-v15
url = https://github.com/neondatabase/postgres.git
branch = REL_15_STABLE_neon
branch = 28934-pg-dump-schema-no-create-v15
[submodule "vendor/postgres-v16"]
path = vendor/postgres-v16
url = https://github.com/neondatabase/postgres.git
branch = REL_16_STABLE_neon
branch = 28934-pg-dump-schema-no-create-v16
[submodule "vendor/postgres-v17"]
path = vendor/postgres-v17
url = https://github.com/neondatabase/postgres.git
branch = REL_17_STABLE_neon
branch = 28934-pg-dump-schema-no-create-v17

46
Cargo.lock generated
View File

@@ -753,7 +753,6 @@ dependencies = [
"axum",
"axum-core",
"bytes",
"form_urlencoded",
"futures-util",
"headers",
"http 1.1.0",
@@ -762,8 +761,6 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"serde_html_form",
"serde_path_to_error",
"tower 0.5.2",
"tower-layer",
"tower-service",
@@ -903,6 +900,12 @@ version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5"
[[package]]
name = "base64"
version = "0.21.7"
@@ -1294,7 +1297,7 @@ dependencies = [
"aws-smithy-types",
"axum",
"axum-extra",
"base64 0.22.1",
"base64 0.13.1",
"bytes",
"camino",
"cfg-if",
@@ -1420,7 +1423,7 @@ name = "control_plane"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.22.1",
"base64 0.13.1",
"camino",
"clap",
"comfy-table",
@@ -1442,7 +1445,6 @@ dependencies = [
"regex",
"reqwest",
"safekeeper_api",
"safekeeper_client",
"scopeguard",
"serde",
"serde_json",
@@ -2052,7 +2054,6 @@ dependencies = [
"axum-extra",
"camino",
"camino-tempfile",
"clap",
"futures",
"http-body-util",
"itertools 0.10.5",
@@ -4236,7 +4237,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"camino",
"clap",
"futures",
@@ -4812,7 +4812,7 @@ dependencies = [
name = "postgres-protocol2"
version = "0.1.0"
dependencies = [
"base64 0.22.1",
"base64 0.20.0",
"byteorder",
"bytes",
"fallible-iterator",
@@ -5184,7 +5184,7 @@ dependencies = [
"aws-config",
"aws-sdk-iam",
"aws-sigv4",
"base64 0.22.1",
"base64 0.13.1",
"bstr",
"bytes",
"camino",
@@ -6419,19 +6419,6 @@ dependencies = [
"syn 2.0.100",
]
[[package]]
name = "serde_html_form"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d2de91cf02bbc07cde38891769ccd5d4f073d22a40683aa4bc7a95781aaa2c4"
dependencies = [
"form_urlencoded",
"indexmap 2.9.0",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "serde_json"
version = "1.0.125"
@@ -6488,17 +6475,15 @@ dependencies = [
[[package]]
name = "serde_with"
version = "3.12.0"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6b6f7f2fcb69f747921f79f3926bd1e203fce4fef62c268dd3abfb6d86029aa"
checksum = "07ff71d2c147a7b57362cead5e22f772cd52f6ab31cfcd9edcd7f6aeb2a0afbe"
dependencies = [
"base64 0.22.1",
"base64 0.13.1",
"chrono",
"hex",
"indexmap 1.9.3",
"indexmap 2.9.0",
"serde",
"serde_derive",
"serde_json",
"serde_with_macros",
"time",
@@ -6506,9 +6491,9 @@ dependencies = [
[[package]]
name = "serde_with_macros"
version = "3.12.0"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d00caa5193a3c8362ac2b73be6b9e768aa5a4b2f721d8f4b339600c3cb51f8e"
checksum = "881b6f881b17d13214e5d494c939ebab463d01264ce1811e9d4ac3a882e7695f"
dependencies = [
"darling",
"proc-macro2",
@@ -8579,6 +8564,7 @@ dependencies = [
"anyhow",
"axum",
"axum-core",
"base64 0.13.1",
"base64 0.21.7",
"base64ct",
"bytes",

View File

@@ -71,8 +71,8 @@ aws-credential-types = "1.2.0"
aws-sigv4 = { version = "1.2", features = ["sign-http"] }
aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
axum-extra = { version = "0.10.0", features = ["typed-header", "query"] }
base64 = "0.22"
axum-extra = { version = "0.10.0", features = ["typed-header"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.71"
bit_field = "0.10.2"
@@ -171,7 +171,7 @@ sentry = { version = "0.37", default-features = false, features = ["backtrace",
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_path_to_error = "0.1"
serde_with = { version = "3", features = [ "base64" ] }
serde_with = { version = "2.0", features = [ "base64" ] }
serde_assert = "0.5.0"
sha2 = "0.10.2"
signal-hook = "0.3"

View File

@@ -110,19 +110,6 @@ RUN set -e \
# System postgres for use with client libraries (e.g. in storage controller)
postgresql-15 \
openssl \
unzip \
curl \
&& ARCH=$(uname -m) \
&& if [ "$ARCH" = "x86_64" ]; then \
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"; \
elif [ "$ARCH" = "aarch64" ]; then \
curl "https://awscli.amazonaws.com/awscli-exe-linux-aarch64.zip" -o "awscliv2.zip"; \
else \
echo "Unsupported architecture: $ARCH" && exit 1; \
fi \
&& unzip awscliv2.zip \
&& ./aws/install \
&& rm -rf aws awscliv2.zip \
&& rm -f /etc/apt/apt.conf.d/80-retries \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* \
&& useradd -d /data neon \

View File

@@ -297,7 +297,6 @@ RUN ./autogen.sh && \
./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
make staged-install && \
cd extensions/postgis && \
make clean && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
@@ -603,7 +602,7 @@ RUN case "${PG_VERSION:?}" in \
;; \
esac && \
wget https://github.com/knizhnik/online_advisor/archive/refs/tags/1.0.tar.gz -O online_advisor.tar.gz && \
echo "37dcadf8f7cc8d6cc1f8831276ee245b44f1b0274f09e511e47a67738ba9ed0f online_advisor.tar.gz" | sha256sum --check && \
echo "059b7d9e5a90013a58bdd22e9505b88406ce05790675eb2d8434e5b215652d54 online_advisor.tar.gz" | sha256sum --check && \
mkdir online_advisor-src && cd online_advisor-src && tar xzf ../online_advisor.tar.gz --strip-components=1 -C .
FROM pg-build AS online_advisor-build
@@ -1843,25 +1842,10 @@ RUN make PG_VERSION="${PG_VERSION:?}" -C compute
FROM pg-build AS extension-tests
ARG PG_VERSION
# This is required for the PostGIS test
RUN apt-get update && case $DEBIAN_VERSION in \
bullseye) \
apt-get install -y libproj19 libgdal28 time; \
;; \
bookworm) \
apt-get install -y libgdal32 libproj25 time; \
;; \
*) \
echo "Unknown Debian version ${DEBIAN_VERSION}" && exit 1 \
;; \
esac
COPY docker-compose/ext-src/ /ext-src/
COPY --from=pg-build /postgres /postgres
COPY --from=postgis-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=postgis-build /ext-src/postgis-src /ext-src/postgis-src
COPY --from=postgis-build /sfcgal/* /usr
#COPY --from=postgis-src /ext-src/ /ext-src/
COPY --from=plv8-src /ext-src/ /ext-src/
COPY --from=h3-pg-src /ext-src/h3-pg-src /ext-src/h3-pg-src
COPY --from=postgresql-unit-src /ext-src/ /ext-src/
@@ -1902,7 +1886,6 @@ COPY compute/patches/pg_repack.patch /ext-src
RUN cd /ext-src/pg_repack-src && patch -p1 </ext-src/pg_repack.patch && rm -f /ext-src/pg_repack.patch
COPY --chmod=755 docker-compose/run-tests.sh /run-tests.sh
RUN echo /usr/local/pgsql/lib > /etc/ld.so.conf.d/00-neon.conf && /sbin/ldconfig
RUN apt-get update && apt-get install -y libtap-parser-sourcehandler-pgtap-perl jq \
&& apt clean && rm -rf /ext-src/*.tar.gz /ext-src/*.patch /var/lib/apt/lists/*
ENV PATH=/usr/local/pgsql/bin:$PATH

View File

@@ -1,121 +0,0 @@
pg_settings:
# Common settings for primaries and replicas of all versions.
common:
# Check for client disconnection every 1 minute. By default, Postgres will detect the
# loss of the connection only at the next interaction with the socket, when it waits
# for, receives or sends data, so it will likely waste resources till the end of the
# query execution. There should be no drawbacks in setting this for everyone, so enable
# it by default. If anyone will complain, we can allow editing it.
# https://www.postgresql.org/docs/16/runtime-config-connection.html#GUC-CLIENT-CONNECTION-CHECK-INTERVAL
client_connection_check_interval: "60000" # 1 minute
# ---- IO ----
effective_io_concurrency: "20"
maintenance_io_concurrency: "100"
fsync: "off"
hot_standby: "off"
# We allow users to change this if needed, but by default we
# just don't want to see long-lasting idle transactions, as they
# prevent activity monitor from suspending projects.
idle_in_transaction_session_timeout: "300000" # 5 minutes
listen_addresses: "*"
# --- LOGGING ---- helps investigations
log_connections: "on"
log_disconnections: "on"
# 1GB, unit is KB
log_temp_files: "1048576"
# Disable dumping customer data to logs, both to increase data privacy
# and to reduce the amount the logs.
log_error_verbosity: "terse"
log_min_error_statement: "panic"
max_connections: "100"
# --- WAL ---
# - flush lag is the max amount of WAL that has been generated but not yet stored
# to disk in the page server. A smaller value means less delay after a pageserver
# restart, but if you set it too small you might again need to slow down writes if the
# pageserver cannot flush incoming WAL to disk fast enough. This must be larger
# than the pageserver's checkpoint interval, currently 1 GB! Otherwise you get a
# a deadlock where the compute node refuses to generate more WAL before the
# old WAL has been uploaded to S3, but the pageserver is waiting for more WAL
# to be generated before it is uploaded to S3.
max_replication_flush_lag: "10GB"
max_replication_slots: "10"
# Backpressure configuration:
# - write lag is the max amount of WAL that has been generated by Postgres but not yet
# processed by the page server. Making this smaller reduces the worst case latency
# of a GetPage request, if you request a page that was recently modified. On the other
# hand, if this is too small, the compute node might need to wait on a write if there is a
# hiccup in the network or page server so that the page server has temporarily fallen
# behind.
#
# Previously it was set to 500 MB, but it caused compute being unresponsive under load
# https://github.com/neondatabase/neon/issues/2028
max_replication_write_lag: "500MB"
max_wal_senders: "10"
# A Postgres checkpoint is cheap in storage, as doesn't involve any significant amount
# of real I/O. Only the SLRU buffers and some other small files are flushed to disk.
# However, as long as we have full_page_writes=on, page updates after a checkpoint
# include full-page images which bloats the WAL. So may want to bump max_wal_size to
# reduce the WAL bloating, but at the same it will increase pg_wal directory size on
# compute and can lead to out of disk error on k8s nodes.
max_wal_size: "1024"
wal_keep_size: "0"
wal_level: "replica"
# Reduce amount of WAL generated by default.
wal_log_hints: "off"
# - without wal_sender_timeout set we don't get feedback messages,
# required for backpressure.
wal_sender_timeout: "10000"
# We have some experimental extensions, which we don't want users to install unconsciously.
# To install them, users would need to set the `neon.allow_unstable_extensions` setting.
# There are two of them currently:
# - `pgrag` - https://github.com/neondatabase-labs/pgrag - extension is actually called just `rag`,
# and two dependencies:
# - `rag_bge_small_en_v15`
# - `rag_jina_reranker_v1_tiny_en`
# - `pg_mooncake` - https://github.com/Mooncake-Labs/pg_mooncake/
neon.unstable_extensions: "rag,rag_bge_small_en_v15,rag_jina_reranker_v1_tiny_en,pg_mooncake,anon"
neon.protocol_version: "3"
password_encryption: "scram-sha-256"
# This is important to prevent Postgres from trying to perform
# a local WAL redo after backend crash. It should exit and let
# the systemd or k8s to do a fresh startup with compute_ctl.
restart_after_crash: "off"
# By default 3. We have the following persistent connections in the VM:
# * compute_activity_monitor (from compute_ctl)
# * postgres-exporter (metrics collector; it has 2 connections)
# * sql_exporter (metrics collector; we have 2 instances [1 for us & users; 1 for autoscaling])
# * vm-monitor (to query & change file cache size)
# i.e. total of 6. Let's reserve 7, so there's still at least one left over.
superuser_reserved_connections: "7"
synchronous_standby_names: "walproposer"
replica:
hot_standby: "on"
per_version:
17:
common:
# PostgreSQL 17 has a new IO system called "read stream", which can combine IOs up to some
# size. It still has some issues with readahead, though, so we default to disabled/
# "no combining of IOs" to make sure we get the maximum prefetch depth.
# See also: https://github.com/neondatabase/neon/pull/9860
io_combine_limit: "1"
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
16:
common:
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
15:
common:
replica:
# prefetching of blocks referenced in WAL doesn't make sense for us
# Neon hot standby ignores pages that are not in the shared_buffers
recovery_prefetch: "off"
14:
common:
replica:

View File

@@ -40,7 +40,7 @@ use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use anyhow::{Context, Result, bail};
use anyhow::{Context, Result};
use clap::Parser;
use compute_api::responses::ComputeConfig;
use compute_tools::compute::{
@@ -57,14 +57,14 @@ use tracing::{error, info};
use url::Url;
use utils::failpoint_support;
#[derive(Debug, Parser)]
#[derive(Parser)]
#[command(rename_all = "kebab-case")]
struct Cli {
#[arg(short = 'b', long, default_value = "postgres", env = "POSTGRES_PATH")]
pub pgbin: String,
/// The base URL for the remote extension storage proxy gateway.
#[arg(short = 'r', long, value_parser = Self::parse_remote_ext_base_url)]
#[arg(short = 'r', long)]
pub remote_ext_base_url: Option<Url>,
/// The port to bind the external listening HTTP server to. Clients running
@@ -126,25 +126,6 @@ struct Cli {
pub installed_extensions_collection_interval: u64,
}
impl Cli {
/// Parse a URL from an argument. By default, this isn't necessary, but we
/// want to do some sanity checking.
fn parse_remote_ext_base_url(value: &str) -> Result<Url> {
// Remove extra trailing slashes, and add one. We use Url::join() later
// when downloading remote extensions. If the base URL is something like
// http://example.com/pg-ext-s3-gateway, and join() is called with
// something like "xyz", the resulting URL is http://example.com/xyz.
let value = value.trim_end_matches('/').to_owned() + "/";
let url = Url::parse(&value)?;
if url.query_pairs().count() != 0 {
bail!("parameters detected in remote extensions base URL")
}
Ok(url)
}
}
fn main() -> Result<()> {
let cli = Cli::parse();
@@ -271,8 +252,7 @@ fn handle_exit_signal(sig: i32) {
#[cfg(test)]
mod test {
use clap::{CommandFactory, Parser};
use url::Url;
use clap::CommandFactory;
use super::Cli;
@@ -280,43 +260,4 @@ mod test {
fn verify_cli() {
Cli::command().debug_assert()
}
#[test]
fn verify_remote_ext_base_url() {
let cli = Cli::parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--remote-ext-base-url",
"https://example.com/subpath",
]);
assert_eq!(
cli.remote_ext_base_url.unwrap(),
Url::parse("https://example.com/subpath/").unwrap()
);
let cli = Cli::parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--remote-ext-base-url",
"https://example.com//",
]);
assert_eq!(
cli.remote_ext_base_url.unwrap(),
Url::parse("https://example.com").unwrap()
);
Cli::try_parse_from([
"compute_ctl",
"--pgdata=test",
"--connstr=test",
"--compute-id=test",
"--remote-ext-base-url",
"https://example.com?hello=world",
])
.expect_err("URL parameters are not allowed");
}
}

View File

@@ -70,6 +70,14 @@ enum Command {
/// and maintenance_work_mem.
#[clap(long, env = "NEON_IMPORTER_MEMORY_MB")]
memory_mb: Option<usize>,
/// List of schemas to dump.
#[clap(long)]
schema: Vec<String>,
/// List of extensions to dump.
#[clap(long)]
extension: Vec<String>,
},
/// Runs pg_dump-pg_restore from source to destination without running local postgres.
@@ -82,6 +90,12 @@ enum Command {
/// real scenario uses encrypted connection string in spec.json from s3.
#[clap(long)]
destination_connection_string: Option<String>,
/// List of schemas to dump.
#[clap(long)]
schema: Vec<String>,
/// List of extensions to dump.
#[clap(long)]
extension: Vec<String>,
},
}
@@ -117,6 +131,8 @@ struct Spec {
source_connstring_ciphertext_base64: Vec<u8>,
#[serde_as(as = "Option<serde_with::base64::Base64>")]
destination_connstring_ciphertext_base64: Option<Vec<u8>>,
schemas: Option<Vec<String>>,
extensions: Option<Vec<String>>,
}
#[derive(serde::Deserialize)]
@@ -337,6 +353,8 @@ async fn run_dump_restore(
pg_lib_dir: Utf8PathBuf,
source_connstring: String,
destination_connstring: String,
schemas: Vec<String>,
extensions: Vec<String>,
) -> Result<(), anyhow::Error> {
let dumpdir = workdir.join("dumpdir");
let num_jobs = num_cpus::get().to_string();
@@ -351,6 +369,7 @@ async fn run_dump_restore(
"--no-subscriptions".to_string(),
"--no-tablespaces".to_string(),
"--no-event-triggers".to_string(),
"--enable-row-security".to_string(),
// format
"--format".to_string(),
"directory".to_string(),
@@ -361,10 +380,36 @@ async fn run_dump_restore(
"--verbose".to_string(),
];
let mut pg_dump_args = vec![
// this makes sure any unsupported extensions are not included in the dump
// even if we don't specify supported extensions explicitly
"--extension".to_string(),
"plpgsql".to_string(),
];
// if no schemas are specified, try to import all schemas
if !schemas.is_empty() {
// always include public schema objects
// but never create the schema itself
// it already exists in any pg cluster by default
pg_dump_args.push("--schema-no-create".to_string());
pg_dump_args.push("public".to_string());
for schema in &schemas {
pg_dump_args.push("--schema".to_string());
pg_dump_args.push(schema.clone());
}
}
for extension in &extensions {
pg_dump_args.push("--extension".to_string());
pg_dump_args.push(extension.clone());
}
info!("dump into the working directory");
{
let mut pg_dump = tokio::process::Command::new(pg_bin_dir.join("pg_dump"))
.args(&common_args)
.args(&pg_dump_args)
.arg("-f")
.arg(&dumpdir)
.arg("--no-sync")
@@ -455,6 +500,8 @@ async fn cmd_pgdata(
maybe_s3_prefix: Option<s3_uri::S3Uri>,
maybe_spec: Option<Spec>,
source_connection_string: Option<String>,
schemas: Vec<String>,
extensions: Vec<String>,
interactive: bool,
pg_port: u16,
workdir: Utf8PathBuf,
@@ -470,19 +517,25 @@ async fn cmd_pgdata(
bail!("only one of spec or source_connection_string can be provided");
}
let source_connection_string = if let Some(spec) = maybe_spec {
let (source_connection_string, schemas, extensions) = if let Some(spec) = maybe_spec {
match spec.encryption_secret {
EncryptionSecret::KMS { key_id } => {
decode_connstring(
let schemas = spec.schemas.unwrap_or(vec![]);
let extensions = spec.extensions.unwrap_or(vec![]);
let source = decode_connstring(
kms_client.as_ref().unwrap(),
&key_id,
spec.source_connstring_ciphertext_base64,
)
.await?
.await
.context("decrypt source connection string")?;
(source, schemas, extensions)
}
}
} else {
source_connection_string.unwrap()
(source_connection_string.unwrap(), schemas, extensions)
};
let superuser = "cloud_admin";
@@ -504,6 +557,8 @@ async fn cmd_pgdata(
pg_lib_dir,
source_connection_string,
destination_connstring,
schemas,
extensions,
)
.await?;
@@ -546,18 +601,26 @@ async fn cmd_pgdata(
Ok(())
}
#[allow(clippy::too_many_arguments)]
async fn cmd_dumprestore(
kms_client: Option<aws_sdk_kms::Client>,
maybe_spec: Option<Spec>,
source_connection_string: Option<String>,
destination_connection_string: Option<String>,
schemas: Vec<String>,
extensions: Vec<String>,
workdir: Utf8PathBuf,
pg_bin_dir: Utf8PathBuf,
pg_lib_dir: Utf8PathBuf,
) -> Result<(), anyhow::Error> {
let (source_connstring, destination_connstring) = if let Some(spec) = maybe_spec {
let (source_connstring, destination_connstring, schemas, extensions) = if let Some(spec) =
maybe_spec
{
match spec.encryption_secret {
EncryptionSecret::KMS { key_id } => {
let schemas = spec.schemas.unwrap_or(vec![]);
let extensions = spec.extensions.unwrap_or(vec![]);
let source = decode_connstring(
kms_client.as_ref().unwrap(),
&key_id,
@@ -578,18 +641,17 @@ async fn cmd_dumprestore(
);
};
(source, dest)
(source, dest, schemas, extensions)
}
}
} else {
(
source_connection_string.unwrap(),
if let Some(val) = destination_connection_string {
val
} else {
bail!("destination connection string must be provided for dump_restore command");
},
)
let dest = if let Some(val) = destination_connection_string {
val
} else {
bail!("destination connection string must be provided for dump_restore command");
};
(source_connection_string.unwrap(), dest, schemas, extensions)
};
run_dump_restore(
@@ -598,6 +660,8 @@ async fn cmd_dumprestore(
pg_lib_dir,
source_connstring,
destination_connstring,
schemas,
extensions,
)
.await
}
@@ -679,6 +743,8 @@ pub(crate) async fn main() -> anyhow::Result<()> {
pg_port,
num_cpus,
memory_mb,
schema,
extension,
} => {
cmd_pgdata(
s3_client.as_ref(),
@@ -686,6 +752,8 @@ pub(crate) async fn main() -> anyhow::Result<()> {
args.s3_prefix.clone(),
spec,
source_connection_string,
schema,
extension,
interactive,
pg_port,
args.working_directory.clone(),
@@ -699,12 +767,16 @@ pub(crate) async fn main() -> anyhow::Result<()> {
Command::DumpRestore {
source_connection_string,
destination_connection_string,
schema,
extension,
} => {
cmd_dumprestore(
kms_client,
spec,
source_connection_string,
destination_connection_string,
schema,
extension,
args.working_directory.clone(),
args.pg_bin_dir,
args.pg_lib_dir,

View File

@@ -3,7 +3,7 @@ use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{
ComputeConfig, ComputeCtlConfig, ComputeMetrics, ComputeStatus, LfcOffloadState,
LfcPrewarmState, TlsConfig,
LfcPrewarmState,
};
use compute_api::spec::{
ComputeAudit, ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent,
@@ -396,7 +396,7 @@ impl ComputeNode {
// because QEMU will already have its memory allocated from the host, and
// the necessary binaries will already be cached.
if cli_spec.is_none() {
this.prewarm_postgres_vm_memory()?;
this.prewarm_postgres()?;
}
// Set the up metric with Empty status before starting the HTTP server.
@@ -603,8 +603,6 @@ impl ComputeNode {
});
}
let tls_config = self.tls_config(&pspec.spec);
// If there are any remote extensions in shared_preload_libraries, start downloading them
if pspec.spec.remote_extensions.is_some() {
let (this, spec) = (self.clone(), pspec.spec.clone());
@@ -661,7 +659,7 @@ impl ComputeNode {
info!("tuning pgbouncer");
let pgbouncer_settings = pgbouncer_settings.clone();
let tls_config = tls_config.clone();
let tls_config = self.compute_ctl_config.tls.clone();
// Spawn a background task to do the tuning,
// so that we don't block the main thread that starts Postgres.
@@ -680,10 +678,7 @@ impl ComputeNode {
// Spawn a background task to do the configuration,
// so that we don't block the main thread that starts Postgres.
let mut local_proxy = local_proxy.clone();
local_proxy.tls = tls_config.clone();
let local_proxy = local_proxy.clone();
let _handle = tokio::spawn(async move {
if let Err(err) = local_proxy::configure(&local_proxy) {
error!("error while configuring local_proxy: {err:?}");
@@ -784,8 +779,8 @@ impl ComputeNode {
// Spawn the extension stats background task
self.spawn_extension_stats_task();
if pspec.spec.autoprewarm {
self.prewarm_lfc(None);
if pspec.spec.prewarm_lfc_on_startup {
self.prewarm_lfc();
}
Ok(())
}
@@ -1210,15 +1205,13 @@ impl ComputeNode {
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.params.pgdata);
let tls_config = self.tls_config(&pspec.spec);
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(
pgdata_path,
&pspec.spec,
self.params.internal_http_port,
tls_config,
&self.compute_ctl_config.tls,
)?;
// Syncing safekeepers is only safe with primary nodes: if a primary
@@ -1314,8 +1307,8 @@ impl ComputeNode {
}
/// Start and stop a postgres process to warm up the VM for startup.
pub fn prewarm_postgres_vm_memory(&self) -> Result<()> {
info!("prewarming VM memory");
pub fn prewarm_postgres(&self) -> Result<()> {
info!("prewarming");
// Create pgdata
let pgdata = &format!("{}.warmup", self.params.pgdata);
@@ -1357,7 +1350,7 @@ impl ComputeNode {
kill(pm_pid, Signal::SIGQUIT)?;
info!("sent SIGQUIT signal");
pg.wait()?;
info!("done prewarming vm memory");
info!("done prewarming");
// clean up
let _ok = fs::remove_dir_all(pgdata);
@@ -1543,22 +1536,14 @@ impl ComputeNode {
.clone(),
);
let mut tls_config = None::<TlsConfig>;
if spec.features.contains(&ComputeFeature::TlsExperimental) {
tls_config = self.compute_ctl_config.tls.clone();
}
let max_concurrent_connections = self.max_service_connections(compute_state, &spec);
// Merge-apply spec & changes to PostgreSQL state.
self.apply_spec_sql(spec.clone(), conf.clone(), max_concurrent_connections)?;
if let Some(local_proxy) = &spec.clone().local_proxy_config {
let mut local_proxy = local_proxy.clone();
local_proxy.tls = tls_config.clone();
info!("configuring local_proxy");
local_proxy::configure(&local_proxy).context("apply_config local_proxy")?;
local_proxy::configure(local_proxy).context("apply_config local_proxy")?;
}
// Run migrations separately to not hold up cold starts
@@ -1610,13 +1595,11 @@ impl ComputeNode {
pub fn reconfigure(&self) -> Result<()> {
let spec = self.state.lock().unwrap().pspec.clone().unwrap().spec;
let tls_config = self.tls_config(&spec);
if let Some(ref pgbouncer_settings) = spec.pgbouncer_settings {
info!("tuning pgbouncer");
let pgbouncer_settings = pgbouncer_settings.clone();
let tls_config = tls_config.clone();
let tls_config = self.compute_ctl_config.tls.clone();
// Spawn a background task to do the tuning,
// so that we don't block the main thread that starts Postgres.
@@ -1634,7 +1617,7 @@ impl ComputeNode {
// Spawn a background task to do the configuration,
// so that we don't block the main thread that starts Postgres.
let mut local_proxy = local_proxy.clone();
local_proxy.tls = tls_config.clone();
local_proxy.tls = self.compute_ctl_config.tls.clone();
tokio::spawn(async move {
if let Err(err) = local_proxy::configure(&local_proxy) {
error!("error while configuring local_proxy: {err:?}");
@@ -1652,7 +1635,7 @@ impl ComputeNode {
pgdata_path,
&spec,
self.params.internal_http_port,
tls_config,
&self.compute_ctl_config.tls,
)?;
if !spec.skip_pg_catalog_updates {
@@ -1772,14 +1755,6 @@ impl ComputeNode {
}
}
pub fn tls_config(&self, spec: &ComputeSpec) -> &Option<TlsConfig> {
if spec.features.contains(&ComputeFeature::TlsExperimental) {
&self.compute_ctl_config.tls
} else {
&None::<TlsConfig>
}
}
/// Update the `last_active` in the shared state, but ensure that it's a more recent one.
pub fn update_last_active(&self, last_active: Option<DateTime<Utc>>) {
let mut state = self.state.lock().unwrap();

View File

@@ -25,16 +25,11 @@ struct EndpointStoragePair {
}
const KEY: &str = "lfc_state";
impl EndpointStoragePair {
/// endpoint_id is set to None while prewarming from other endpoint, see replica promotion
/// If not None, takes precedence over pspec.spec.endpoint_id
fn from_spec_and_endpoint(
pspec: &crate::compute::ParsedSpec,
endpoint_id: Option<String>,
) -> Result<Self> {
let endpoint_id = endpoint_id.as_ref().or(pspec.spec.endpoint_id.as_ref());
let Some(ref endpoint_id) = endpoint_id else {
bail!("pspec.endpoint_id missing, other endpoint_id not provided")
impl TryFrom<&crate::compute::ParsedSpec> for EndpointStoragePair {
type Error = anyhow::Error;
fn try_from(pspec: &crate::compute::ParsedSpec) -> Result<Self, Self::Error> {
let Some(ref endpoint_id) = pspec.spec.endpoint_id else {
bail!("pspec.endpoint_id missing")
};
let Some(ref base_uri) = pspec.endpoint_storage_addr else {
bail!("pspec.endpoint_storage_addr missing")
@@ -89,7 +84,7 @@ impl ComputeNode {
}
/// Returns false if there is a prewarm request ongoing, true otherwise
pub fn prewarm_lfc(self: &Arc<Self>, from_endpoint: Option<String>) -> bool {
pub fn prewarm_lfc(self: &Arc<Self>) -> bool {
crate::metrics::LFC_PREWARM_REQUESTS.inc();
{
let state = &mut self.state.lock().unwrap().lfc_prewarm_state;
@@ -102,7 +97,7 @@ impl ComputeNode {
let cloned = self.clone();
spawn(async move {
let Err(err) = cloned.prewarm_impl(from_endpoint).await else {
let Err(err) = cloned.prewarm_impl().await else {
cloned.state.lock().unwrap().lfc_prewarm_state = LfcPrewarmState::Completed;
return;
};
@@ -114,14 +109,13 @@ impl ComputeNode {
true
}
/// from_endpoint: None for endpoint managed by this compute_ctl
fn endpoint_storage_pair(&self, from_endpoint: Option<String>) -> Result<EndpointStoragePair> {
fn endpoint_storage_pair(&self) -> Result<EndpointStoragePair> {
let state = self.state.lock().unwrap();
EndpointStoragePair::from_spec_and_endpoint(state.pspec.as_ref().unwrap(), from_endpoint)
state.pspec.as_ref().unwrap().try_into()
}
async fn prewarm_impl(&self, from_endpoint: Option<String>) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(from_endpoint)?;
async fn prewarm_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
info!(%url, "requesting LFC state from endpoint storage");
let request = Client::new().get(&url).bearer_auth(token);
@@ -179,7 +173,7 @@ impl ComputeNode {
}
async fn offload_lfc_impl(&self) -> Result<()> {
let EndpointStoragePair { url, token } = self.endpoint_storage_pair(None)?;
let EndpointStoragePair { url, token } = self.endpoint_storage_pair()?;
info!(%url, "requesting LFC state from postgres");
let mut compressed = Vec::new();

View File

@@ -166,7 +166,7 @@ pub async fn download_extension(
// TODO add retry logic
let download_buffer =
match download_extension_tar(remote_ext_base_url, &ext_path.to_string()).await {
match download_extension_tar(remote_ext_base_url.as_str(), &ext_path.to_string()).await {
Ok(buffer) => buffer,
Err(error_message) => {
return Err(anyhow::anyhow!(
@@ -271,14 +271,10 @@ pub fn create_control_files(remote_extensions: &RemoteExtSpec, pgbin: &str) {
}
// Do request to extension storage proxy, e.g.,
// curl http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local/latest/v15/extensions/anon.tar.zst
// curl http://pg-ext-s3-gateway/latest/v15/extensions/anon.tar.zst
// using HTTP GET and return the response body as bytes.
async fn download_extension_tar(remote_ext_base_url: &Url, ext_path: &str) -> Result<Bytes> {
let uri = remote_ext_base_url.join(ext_path).with_context(|| {
format!(
"failed to create the remote extension URI for {ext_path} using {remote_ext_base_url}"
)
})?;
async fn download_extension_tar(remote_ext_base_url: &str, ext_path: &str) -> Result<Bytes> {
let uri = format!("{}/{}", remote_ext_base_url, ext_path);
let filename = Path::new(ext_path)
.file_name()
.unwrap_or_else(|| std::ffi::OsStr::new("unknown"))
@@ -288,7 +284,7 @@ async fn download_extension_tar(remote_ext_base_url: &Url, ext_path: &str) -> Re
info!("Downloading extension file '{}' from uri {}", filename, uri);
match do_extension_server_request(uri).await {
match do_extension_server_request(&uri).await {
Ok(resp) => {
info!("Successfully downloaded remote extension data {}", ext_path);
REMOTE_EXT_REQUESTS_TOTAL
@@ -307,7 +303,7 @@ async fn download_extension_tar(remote_ext_base_url: &Url, ext_path: &str) -> Re
// Do a single remote extensions server request.
// Return result or (error message + stringified status code) in case of any failures.
async fn do_extension_server_request(uri: Url) -> Result<Bytes, (String, String)> {
async fn do_extension_server_request(uri: &str) -> Result<Bytes, (String, String)> {
let resp = reqwest::get(uri).await.map_err(|e| {
(
format!(

View File

@@ -48,9 +48,11 @@ impl JsonResponse {
/// Create an error response related to the compute being in an invalid state
pub(self) fn invalid_status(status: ComputeStatus) -> Response {
Self::error(
Self::create_response(
StatusCode::PRECONDITION_FAILED,
format!("invalid compute status: {status}"),
&GenericAPIError {
error: format!("invalid compute status: {status}"),
},
)
}
}

View File

@@ -22,7 +22,7 @@ pub(in crate::http) async fn configure(
State(compute): State<Arc<ComputeNode>>,
request: Json<ConfigurationRequest>,
) -> Response {
let pspec = match ParsedSpec::try_from(request.0.spec) {
let pspec = match ParsedSpec::try_from(request.spec.clone()) {
Ok(p) => p,
Err(e) => return JsonResponse::error(StatusCode::BAD_REQUEST, e),
};

View File

@@ -2,7 +2,6 @@ use crate::compute_prewarm::LfcPrewarmStateWithProgress;
use crate::http::JsonResponse;
use axum::response::{IntoResponse, Response};
use axum::{Json, http::StatusCode};
use axum_extra::extract::OptionalQuery;
use compute_api::responses::LfcOffloadState;
type Compute = axum::extract::State<std::sync::Arc<crate::compute::ComputeNode>>;
@@ -17,16 +16,8 @@ pub(in crate::http) async fn offload_state(compute: Compute) -> Json<LfcOffloadS
Json(compute.lfc_offload_state())
}
#[derive(serde::Deserialize)]
pub struct PrewarmQuery {
pub from_endpoint: String,
}
pub(in crate::http) async fn prewarm(
compute: Compute,
OptionalQuery(query): OptionalQuery<PrewarmQuery>,
) -> Response {
if compute.prewarm_lfc(query.map(|q| q.from_endpoint)) {
pub(in crate::http) async fn prewarm(compute: Compute) -> Response {
if compute.prewarm_lfc() {
StatusCode::ACCEPTED.into_response()
} else {
JsonResponse::error(

View File

@@ -13,12 +13,6 @@ use crate::metrics::{PG_CURR_DOWNTIME_MS, PG_TOTAL_DOWNTIME_MS};
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
/// Struct to store runtime state of the compute monitor thread.
/// In theory, this could be a part of `Compute`, but i)
/// this state is expected to be accessed only by single thread,
/// so we don't need to care about locking; ii) `Compute` is
/// already quite big. Thus, it seems to be a good idea to keep
/// all the activity/health monitoring parts here.
struct ComputeMonitor {
compute: Arc<ComputeNode>,
@@ -76,36 +70,12 @@ impl ComputeMonitor {
)
}
/// Check if compute is in some terminal or soon-to-be-terminal
/// state, then return `true`, signalling the caller that it
/// should exit gracefully. Otherwise, return `false`.
fn check_interrupts(&mut self) -> bool {
let compute_status = self.compute.get_status();
if matches!(
compute_status,
ComputeStatus::Terminated | ComputeStatus::TerminationPending | ComputeStatus::Failed
) {
info!(
"compute is in {} status, stopping compute monitor",
compute_status
);
return true;
}
false
}
/// Spin in a loop and figure out the last activity time in the Postgres.
/// Then update it in the shared state. This function currently never
/// errors out explicitly, but there is a graceful termination path.
/// Every time we receive an error trying to check Postgres, we use
/// [`ComputeMonitor::check_interrupts()`] because it could be that
/// compute is being terminated already, then we can exit gracefully
/// to not produce errors' noise in the log.
/// Then update it in the shared state. This function never errors out.
/// NB: the only expected panic is at `Mutex` unwrap(), all other errors
/// should be handled gracefully.
#[instrument(skip_all)]
pub fn run(&mut self) -> anyhow::Result<()> {
pub fn run(&mut self) {
// Suppose that `connstr` doesn't change
let connstr = self.compute.params.connstr.clone();
let conf = self
@@ -123,10 +93,6 @@ impl ComputeMonitor {
info!("starting compute monitor for {}", connstr);
loop {
if self.check_interrupts() {
break;
}
match &mut client {
Ok(cli) => {
if cli.is_closed() {
@@ -134,10 +100,6 @@ impl ComputeMonitor {
downtime_info = self.downtime_info(),
"connection to Postgres is closed, trying to reconnect"
);
if self.check_interrupts() {
break;
}
self.report_down();
// Connection is closed, reconnect and try again.
@@ -149,19 +111,15 @@ impl ComputeMonitor {
self.compute.update_last_active(self.last_active);
}
Err(e) => {
error!(
downtime_info = self.downtime_info(),
"could not check Postgres: {}", e
);
if self.check_interrupts() {
break;
}
// Although we have many places where we can return errors in `check()`,
// normally it shouldn't happen. I.e., we will likely return error if
// connection got broken, query timed out, Postgres returned invalid data, etc.
// In all such cases it's suspicious, so let's report this as downtime.
self.report_down();
error!(
downtime_info = self.downtime_info(),
"could not check Postgres: {}", e
);
// Reconnect to Postgres just in case. During tests, I noticed
// that queries in `check()` can fail with `connection closed`,
@@ -178,10 +136,6 @@ impl ComputeMonitor {
downtime_info = self.downtime_info(),
"could not connect to Postgres: {}, retrying", e
);
if self.check_interrupts() {
break;
}
self.report_down();
// Establish a new connection and try again.
@@ -193,9 +147,6 @@ impl ComputeMonitor {
self.last_checked = Utc::now();
thread::sleep(MONITOR_CHECK_INTERVAL);
}
// Graceful termination path
Ok(())
}
#[instrument(skip_all)]
@@ -478,10 +429,7 @@ pub fn launch_monitor(compute: &Arc<ComputeNode>) -> thread::JoinHandle<()> {
.spawn(move || {
let span = span!(Level::INFO, "compute_monitor");
let _enter = span.enter();
match monitor.run() {
Ok(_) => info!("compute monitor thread terminated gracefully"),
Err(err) => error!("compute monitor thread terminated abnormally {:?}", err),
}
monitor.run();
})
.expect("cannot launch compute monitor thread")
}

View File

@@ -30,7 +30,7 @@ mod pg_helpers_tests {
r#"fsync = off
wal_level = logical
hot_standby = on
autoprewarm = off
prewarm_lfc_on_startup = off
neon.safekeepers = '127.0.0.1:6502,127.0.0.1:6503,127.0.0.1:6501'
wal_log_hints = on
log_connections = on

View File

@@ -36,7 +36,6 @@ pageserver_api.workspace = true
pageserver_client.workspace = true
postgres_backend.workspace = true
safekeeper_api.workspace = true
safekeeper_client.workspace = true
postgres_connection.workspace = true
storage_broker.workspace = true
http-utils.workspace = true

View File

@@ -45,7 +45,7 @@ use pageserver_api::models::{
use pageserver_api::shard::{DEFAULT_STRIPE_SIZE, ShardCount, ShardStripeSize, TenantShardId};
use postgres_backend::AuthType;
use postgres_connection::parse_host_port;
use safekeeper_api::membership::{SafekeeperGeneration, SafekeeperId};
use safekeeper_api::membership::SafekeeperGeneration;
use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
@@ -1255,45 +1255,6 @@ async fn handle_timeline(cmd: &TimelineCmd, env: &mut local_env::LocalEnv) -> Re
pageserver
.timeline_import(tenant_id, timeline_id, base, pg_wal, args.pg_version)
.await?;
if env.storage_controller.timelines_onto_safekeepers {
println!("Creating timeline on safekeeper ...");
let timeline_info = pageserver
.timeline_info(
TenantShardId::unsharded(tenant_id),
timeline_id,
pageserver_client::mgmt_api::ForceAwaitLogicalSize::No,
)
.await?;
let default_sk = SafekeeperNode::from_env(env, env.safekeepers.first().unwrap());
let default_host = default_sk
.conf
.listen_addr
.clone()
.unwrap_or_else(|| "localhost".to_string());
let mconf = safekeeper_api::membership::Configuration {
generation: SafekeeperGeneration::new(1),
members: safekeeper_api::membership::MemberSet {
m: vec![SafekeeperId {
host: default_host,
id: default_sk.conf.id,
pg_port: default_sk.conf.pg_port,
}],
},
new_members: None,
};
let pg_version = args.pg_version * 10000;
let req = safekeeper_api::models::TimelineCreateRequest {
tenant_id,
timeline_id,
mconf,
pg_version,
system_id: None,
wal_seg_size: None,
start_lsn: timeline_info.last_record_lsn,
commit_lsn: None,
};
default_sk.create_timeline(&req).await?;
}
env.register_branch_mapping(branch_name.to_string(), tenant_id, timeline_id)?;
println!("Done");
}

View File

@@ -45,8 +45,6 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{Context, Result, anyhow, bail};
use base64::Engine;
use base64::prelude::BASE64_URL_SAFE_NO_PAD;
use compute_api::requests::{
COMPUTE_AUDIENCE, ComputeClaims, ComputeClaimsScope, ConfigurationRequest,
};
@@ -166,7 +164,7 @@ impl ComputeControlPlane {
public_key_use: Some(PublicKeyUse::Signature),
key_operations: Some(vec![KeyOperations::Verify]),
key_algorithm: Some(KeyAlgorithm::EdDSA),
key_id: Some(BASE64_URL_SAFE_NO_PAD.encode(key_hash)),
key_id: Some(base64::encode_config(key_hash, base64::URL_SAFE_NO_PAD)),
x509_url: None::<String>,
x509_chain: None::<Vec<String>>,
x509_sha1_fingerprint: None::<String>,
@@ -175,7 +173,7 @@ impl ComputeControlPlane {
algorithm: AlgorithmParameters::OctetKeyPair(OctetKeyPairParameters {
key_type: OctetKeyPairType::OctetKeyPair,
curve: EllipticCurve::Ed25519,
x: BASE64_URL_SAFE_NO_PAD.encode(public_key),
x: base64::encode_config(public_key, base64::URL_SAFE_NO_PAD),
}),
}],
})
@@ -749,7 +747,7 @@ impl Endpoint {
logs_export_host: None::<String>,
endpoint_storage_addr: Some(endpoint_storage_addr),
endpoint_storage_token: Some(endpoint_storage_token),
autoprewarm: false,
prewarm_lfc_on_startup: false,
};
// this strange code is needed to support respec() in tests

View File

@@ -513,6 +513,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'timeline_offloading' as bool")?,
wal_receiver_protocol_override: settings
.remove("wal_receiver_protocol_override")
.map(serde_json::from_str)
.transpose()
.context("parse `wal_receiver_protocol_override` from json")?,
rel_size_v2_enabled: settings
.remove("rel_size_v2_enabled")
.map(|x| x.parse::<bool>())
@@ -635,16 +640,4 @@ impl PageServerNode {
Ok(())
}
pub async fn timeline_info(
&self,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
force_await_logical_size: mgmt_api::ForceAwaitLogicalSize,
) -> anyhow::Result<TimelineInfo> {
let timeline_info = self
.http_client
.timeline_info(tenant_shard_id, timeline_id, force_await_logical_size)
.await?;
Ok(timeline_info)
}
}

View File

@@ -6,6 +6,7 @@
//! .neon/safekeepers/<safekeeper id>
//! ```
use std::error::Error as _;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
use std::time::Duration;
@@ -13,9 +14,9 @@ use std::{io, result};
use anyhow::Context;
use camino::Utf8PathBuf;
use http_utils::error::HttpErrorBody;
use postgres_connection::PgConnectionConfig;
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_client::mgmt_api;
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use utils::auth::{Claims, Scope};
use utils::id::NodeId;
@@ -34,14 +35,25 @@ pub enum SafekeeperHttpError {
type Result<T> = result::Result<T, SafekeeperHttpError>;
fn err_from_client_err(err: mgmt_api::Error) -> SafekeeperHttpError {
use mgmt_api::Error::*;
match err {
ApiError(_, str) => SafekeeperHttpError::Response(str),
Cancelled => SafekeeperHttpError::Response("Cancelled".to_owned()),
ReceiveBody(err) => SafekeeperHttpError::Transport(err),
ReceiveErrorBody(err) => SafekeeperHttpError::Response(err),
Timeout(str) => SafekeeperHttpError::Response(format!("timeout: {str}")),
pub(crate) trait ResponseErrorMessageExt: Sized {
fn error_from_body(self) -> impl Future<Output = Result<Self>> + Send;
}
impl ResponseErrorMessageExt for reqwest::Response {
async fn error_from_body(self) -> Result<Self> {
let status = self.status();
if !(status.is_client_error() || status.is_server_error()) {
return Ok(self);
}
// reqwest does not export its error construction utility functions, so let's craft the message ourselves
let url = self.url().to_owned();
Err(SafekeeperHttpError::Response(
match self.json::<HttpErrorBody>().await {
Ok(err_body) => format!("Error: {}", err_body.msg),
Err(_) => format!("Http error ({}) at {}.", status.as_u16(), url),
},
))
}
}
@@ -58,8 +70,9 @@ pub struct SafekeeperNode {
pub pg_connection_config: PgConnectionConfig,
pub env: LocalEnv,
pub http_client: mgmt_api::Client,
pub http_client: reqwest::Client,
pub listen_addr: String,
pub http_base_url: String,
}
impl SafekeeperNode {
@@ -69,14 +82,13 @@ impl SafekeeperNode {
} else {
"127.0.0.1".to_string()
};
let jwt = None;
let http_base_url = format!("http://{}:{}", listen_addr, conf.http_port);
SafekeeperNode {
id: conf.id,
conf: conf.clone(),
pg_connection_config: Self::safekeeper_connection_config(&listen_addr, conf.pg_port),
env: env.clone(),
http_client: mgmt_api::Client::new(env.create_http_client(), http_base_url, jwt),
http_client: env.create_http_client(),
http_base_url: format!("http://{}:{}/v1", listen_addr, conf.http_port),
listen_addr,
}
}
@@ -266,19 +278,20 @@ impl SafekeeperNode {
)
}
pub async fn check_status(&self) -> Result<()> {
self.http_client
.status()
.await
.map_err(err_from_client_err)?;
Ok(())
fn http_request<U: IntoUrl>(&self, method: Method, url: U) -> reqwest::RequestBuilder {
// TODO: authentication
//if self.env.auth_type == AuthType::NeonJWT {
// builder = builder.bearer_auth(&self.env.safekeeper_auth_token)
//}
self.http_client.request(method, url)
}
pub async fn create_timeline(&self, req: &TimelineCreateRequest) -> Result<()> {
self.http_client
.create_timeline(req)
.await
.map_err(err_from_client_err)?;
pub async fn check_status(&self) -> Result<()> {
self.http_request(Method::GET, format!("{}/{}", self.http_base_url, "status"))
.send()
.await?
.error_from_body()
.await?;
Ok(())
}
}

View File

@@ -61,16 +61,10 @@ enum Command {
#[arg(long)]
scheduling: Option<NodeSchedulingPolicy>,
},
// Set a node status as deleted.
NodeDelete {
#[arg(long)]
node_id: NodeId,
},
/// Delete a tombstone of node from the storage controller.
NodeDeleteTombstone {
#[arg(long)]
node_id: NodeId,
},
/// Modify a tenant's policies in the storage controller
TenantPolicy {
#[arg(long)]
@@ -88,8 +82,6 @@ enum Command {
},
/// List nodes known to the storage controller
Nodes {},
/// List soft deleted nodes known to the storage controller
NodeTombstones {},
/// List tenants known to the storage controller
Tenants {
/// If this field is set, it will list the tenants on a specific node
@@ -908,39 +900,6 @@ async fn main() -> anyhow::Result<()> {
.dispatch::<(), ()>(Method::DELETE, format!("control/v1/node/{node_id}"), None)
.await?;
}
Command::NodeDeleteTombstone { node_id } => {
storcon_client
.dispatch::<(), ()>(
Method::DELETE,
format!("debug/v1/tombstone/{node_id}"),
None,
)
.await?;
}
Command::NodeTombstones {} => {
let mut resp = storcon_client
.dispatch::<(), Vec<NodeDescribeResponse>>(
Method::GET,
"debug/v1/tombstone".to_string(),
None,
)
.await?;
resp.sort_by(|a, b| a.listen_http_addr.cmp(&b.listen_http_addr));
let mut table = comfy_table::Table::new();
table.set_header(["Id", "Hostname", "AZ", "Scheduling", "Availability"]);
for node in resp {
table.add_row([
format!("{}", node.id),
node.listen_http_addr,
node.availability_zone_id,
format!("{:?}", node.scheduling),
format!("{:?}", node.availability),
]);
}
println!("{table}");
}
Command::TenantSetTimeBasedEviction {
tenant_id,
period,

View File

@@ -13,6 +13,6 @@ RUN echo 'Acquire::Retries "5";' > /etc/apt/apt.conf.d/80-retries && \
jq \
netcat-openbsd
#This is required for the pg_hintplan test
RUN mkdir -p /ext-src/pg_hint_plan-src /postgres/contrib/file_fdw /ext-src/postgis-src/ && chown postgres /ext-src/pg_hint_plan-src /postgres/contrib/file_fdw /ext-src/postgis-src
RUN mkdir -p /ext-src/pg_hint_plan-src /postgres/contrib/file_fdw && chown postgres /ext-src/pg_hint_plan-src /postgres/contrib/file_fdw
USER postgres

View File

@@ -1,18 +1,18 @@
#!/usr/bin/env bash
#!/bin/bash
set -eux
# Generate a random tenant or timeline ID
#
# Takes a variable name as argument. The result is stored in that variable.
generate_id() {
local -n resvar=${1}
printf -v resvar '%08x%08x%08x%08x' ${SRANDOM} ${SRANDOM} ${SRANDOM} ${SRANDOM}
local -n resvar=$1
printf -v resvar '%08x%08x%08x%08x' $SRANDOM $SRANDOM $SRANDOM $SRANDOM
}
PG_VERSION=${PG_VERSION:-14}
readonly CONFIG_FILE_ORG=/var/db/postgres/configs/config.json
readonly CONFIG_FILE=/tmp/config.json
CONFIG_FILE_ORG=/var/db/postgres/configs/config.json
CONFIG_FILE=/tmp/config.json
# Test that the first library path that the dynamic loader looks in is the path
# that we use for custom compiled software
@@ -20,17 +20,17 @@ first_path="$(ldconfig --verbose 2>/dev/null \
| grep --invert-match ^$'\t' \
| cut --delimiter=: --fields=1 \
| head --lines=1)"
test "${first_path}" = '/usr/local/lib'
test "$first_path" == '/usr/local/lib'
echo "Waiting pageserver become ready."
while ! nc -z pageserver 6400; do
sleep 1
sleep 1;
done
echo "Page server is ready."
cp "${CONFIG_FILE_ORG}" "${CONFIG_FILE}"
cp ${CONFIG_FILE_ORG} ${CONFIG_FILE}
if [[ -n "${TENANT_ID:-}" && -n "${TIMELINE_ID:-}" ]]; then
if [ -n "${TENANT_ID:-}" ] && [ -n "${TIMELINE_ID:-}" ]; then
tenant_id=${TENANT_ID}
timeline_id=${TIMELINE_ID}
else
@@ -41,7 +41,7 @@ else
"http://pageserver:9898/v1/tenant"
)
tenant_id=$(curl "${PARAMS[@]}" | jq -r .[0].id)
if [[ -z "${tenant_id}" || "${tenant_id}" = null ]]; then
if [ -z "${tenant_id}" ] || [ "${tenant_id}" = null ]; then
echo "Create a tenant"
generate_id tenant_id
PARAMS=(
@@ -51,7 +51,7 @@ else
"http://pageserver:9898/v1/tenant/${tenant_id}/location_config"
)
result=$(curl "${PARAMS[@]}")
printf '%s\n' "${result}" | jq .
echo $result | jq .
fi
echo "Check if a timeline present"
@@ -61,7 +61,7 @@ else
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline"
)
timeline_id=$(curl "${PARAMS[@]}" | jq -r .[0].timeline_id)
if [[ -z "${timeline_id}" || "${timeline_id}" = null ]]; then
if [ -z "${timeline_id}" ] || [ "${timeline_id}" = null ]; then
generate_id timeline_id
PARAMS=(
-sbf
@@ -71,7 +71,7 @@ else
"http://pageserver:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
printf '%s\n' "${result}" | jq .
echo $result | jq .
fi
fi
@@ -82,10 +82,10 @@ else
fi
echo "Adding pgx_ulid"
shared_libraries=$(jq -r '.spec.cluster.settings[] | select(.name=="shared_preload_libraries").value' ${CONFIG_FILE})
sed -i "s|${shared_libraries}|${shared_libraries},${ulid_extension}|" ${CONFIG_FILE}
sed -i "s/${shared_libraries}/${shared_libraries},${ulid_extension}/" ${CONFIG_FILE}
echo "Overwrite tenant id and timeline id in spec file"
sed -i "s|TENANT_ID|${tenant_id}|" ${CONFIG_FILE}
sed -i "s|TIMELINE_ID|${timeline_id}|" ${CONFIG_FILE}
sed -i "s/TENANT_ID/${tenant_id}/" ${CONFIG_FILE}
sed -i "s/TIMELINE_ID/${timeline_id}/" ${CONFIG_FILE}
cat ${CONFIG_FILE}
@@ -93,5 +93,5 @@ echo "Start compute node"
/usr/local/bin/compute_ctl --pgdata /var/db/postgres/compute \
-C "postgresql://cloud_admin@localhost:55433/postgres" \
-b /usr/local/bin/postgres \
--compute-id "compute-${RANDOM}" \
--config "${CONFIG_FILE}"
--compute-id "compute-$RANDOM" \
--config "$CONFIG_FILE"

View File

@@ -186,14 +186,13 @@ services:
neon-test-extensions:
profiles: ["test-extensions"]
image: ${REPOSITORY:-ghcr.io/neondatabase}/neon-test-extensions-v${PG_TEST_VERSION:-${PG_VERSION:-16}}:${TEST_EXTENSIONS_TAG:-${TAG:-latest}}
image: ${REPOSITORY:-ghcr.io/neondatabase}/neon-test-extensions-v${PG_TEST_VERSION:-16}:${TEST_EXTENSIONS_TAG:-${TAG:-latest}}
environment:
- PGUSER=${PGUSER:-cloud_admin}
- PGPASSWORD=${PGPASSWORD:-cloud_admin}
- PGPASSWORD=cloud_admin
entrypoint:
- "/bin/bash"
- "-c"
command:
- sleep 3600
- sleep 1800
depends_on:
- compute

View File

@@ -54,15 +54,6 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
# It cannot be moved to Dockerfile now because the database directory is created after the start of the container
echo Adding dummy config
docker compose exec compute touch /var/db/postgres/compute/compute_ctl_temp_override.conf
# Prepare for the PostGIS test
docker compose exec compute mkdir -p /tmp/pgis_reg/pgis_reg_tmp
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/ext-src/postgis-src/raster/test "${TMPDIR}"
docker compose cp neon-test-extensions:/ext-src/postgis-src/regress/00-regress-install "${TMPDIR}"
docker compose exec compute mkdir -p /ext-src/postgis-src/raster /ext-src/postgis-src/regress /ext-src/postgis-src/regress/00-regress-install
docker compose cp "${TMPDIR}/test" compute:/ext-src/postgis-src/raster/test
docker compose cp "${TMPDIR}/00-regress-install" compute:/ext-src/postgis-src/regress
rm -rf "${TMPDIR}"
# The following block copies the files for the pg_hintplan test to the compute node for the extension test in an isolated docker-compose environment
TMPDIR=$(mktemp -d)
docker compose cp neon-test-extensions:/ext-src/pg_hint_plan-src/data "${TMPDIR}/data"
@@ -77,7 +68,7 @@ for pg_version in ${TEST_VERSION_ONLY-14 15 16 17}; do
docker compose exec -T neon-test-extensions bash -c "(cd /postgres && patch -p1)" <"../compute/patches/contrib_pg${pg_version}.patch"
# We are running tests now
rm -f testout.txt testout_contrib.txt
docker compose exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
docker compose exec -e USE_PGXS=1 -e SKIP=timescaledb-src,rdkit-src,postgis-src,pg_jsonschema-src,kq_imcx-src,wal2json_2_5-src,rag_jina_reranker_v1_tiny_en-src,rag_bge_small_en_v15-src \
neon-test-extensions /run-tests.sh /ext-src | tee testout.txt && EXT_SUCCESS=1 || EXT_SUCCESS=0
docker compose exec -e SKIP=start-scripts,postgres_fdw,ltree_plpython,jsonb_plpython,jsonb_plperl,hstore_plpython,hstore_plperl,dblink,bool_plperl \
neon-test-extensions /run-tests.sh /postgres/contrib | tee testout_contrib.txt && CONTRIB_SUCCESS=1 || CONTRIB_SUCCESS=0

View File

@@ -1,70 +0,0 @@
# PostGIS Testing in Neon
This directory contains configuration files and patches for running PostGIS tests in the Neon database environment.
## Overview
PostGIS is a spatial database extension for PostgreSQL that adds support for geographic objects. Testing PostGIS compatibility ensures that Neon's modifications to PostgreSQL don't break compatibility with this critical extension.
## PostGIS Versions
- PostgreSQL v17: PostGIS 3.5.0
- PostgreSQL v14/v15/v16: PostGIS 3.3.3
## Test Configuration
The test setup includes:
- `postgis-no-upgrade-test.patch`: Disables upgrade tests by removing the upgrade test section from regress/runtest.mk
- `postgis-regular-v16.patch`: Version-specific patch for PostgreSQL v16
- `postgis-regular-v17.patch`: Version-specific patch for PostgreSQL v17
- `regular-test.sh`: Script to run PostGIS tests as a regular user
- `neon-test.sh`: Script to handle version-specific test configurations
- `raster_outdb_template.sql`: Template for raster tests with explicit file paths
## Excluded Tests
**Important Note:** The test exclusions listed below are specifically for regular-user tests against staging instances. These exclusions are necessary because staging instances run with limited privileges and cannot perform operations requiring superuser access. Docker-compose based tests are not affected by these exclusions.
### Tests Requiring Superuser Permissions
These tests cannot be run as a regular user:
- `estimatedextent`
- `regress/core/legacy`
- `regress/core/typmod`
- `regress/loader/TestSkipANALYZE`
- `regress/loader/TestANALYZE`
### Tests Requiring Filesystem Access
These tests need direct filesystem access that is only possible for superusers:
- `loader/load_outdb`
### Tests with Flaky Results
These tests have assumptions that don't always hold true:
- `regress/core/computed_columns` - Assumes computed columns always outperform alternatives, which is not consistently true
### Tests Requiring Tunable Parameter Modifications
These tests attempt to modify the `postgis.gdal_enabled_drivers` parameter, which is only accessible to superusers:
- `raster/test/regress/rt_wkb`
- `raster/test/regress/rt_addband`
- `raster/test/regress/rt_setbandpath`
- `raster/test/regress/rt_fromgdalraster`
- `raster/test/regress/rt_asgdalraster`
- `raster/test/regress/rt_astiff`
- `raster/test/regress/rt_asjpeg`
- `raster/test/regress/rt_aspng`
- `raster/test/regress/permitted_gdal_drivers`
- Loader tests: `BasicOutDB`, `Tiled10x10`, `Tiled10x10Copy`, `Tiled8x8`, `TiledAuto`, `TiledAutoSkipNoData`, `TiledAutoCopyn`
### Topology Tests (v17 only)
- `populate_topology_layer`
- `renametopogeometrycolumn`
## Other Modifications
- Binary.sql tests are modified to use explicit file paths
- Server-side SQL COPY commands (which require superuser privileges) are converted to client-side `\copy` commands
- Upgrade tests are disabled

View File

@@ -1,6 +0,0 @@
#!/bin/sh
set -ex
cd "$(dirname "$0")"
patch -p1 <"postgis-common-${PG_VERSION}.patch"
trap 'echo Cleaning up; patch -R -p1 <postgis-common-${PG_VERSION}.patch' EXIT
make installcheck-base

View File

@@ -1,37 +0,0 @@
diff --git a/regress/core/tests.mk b/regress/core/tests.mk
index 3abd7bc..64a9254 100644
--- a/regress/core/tests.mk
+++ b/regress/core/tests.mk
@@ -144,11 +144,6 @@ TESTS_SLOW = \
$(top_srcdir)/regress/core/concave_hull_hard \
$(top_srcdir)/regress/core/knn_recheck
-ifeq ($(shell expr "$(POSTGIS_PGSQL_VERSION)" ">=" 120),1)
- TESTS += \
- $(top_srcdir)/regress/core/computed_columns
-endif
-
ifeq ($(shell expr "$(POSTGIS_GEOS_VERSION)" ">=" 30700),1)
# GEOS-3.7 adds:
# ST_FrechetDistance
diff --git a/regress/runtest.mk b/regress/runtest.mk
index c051f03..010e493 100644
--- a/regress/runtest.mk
+++ b/regress/runtest.mk
@@ -24,16 +24,6 @@ check-regress:
POSTGIS_TOP_BUILD_DIR=$(abs_top_builddir) $(PERL) $(top_srcdir)/regress/run_test.pl $(RUNTESTFLAGS) $(RUNTESTFLAGS_INTERNAL) $(TESTS)
- @if echo "$(RUNTESTFLAGS)" | grep -vq -- --upgrade; then \
- echo "Running upgrade test as RUNTESTFLAGS did not contain that"; \
- POSTGIS_TOP_BUILD_DIR=$(abs_top_builddir) $(PERL) $(top_srcdir)/regress/run_test.pl \
- --upgrade \
- $(RUNTESTFLAGS) \
- $(RUNTESTFLAGS_INTERNAL) \
- $(TESTS); \
- else \
- echo "Skipping upgrade test as RUNTESTFLAGS already requested upgrades"; \
- fi
check-long:
$(PERL) $(top_srcdir)/regress/run_test.pl $(RUNTESTFLAGS) $(TESTS) $(TESTS_SLOW)

View File

@@ -1,35 +0,0 @@
diff --git a/regress/core/tests.mk b/regress/core/tests.mk
index 9e05244..90987df 100644
--- a/regress/core/tests.mk
+++ b/regress/core/tests.mk
@@ -143,8 +143,7 @@ TESTS += \
$(top_srcdir)/regress/core/oriented_envelope \
$(top_srcdir)/regress/core/point_coordinates \
$(top_srcdir)/regress/core/out_geojson \
- $(top_srcdir)/regress/core/wrapx \
- $(top_srcdir)/regress/core/computed_columns
+ $(top_srcdir)/regress/core/wrapx
# Slow slow tests
TESTS_SLOW = \
diff --git a/regress/runtest.mk b/regress/runtest.mk
index 4b95b7e..449d5a2 100644
--- a/regress/runtest.mk
+++ b/regress/runtest.mk
@@ -24,16 +24,6 @@ check-regress:
@POSTGIS_TOP_BUILD_DIR=$(abs_top_builddir) $(PERL) $(top_srcdir)/regress/run_test.pl $(RUNTESTFLAGS) $(RUNTESTFLAGS_INTERNAL) $(TESTS)
- @if echo "$(RUNTESTFLAGS)" | grep -vq -- --upgrade; then \
- echo "Running upgrade test as RUNTESTFLAGS did not contain that"; \
- POSTGIS_TOP_BUILD_DIR=$(abs_top_builddir) $(PERL) $(top_srcdir)/regress/run_test.pl \
- --upgrade \
- $(RUNTESTFLAGS) \
- $(RUNTESTFLAGS_INTERNAL) \
- $(TESTS); \
- else \
- echo "Skipping upgrade test as RUNTESTFLAGS already requested upgrades"; \
- fi
check-long:
$(PERL) $(top_srcdir)/regress/run_test.pl $(RUNTESTFLAGS) $(TESTS) $(TESTS_SLOW)

View File

@@ -1,186 +0,0 @@
diff --git a/raster/test/regress/tests.mk b/raster/test/regress/tests.mk
index 00918e1..7e2b6cd 100644
--- a/raster/test/regress/tests.mk
+++ b/raster/test/regress/tests.mk
@@ -17,9 +17,7 @@ override RUNTESTFLAGS_INTERNAL := \
$(RUNTESTFLAGS_INTERNAL) \
--after-upgrade-script $(top_srcdir)/raster/test/regress/hooks/hook-after-upgrade-raster.sql
-RASTER_TEST_FIRST = \
- $(top_srcdir)/raster/test/regress/check_gdal \
- $(top_srcdir)/raster/test/regress/loader/load_outdb
+RASTER_TEST_FIRST =
RASTER_TEST_LAST = \
$(top_srcdir)/raster/test/regress/clean
@@ -33,9 +31,7 @@ RASTER_TEST_IO = \
RASTER_TEST_BASIC_FUNC = \
$(top_srcdir)/raster/test/regress/rt_bytea \
- $(top_srcdir)/raster/test/regress/rt_wkb \
$(top_srcdir)/raster/test/regress/box3d \
- $(top_srcdir)/raster/test/regress/rt_addband \
$(top_srcdir)/raster/test/regress/rt_band \
$(top_srcdir)/raster/test/regress/rt_tile
@@ -73,16 +69,10 @@ RASTER_TEST_BANDPROPS = \
$(top_srcdir)/raster/test/regress/rt_neighborhood \
$(top_srcdir)/raster/test/regress/rt_nearestvalue \
$(top_srcdir)/raster/test/regress/rt_pixelofvalue \
- $(top_srcdir)/raster/test/regress/rt_polygon \
- $(top_srcdir)/raster/test/regress/rt_setbandpath
+ $(top_srcdir)/raster/test/regress/rt_polygon
RASTER_TEST_UTILITY = \
$(top_srcdir)/raster/test/regress/rt_utility \
- $(top_srcdir)/raster/test/regress/rt_fromgdalraster \
- $(top_srcdir)/raster/test/regress/rt_asgdalraster \
- $(top_srcdir)/raster/test/regress/rt_astiff \
- $(top_srcdir)/raster/test/regress/rt_asjpeg \
- $(top_srcdir)/raster/test/regress/rt_aspng \
$(top_srcdir)/raster/test/regress/rt_reclass \
$(top_srcdir)/raster/test/regress/rt_gdalwarp \
$(top_srcdir)/raster/test/regress/rt_gdalcontour \
@@ -120,21 +110,13 @@ RASTER_TEST_SREL = \
RASTER_TEST_BUGS = \
$(top_srcdir)/raster/test/regress/bug_test_car5 \
- $(top_srcdir)/raster/test/regress/permitted_gdal_drivers \
$(top_srcdir)/raster/test/regress/tickets
RASTER_TEST_LOADER = \
$(top_srcdir)/raster/test/regress/loader/Basic \
$(top_srcdir)/raster/test/regress/loader/Projected \
$(top_srcdir)/raster/test/regress/loader/BasicCopy \
- $(top_srcdir)/raster/test/regress/loader/BasicFilename \
- $(top_srcdir)/raster/test/regress/loader/BasicOutDB \
- $(top_srcdir)/raster/test/regress/loader/Tiled10x10 \
- $(top_srcdir)/raster/test/regress/loader/Tiled10x10Copy \
- $(top_srcdir)/raster/test/regress/loader/Tiled8x8 \
- $(top_srcdir)/raster/test/regress/loader/TiledAuto \
- $(top_srcdir)/raster/test/regress/loader/TiledAutoSkipNoData \
- $(top_srcdir)/raster/test/regress/loader/TiledAutoCopyn
+ $(top_srcdir)/raster/test/regress/loader/BasicFilename
RASTER_TESTS := $(RASTER_TEST_FIRST) \
$(RASTER_TEST_METADATA) $(RASTER_TEST_IO) $(RASTER_TEST_BASIC_FUNC) \
diff --git a/regress/core/binary.sql b/regress/core/binary.sql
index 7a36b65..ad78fc7 100644
--- a/regress/core/binary.sql
+++ b/regress/core/binary.sql
@@ -1,4 +1,5 @@
SET client_min_messages TO warning;
+
CREATE SCHEMA tm;
CREATE TABLE tm.geoms (id serial, g geometry);
@@ -31,24 +32,39 @@ SELECT st_force4d(g) FROM tm.geoms WHERE id < 15 ORDER BY id;
INSERT INTO tm.geoms(g)
SELECT st_setsrid(g,4326) FROM tm.geoms ORDER BY id;
-COPY tm.geoms TO :tmpfile WITH BINARY;
+-- define temp file path
+\set tmpfile '/tmp/postgis_binary_test.dat'
+
+-- export
+\set command '\\copy tm.geoms TO ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+-- import
CREATE TABLE tm.geoms_in AS SELECT * FROM tm.geoms LIMIT 0;
-COPY tm.geoms_in FROM :tmpfile WITH BINARY;
-SELECT 'geometry', count(*) FROM tm.geoms_in i, tm.geoms o WHERE i.id = o.id
- AND ST_OrderingEquals(i.g, o.g);
+\set command '\\copy tm.geoms_in FROM ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+SELECT 'geometry', count(*) FROM tm.geoms_in i, tm.geoms o
+WHERE i.id = o.id AND ST_OrderingEquals(i.g, o.g);
CREATE TABLE tm.geogs AS SELECT id,g::geography FROM tm.geoms
WHERE geometrytype(g) NOT LIKE '%CURVE%'
AND geometrytype(g) NOT LIKE '%CIRCULAR%'
AND geometrytype(g) NOT LIKE '%SURFACE%'
AND geometrytype(g) NOT LIKE 'TRIANGLE%'
- AND geometrytype(g) NOT LIKE 'TIN%'
-;
+ AND geometrytype(g) NOT LIKE 'TIN%';
-COPY tm.geogs TO :tmpfile WITH BINARY;
+-- export
+\set command '\\copy tm.geogs TO ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+-- import
CREATE TABLE tm.geogs_in AS SELECT * FROM tm.geogs LIMIT 0;
-COPY tm.geogs_in FROM :tmpfile WITH BINARY;
-SELECT 'geometry', count(*) FROM tm.geogs_in i, tm.geogs o WHERE i.id = o.id
- AND ST_OrderingEquals(i.g::geometry, o.g::geometry);
+\set command '\\copy tm.geogs_in FROM ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+SELECT 'geometry', count(*) FROM tm.geogs_in i, tm.geogs o
+WHERE i.id = o.id AND ST_OrderingEquals(i.g::geometry, o.g::geometry);
DROP SCHEMA tm CASCADE;
+
diff --git a/regress/core/tests.mk b/regress/core/tests.mk
index 64a9254..94903c3 100644
--- a/regress/core/tests.mk
+++ b/regress/core/tests.mk
@@ -23,7 +23,6 @@ current_dir := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
RUNTESTFLAGS_INTERNAL += \
--before-upgrade-script $(top_srcdir)/regress/hooks/hook-before-upgrade.sql \
--after-upgrade-script $(top_srcdir)/regress/hooks/hook-after-upgrade.sql \
- --after-create-script $(top_srcdir)/regress/hooks/hook-after-create.sql \
--before-uninstall-script $(top_srcdir)/regress/hooks/hook-before-uninstall.sql
TESTS += \
@@ -40,7 +39,6 @@ TESTS += \
$(top_srcdir)/regress/core/dumppoints \
$(top_srcdir)/regress/core/dumpsegments \
$(top_srcdir)/regress/core/empty \
- $(top_srcdir)/regress/core/estimatedextent \
$(top_srcdir)/regress/core/forcecurve \
$(top_srcdir)/regress/core/flatgeobuf \
$(top_srcdir)/regress/core/geography \
@@ -55,7 +53,6 @@ TESTS += \
$(top_srcdir)/regress/core/out_marc21 \
$(top_srcdir)/regress/core/in_encodedpolyline \
$(top_srcdir)/regress/core/iscollection \
- $(top_srcdir)/regress/core/legacy \
$(top_srcdir)/regress/core/letters \
$(top_srcdir)/regress/core/long_xact \
$(top_srcdir)/regress/core/lwgeom_regress \
@@ -112,7 +109,6 @@ TESTS += \
$(top_srcdir)/regress/core/temporal_knn \
$(top_srcdir)/regress/core/tickets \
$(top_srcdir)/regress/core/twkb \
- $(top_srcdir)/regress/core/typmod \
$(top_srcdir)/regress/core/wkb \
$(top_srcdir)/regress/core/wkt \
$(top_srcdir)/regress/core/wmsservers \
diff --git a/regress/loader/tests.mk b/regress/loader/tests.mk
index 1fc77ac..c3cb9de 100644
--- a/regress/loader/tests.mk
+++ b/regress/loader/tests.mk
@@ -38,7 +38,5 @@ TESTS += \
$(top_srcdir)/regress/loader/Latin1 \
$(top_srcdir)/regress/loader/Latin1-implicit \
$(top_srcdir)/regress/loader/mfile \
- $(top_srcdir)/regress/loader/TestSkipANALYZE \
- $(top_srcdir)/regress/loader/TestANALYZE \
$(top_srcdir)/regress/loader/CharNoWidth
diff --git a/regress/run_test.pl b/regress/run_test.pl
index 0ec5b2d..1c331f4 100755
--- a/regress/run_test.pl
+++ b/regress/run_test.pl
@@ -147,7 +147,6 @@ $ENV{"LANG"} = "C";
# Add locale info to the psql options
# Add pg12 precision suppression
my $PGOPTIONS = $ENV{"PGOPTIONS"};
-$PGOPTIONS .= " -c lc_messages=C";
$PGOPTIONS .= " -c client_min_messages=NOTICE";
$PGOPTIONS .= " -c extra_float_digits=0";
$ENV{"PGOPTIONS"} = $PGOPTIONS;

View File

@@ -1,208 +0,0 @@
diff --git a/raster/test/regress/tests.mk b/raster/test/regress/tests.mk
index 00918e1..7e2b6cd 100644
--- a/raster/test/regress/tests.mk
+++ b/raster/test/regress/tests.mk
@@ -17,9 +17,7 @@ override RUNTESTFLAGS_INTERNAL := \
$(RUNTESTFLAGS_INTERNAL) \
--after-upgrade-script $(top_srcdir)/raster/test/regress/hooks/hook-after-upgrade-raster.sql
-RASTER_TEST_FIRST = \
- $(top_srcdir)/raster/test/regress/check_gdal \
- $(top_srcdir)/raster/test/regress/loader/load_outdb
+RASTER_TEST_FIRST =
RASTER_TEST_LAST = \
$(top_srcdir)/raster/test/regress/clean
@@ -33,9 +31,7 @@ RASTER_TEST_IO = \
RASTER_TEST_BASIC_FUNC = \
$(top_srcdir)/raster/test/regress/rt_bytea \
- $(top_srcdir)/raster/test/regress/rt_wkb \
$(top_srcdir)/raster/test/regress/box3d \
- $(top_srcdir)/raster/test/regress/rt_addband \
$(top_srcdir)/raster/test/regress/rt_band \
$(top_srcdir)/raster/test/regress/rt_tile
@@ -73,16 +69,10 @@ RASTER_TEST_BANDPROPS = \
$(top_srcdir)/raster/test/regress/rt_neighborhood \
$(top_srcdir)/raster/test/regress/rt_nearestvalue \
$(top_srcdir)/raster/test/regress/rt_pixelofvalue \
- $(top_srcdir)/raster/test/regress/rt_polygon \
- $(top_srcdir)/raster/test/regress/rt_setbandpath
+ $(top_srcdir)/raster/test/regress/rt_polygon
RASTER_TEST_UTILITY = \
$(top_srcdir)/raster/test/regress/rt_utility \
- $(top_srcdir)/raster/test/regress/rt_fromgdalraster \
- $(top_srcdir)/raster/test/regress/rt_asgdalraster \
- $(top_srcdir)/raster/test/regress/rt_astiff \
- $(top_srcdir)/raster/test/regress/rt_asjpeg \
- $(top_srcdir)/raster/test/regress/rt_aspng \
$(top_srcdir)/raster/test/regress/rt_reclass \
$(top_srcdir)/raster/test/regress/rt_gdalwarp \
$(top_srcdir)/raster/test/regress/rt_gdalcontour \
@@ -120,21 +110,13 @@ RASTER_TEST_SREL = \
RASTER_TEST_BUGS = \
$(top_srcdir)/raster/test/regress/bug_test_car5 \
- $(top_srcdir)/raster/test/regress/permitted_gdal_drivers \
$(top_srcdir)/raster/test/regress/tickets
RASTER_TEST_LOADER = \
$(top_srcdir)/raster/test/regress/loader/Basic \
$(top_srcdir)/raster/test/regress/loader/Projected \
$(top_srcdir)/raster/test/regress/loader/BasicCopy \
- $(top_srcdir)/raster/test/regress/loader/BasicFilename \
- $(top_srcdir)/raster/test/regress/loader/BasicOutDB \
- $(top_srcdir)/raster/test/regress/loader/Tiled10x10 \
- $(top_srcdir)/raster/test/regress/loader/Tiled10x10Copy \
- $(top_srcdir)/raster/test/regress/loader/Tiled8x8 \
- $(top_srcdir)/raster/test/regress/loader/TiledAuto \
- $(top_srcdir)/raster/test/regress/loader/TiledAutoSkipNoData \
- $(top_srcdir)/raster/test/regress/loader/TiledAutoCopyn
+ $(top_srcdir)/raster/test/regress/loader/BasicFilename
RASTER_TESTS := $(RASTER_TEST_FIRST) \
$(RASTER_TEST_METADATA) $(RASTER_TEST_IO) $(RASTER_TEST_BASIC_FUNC) \
diff --git a/regress/core/binary.sql b/regress/core/binary.sql
index 7a36b65..ad78fc7 100644
--- a/regress/core/binary.sql
+++ b/regress/core/binary.sql
@@ -1,4 +1,5 @@
SET client_min_messages TO warning;
+
CREATE SCHEMA tm;
CREATE TABLE tm.geoms (id serial, g geometry);
@@ -31,24 +32,39 @@ SELECT st_force4d(g) FROM tm.geoms WHERE id < 15 ORDER BY id;
INSERT INTO tm.geoms(g)
SELECT st_setsrid(g,4326) FROM tm.geoms ORDER BY id;
-COPY tm.geoms TO :tmpfile WITH BINARY;
+-- define temp file path
+\set tmpfile '/tmp/postgis_binary_test.dat'
+
+-- export
+\set command '\\copy tm.geoms TO ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+-- import
CREATE TABLE tm.geoms_in AS SELECT * FROM tm.geoms LIMIT 0;
-COPY tm.geoms_in FROM :tmpfile WITH BINARY;
-SELECT 'geometry', count(*) FROM tm.geoms_in i, tm.geoms o WHERE i.id = o.id
- AND ST_OrderingEquals(i.g, o.g);
+\set command '\\copy tm.geoms_in FROM ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+SELECT 'geometry', count(*) FROM tm.geoms_in i, tm.geoms o
+WHERE i.id = o.id AND ST_OrderingEquals(i.g, o.g);
CREATE TABLE tm.geogs AS SELECT id,g::geography FROM tm.geoms
WHERE geometrytype(g) NOT LIKE '%CURVE%'
AND geometrytype(g) NOT LIKE '%CIRCULAR%'
AND geometrytype(g) NOT LIKE '%SURFACE%'
AND geometrytype(g) NOT LIKE 'TRIANGLE%'
- AND geometrytype(g) NOT LIKE 'TIN%'
-;
+ AND geometrytype(g) NOT LIKE 'TIN%';
-COPY tm.geogs TO :tmpfile WITH BINARY;
+-- export
+\set command '\\copy tm.geogs TO ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+-- import
CREATE TABLE tm.geogs_in AS SELECT * FROM tm.geogs LIMIT 0;
-COPY tm.geogs_in FROM :tmpfile WITH BINARY;
-SELECT 'geometry', count(*) FROM tm.geogs_in i, tm.geogs o WHERE i.id = o.id
- AND ST_OrderingEquals(i.g::geometry, o.g::geometry);
+\set command '\\copy tm.geogs_in FROM ':tmpfile' WITH (FORMAT BINARY)'
+:command
+
+SELECT 'geometry', count(*) FROM tm.geogs_in i, tm.geogs o
+WHERE i.id = o.id AND ST_OrderingEquals(i.g::geometry, o.g::geometry);
DROP SCHEMA tm CASCADE;
+
diff --git a/regress/core/tests.mk b/regress/core/tests.mk
index 90987df..74fe3f1 100644
--- a/regress/core/tests.mk
+++ b/regress/core/tests.mk
@@ -16,14 +16,13 @@ POSTGIS_PGSQL_VERSION=170
POSTGIS_GEOS_VERSION=31101
HAVE_JSON=yes
HAVE_SPGIST=yes
-INTERRUPTTESTS=yes
+INTERRUPTTESTS=no
current_dir := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
RUNTESTFLAGS_INTERNAL += \
--before-upgrade-script $(top_srcdir)/regress/hooks/hook-before-upgrade.sql \
--after-upgrade-script $(top_srcdir)/regress/hooks/hook-after-upgrade.sql \
- --after-create-script $(top_srcdir)/regress/hooks/hook-after-create.sql \
--before-uninstall-script $(top_srcdir)/regress/hooks/hook-before-uninstall.sql
TESTS += \
@@ -40,7 +39,6 @@ TESTS += \
$(top_srcdir)/regress/core/dumppoints \
$(top_srcdir)/regress/core/dumpsegments \
$(top_srcdir)/regress/core/empty \
- $(top_srcdir)/regress/core/estimatedextent \
$(top_srcdir)/regress/core/forcecurve \
$(top_srcdir)/regress/core/flatgeobuf \
$(top_srcdir)/regress/core/frechet \
@@ -60,7 +58,6 @@ TESTS += \
$(top_srcdir)/regress/core/out_marc21 \
$(top_srcdir)/regress/core/in_encodedpolyline \
$(top_srcdir)/regress/core/iscollection \
- $(top_srcdir)/regress/core/legacy \
$(top_srcdir)/regress/core/letters \
$(top_srcdir)/regress/core/lwgeom_regress \
$(top_srcdir)/regress/core/measures \
@@ -119,7 +116,6 @@ TESTS += \
$(top_srcdir)/regress/core/temporal_knn \
$(top_srcdir)/regress/core/tickets \
$(top_srcdir)/regress/core/twkb \
- $(top_srcdir)/regress/core/typmod \
$(top_srcdir)/regress/core/wkb \
$(top_srcdir)/regress/core/wkt \
$(top_srcdir)/regress/core/wmsservers \
diff --git a/regress/loader/tests.mk b/regress/loader/tests.mk
index ac4f8ad..4bad4fc 100644
--- a/regress/loader/tests.mk
+++ b/regress/loader/tests.mk
@@ -38,7 +38,5 @@ TESTS += \
$(top_srcdir)/regress/loader/Latin1 \
$(top_srcdir)/regress/loader/Latin1-implicit \
$(top_srcdir)/regress/loader/mfile \
- $(top_srcdir)/regress/loader/TestSkipANALYZE \
- $(top_srcdir)/regress/loader/TestANALYZE \
$(top_srcdir)/regress/loader/CharNoWidth \
diff --git a/regress/run_test.pl b/regress/run_test.pl
index cac4b2e..4c7c82b 100755
--- a/regress/run_test.pl
+++ b/regress/run_test.pl
@@ -238,7 +238,6 @@ $ENV{"LANG"} = "C";
# Add locale info to the psql options
# Add pg12 precision suppression
my $PGOPTIONS = $ENV{"PGOPTIONS"};
-$PGOPTIONS .= " -c lc_messages=C";
$PGOPTIONS .= " -c client_min_messages=NOTICE";
$PGOPTIONS .= " -c extra_float_digits=0";
$ENV{"PGOPTIONS"} = $PGOPTIONS;
diff --git a/topology/test/tests.mk b/topology/test/tests.mk
index cbe2633..2c7c18f 100644
--- a/topology/test/tests.mk
+++ b/topology/test/tests.mk
@@ -46,9 +46,7 @@ TESTS += \
$(top_srcdir)/topology/test/regress/legacy_query.sql \
$(top_srcdir)/topology/test/regress/legacy_validate.sql \
$(top_srcdir)/topology/test/regress/polygonize.sql \
- $(top_srcdir)/topology/test/regress/populate_topology_layer.sql \
$(top_srcdir)/topology/test/regress/removeunusedprimitives.sql \
- $(top_srcdir)/topology/test/regress/renametopogeometrycolumn.sql \
$(top_srcdir)/topology/test/regress/renametopology.sql \
$(top_srcdir)/topology/test/regress/share_sequences.sql \
$(top_srcdir)/topology/test/regress/sqlmm.sql \

File diff suppressed because one or more lines are too long

View File

@@ -1,17 +0,0 @@
#!/bin/bash
set -ex
cd "$(dirname "${0}")"
dropdb --if-exist contrib_regression
createdb contrib_regression
psql -d contrib_regression -c "ALTER DATABASE contrib_regression SET TimeZone='UTC'" \
-c "ALTER DATABASE contrib_regression SET DateStyle='ISO, MDY'" \
-c "CREATE EXTENSION postgis SCHEMA public" \
-c "CREATE EXTENSION postgis_topology" \
-c "CREATE EXTENSION postgis_tiger_geocoder CASCADE" \
-c "CREATE EXTENSION postgis_raster SCHEMA public" \
-c "CREATE EXTENSION postgis_sfcgal SCHEMA public"
patch -p1 <"postgis-common-${PG_VERSION}.patch"
patch -p1 <"postgis-regular-${PG_VERSION}.patch"
psql -d contrib_regression -f raster_outdb_template.sql
trap 'patch -R -p1 <postgis-regular-${PG_VERSION}.patch && patch -R -p1 <"postgis-common-${PG_VERSION}.patch"' EXIT
POSTGIS_REGRESS_DB=contrib_regression RUNTESTFLAGS=--nocreate make installcheck-base

View File

@@ -63,9 +63,5 @@ done
for d in ${FAILED}; do
cat "$(find $d -name regression.diffs)"
done
for postgis_diff in /tmp/pgis_reg/*_diff; do
echo "${postgis_diff}:"
cat "${postgis_diff}"
done
echo "${FAILED}"
exit 1

View File

@@ -8,7 +8,6 @@ anyhow.workspace = true
axum-extra.workspace = true
axum.workspace = true
camino.workspace = true
clap.workspace = true
futures.workspace = true
jsonwebtoken.workspace = true
prometheus.workspace = true

View File

@@ -4,8 +4,6 @@
//! for large computes.
mod app;
use anyhow::Context;
use clap::Parser;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tracing::info;
use utils::logging;
@@ -14,26 +12,9 @@ const fn max_upload_file_limit() -> usize {
100 * 1024 * 1024
}
const fn listen() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 51243)
}
#[derive(Parser)]
struct Args {
#[arg(exclusive = true)]
config_file: Option<String>,
#[arg(long, default_value = "false", requires = "config")]
/// to allow testing k8s helm chart where we don't have s3 credentials
no_s3_check_on_startup: bool,
#[arg(long, value_name = "FILE")]
/// inline config mode for k8s helm chart
config: Option<String>,
}
#[derive(serde::Deserialize)]
#[serde(tag = "type")]
struct Config {
#[serde(default = "listen")]
listen: std::net::SocketAddr,
pemfile: camino::Utf8PathBuf,
#[serde(flatten)]
@@ -50,18 +31,13 @@ async fn main() -> anyhow::Result<()> {
logging::Output::Stdout,
)?;
let args = Args::parse();
let config: Config = if let Some(config_path) = args.config_file {
info!("Reading config from {config_path}");
let config = std::fs::read_to_string(config_path)?;
serde_json::from_str(&config).context("parsing config")?
} else if let Some(config) = args.config {
info!("Reading inline config");
serde_json::from_str(&config).context("parsing config")?
} else {
anyhow::bail!("Supply either config file path or --config=inline-config");
};
let config: String = std::env::args().skip(1).take(1).collect();
if config.is_empty() {
anyhow::bail!("Usage: endpoint_storage config.json")
}
info!("Reading config from {config}");
let config = std::fs::read_to_string(config.clone())?;
let config: Config = serde_json::from_str(&config).context("parsing config")?;
info!("Reading pemfile from {}", config.pemfile.clone());
let pemfile = std::fs::read(config.pemfile.clone())?;
info!("Loading public key from {}", config.pemfile.clone());
@@ -72,9 +48,7 @@ async fn main() -> anyhow::Result<()> {
let storage = remote_storage::GenericRemoteStorage::from_config(&config.storage_config).await?;
let cancel = tokio_util::sync::CancellationToken::new();
if !args.no_s3_check_on_startup {
app::check_storage_permissions(&storage, cancel.clone()).await?;
}
app::check_storage_permissions(&storage, cancel.clone()).await?;
let proxy = std::sync::Arc::new(endpoint_storage::Storage {
auth,

View File

@@ -178,9 +178,9 @@ pub struct ComputeSpec {
/// JWT for authorizing requests to endpoint storage service
pub endpoint_storage_token: Option<String>,
/// Download LFC state from endpoint_storage and pass it to Postgres on startup
/// If true, download LFC state from endpoint_storage and pass it to Postgres on startup
#[serde(default)]
pub autoprewarm: bool,
pub prewarm_lfc_on_startup: bool,
}
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
@@ -192,9 +192,6 @@ pub enum ComputeFeature {
/// track short-lived connections as user activity.
ActivityMonitorExperimental,
/// Enable TLS functionality.
TlsExperimental,
/// This is a special feature flag that is used to represent unknown feature flags.
/// Basically all unknown to enum flags are represented as this one. See unit test
/// `parse_unknown_features()` for more details.
@@ -253,44 +250,34 @@ impl RemoteExtSpec {
}
match self.extension_data.get(real_ext_name) {
Some(_ext_data) => Ok((
real_ext_name.to_string(),
Self::build_remote_path(build_tag, pg_major_version, real_ext_name)?,
)),
Some(_ext_data) => {
// We have decided to use the Go naming convention due to Kubernetes.
let arch = match std::env::consts::ARCH {
"x86_64" => "amd64",
"aarch64" => "arm64",
arch => arch,
};
// Construct the path to the extension archive
// BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
//
// Keep it in sync with path generation in
// https://github.com/neondatabase/build-custom-extensions/tree/main
let archive_path_str = format!(
"{build_tag}/{arch}/{pg_major_version}/extensions/{real_ext_name}.tar.zst"
);
Ok((
real_ext_name.to_string(),
RemotePath::from_string(&archive_path_str)?,
))
}
None => Err(anyhow::anyhow!(
"real_ext_name {} is not found",
real_ext_name
)),
}
}
/// Get the architecture-specific portion of the remote extension path. We
/// use the Go naming convention due to Kubernetes.
fn get_arch() -> &'static str {
match std::env::consts::ARCH {
"x86_64" => "amd64",
"aarch64" => "arm64",
arch => arch,
}
}
/// Build a [`RemotePath`] for an extension.
fn build_remote_path(
build_tag: &str,
pg_major_version: &str,
ext_name: &str,
) -> anyhow::Result<RemotePath> {
let arch = Self::get_arch();
// Construct the path to the extension archive
// BUILD_TAG/PG_MAJOR_VERSION/extensions/EXTENSION_NAME.tar.zst
//
// Keep it in sync with path generation in
// https://github.com/neondatabase/build-custom-extensions/tree/main
RemotePath::from_string(&format!(
"{build_tag}/{arch}/{pg_major_version}/extensions/{ext_name}.tar.zst"
))
}
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Deserialize, Serialize)]
@@ -531,37 +518,6 @@ mod tests {
.expect("Library should be found");
}
#[test]
fn remote_extension_path() {
let rspec: RemoteExtSpec = serde_json::from_value(serde_json::json!({
"public_extensions": ["ext"],
"custom_extensions": [],
"library_index": {
"extlib": "ext",
},
"extension_data": {
"ext": {
"control_data": {
"ext.control": ""
},
"archive_path": ""
}
},
}))
.unwrap();
let (_ext_name, ext_path) = rspec
.get_ext("ext", false, "latest", "v17")
.expect("Extension should be found");
// Starting with a forward slash would have consequences for the
// Url::join() that occurs when downloading a remote extension.
assert!(!ext_path.to_string().starts_with("/"));
assert_eq!(
ext_path,
RemoteExtSpec::build_remote_path("latest", "v17", "ext").unwrap()
);
}
#[test]
fn parse_spec_file() {
let file = File::open("tests/cluster_spec.json").unwrap();

View File

@@ -85,7 +85,7 @@
"vartype": "bool"
},
{
"name": "autoprewarm",
"name": "prewarm_lfc_on_startup",
"value": "off",
"vartype": "bool"
},

View File

@@ -20,6 +20,7 @@ use postgres_backend::AuthType;
use remote_storage::RemoteStorageConfig;
use serde_with::serde_as;
use utils::logging::LogFormat;
use utils::postgres_client::PostgresClientProtocol;
use crate::models::{ImageCompressionAlgorithm, LsnLease};
@@ -188,6 +189,7 @@ pub struct ConfigToml {
pub virtual_file_io_mode: Option<crate::models::virtual_file::IoMode>,
#[serde(skip_serializing_if = "Option::is_none")]
pub no_sync: Option<bool>,
pub wal_receiver_protocol: PostgresClientProtocol,
pub page_service_pipelining: PageServicePipeliningConfig,
pub get_vectored_concurrent_io: GetVectoredConcurrentIo,
pub enable_read_path_debugging: Option<bool>,
@@ -328,8 +330,6 @@ pub struct TimelineImportConfig {
pub import_job_concurrency: NonZeroUsize,
pub import_job_soft_size_limit: NonZeroUsize,
pub import_job_checkpoint_threshold: NonZeroUsize,
/// Max size of the remote storage partial read done by any job
pub import_job_max_byte_range_size: NonZeroUsize,
}
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -525,6 +525,8 @@ pub struct TenantConfigToml {
/// (either this flag or the pageserver-global one need to be set)
pub timeline_offloading: bool,
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
/// Enable rel_size_v2 for this tenant. Once enabled, the tenant will persist this information into
/// `index_part.json`, and it cannot be reversed.
pub rel_size_v2_enabled: bool,
@@ -605,6 +607,9 @@ pub mod defaults {
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
pub const DEFAULT_WAL_RECEIVER_PROTOCOL: utils::postgres_client::PostgresClientProtocol =
utils::postgres_client::PostgresClientProtocol::Vanilla;
pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";
pub const DEFAULT_SSL_CERT_FILE: &str = "server.crt";
}
@@ -703,6 +708,7 @@ impl Default for ConfigToml {
virtual_file_io_mode: None,
tenant_config: TenantConfigToml::default(),
no_sync: None,
wal_receiver_protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
page_service_pipelining: PageServicePipeliningConfig::Pipelined(
PageServicePipeliningConfigPipelined {
max_batch_size: NonZeroUsize::new(32).unwrap(),
@@ -726,7 +732,6 @@ impl Default for ConfigToml {
import_job_concurrency: NonZeroUsize::new(32).unwrap(),
import_job_soft_size_limit: NonZeroUsize::new(256 * 1024 * 1024).unwrap(),
import_job_checkpoint_threshold: NonZeroUsize::new(32).unwrap(),
import_job_max_byte_range_size: NonZeroUsize::new(4 * 1024 * 1024).unwrap(),
},
basebackup_cache_config: None,
posthog_config: None,
@@ -847,6 +852,7 @@ impl Default for TenantConfigToml {
lsn_lease_length: LsnLease::DEFAULT_LENGTH,
lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS,
timeline_offloading: true,
wal_receiver_protocol_override: None,
rel_size_v2_enabled: false,
gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED,
gc_compaction_verification: DEFAULT_GC_COMPACTION_VERIFICATION,

View File

@@ -344,35 +344,6 @@ impl Default for ShardSchedulingPolicy {
}
}
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeLifecycle {
Active,
Deleted,
}
impl FromStr for NodeLifecycle {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"active" => Ok(Self::Active),
"deleted" => Ok(Self::Deleted),
_ => Err(anyhow::anyhow!("Unknown node lifecycle '{s}'")),
}
}
}
impl From<NodeLifecycle> for String {
fn from(value: NodeLifecycle) -> String {
use NodeLifecycle::*;
match value {
Active => "active",
Deleted => "deleted",
}
.to_string()
}
}
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
pub enum NodeSchedulingPolicy {
Active,

View File

@@ -20,6 +20,7 @@ use serde_with::serde_as;
pub use utilization::PageserverUtilization;
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::postgres_client::PostgresClientProtocol;
use utils::{completion, serde_system_time};
use crate::config::Ratio;
@@ -621,6 +622,8 @@ pub struct TenantConfigPatch {
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub timeline_offloading: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub wal_receiver_protocol_override: FieldPatch<PostgresClientProtocol>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub rel_size_v2_enabled: FieldPatch<bool>,
#[serde(skip_serializing_if = "FieldPatch::is_noop")]
pub gc_compaction_enabled: FieldPatch<bool>,
@@ -745,6 +748,9 @@ pub struct TenantConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub timeline_offloading: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub wal_receiver_protocol_override: Option<PostgresClientProtocol>,
#[serde(skip_serializing_if = "Option::is_none")]
pub rel_size_v2_enabled: Option<bool>,
@@ -806,6 +812,7 @@ impl TenantConfig {
mut lsn_lease_length,
mut lsn_lease_length_for_ts,
mut timeline_offloading,
mut wal_receiver_protocol_override,
mut rel_size_v2_enabled,
mut gc_compaction_enabled,
mut gc_compaction_verification,
@@ -898,6 +905,9 @@ impl TenantConfig {
.map(|v| humantime::parse_duration(&v))?
.apply(&mut lsn_lease_length_for_ts);
patch.timeline_offloading.apply(&mut timeline_offloading);
patch
.wal_receiver_protocol_override
.apply(&mut wal_receiver_protocol_override);
patch.rel_size_v2_enabled.apply(&mut rel_size_v2_enabled);
patch
.gc_compaction_enabled
@@ -950,6 +960,7 @@ impl TenantConfig {
lsn_lease_length,
lsn_lease_length_for_ts,
timeline_offloading,
wal_receiver_protocol_override,
rel_size_v2_enabled,
gc_compaction_enabled,
gc_compaction_verification,
@@ -1047,6 +1058,9 @@ impl TenantConfig {
timeline_offloading: self
.timeline_offloading
.unwrap_or(global_conf.timeline_offloading),
wal_receiver_protocol_override: self
.wal_receiver_protocol_override
.or(global_conf.wal_receiver_protocol_override),
rel_size_v2_enabled: self
.rel_size_v2_enabled
.unwrap_or(global_conf.rel_size_v2_enabled),

View File

@@ -9,7 +9,7 @@ use utils::id::{NodeId, TimelineId};
use crate::controller_api::NodeRegisterRequest;
use crate::models::{LocationConfigMode, ShardImportStatus};
use crate::shard::{ShardStripeSize, TenantShardId};
use crate::shard::TenantShardId;
/// Upcall message sent by the pageserver to the configured `control_plane_api` on
/// startup.
@@ -36,10 +36,6 @@ pub struct ReAttachResponseTenant {
/// Default value only for backward compat: this field should be set
#[serde(default = "default_mode")]
pub mode: LocationConfigMode,
// Default value only for backward compat: this field should be set
#[serde(default = "ShardStripeSize::default")]
pub stripe_size: ShardStripeSize,
}
#[derive(Serialize, Deserialize)]
pub struct ReAttachResponse {

View File

@@ -6,7 +6,7 @@ use arc_swap::ArcSwap;
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, info_span};
use crate::{CaptureEvent, FeatureStore, PostHogClient, PostHogClientConfig};
use crate::{FeatureStore, PostHogClient, PostHogClientConfig};
/// A background loop that fetches feature flags from PostHog and updates the feature store.
pub struct FeatureResolverBackgroundLoop {
@@ -24,16 +24,9 @@ impl FeatureResolverBackgroundLoop {
}
}
pub fn spawn(
self: Arc<Self>,
handle: &tokio::runtime::Handle,
refresh_period: Duration,
fake_tenants: Vec<CaptureEvent>,
) {
pub fn spawn(self: Arc<Self>, handle: &tokio::runtime::Handle, refresh_period: Duration) {
let this = self.clone();
let cancel = self.cancel.clone();
// Main loop of updating the feature flags.
handle.spawn(
async move {
tracing::info!("Starting PostHog feature resolver");
@@ -55,37 +48,14 @@ impl FeatureResolverBackgroundLoop {
continue;
}
};
let project_id = this.posthog_client.config.project_id.parse::<u64>().ok();
match FeatureStore::new_with_flags(resp.flags, project_id) {
Ok(feature_store) => {
this.feature_store.store(Arc::new(feature_store));
tracing::info!("Feature flag updated");
}
Err(e) => {
tracing::warn!("Cannot process feature flag spec: {}", e);
}
}
let feature_store = FeatureStore::new_with_flags(resp.flags);
this.feature_store.store(Arc::new(feature_store));
tracing::info!("Feature flag updated");
}
tracing::info!("PostHog feature resolver stopped");
}
.instrument(info_span!("posthog_feature_resolver")),
);
// Report fake tenants to PostHog so that we have the combination of all the properties in the UI.
// Do one report per pageserver restart.
let this = self.clone();
handle.spawn(
async move {
tracing::info!("Starting PostHog feature reporter");
for tenant in &fake_tenants {
tracing::info!("Reporting fake tenant: {:?}", tenant);
}
if let Err(e) = this.posthog_client.capture_event_batch(&fake_tenants).await {
tracing::warn!("Cannot report fake tenants: {}", e);
}
}
.instrument(info_span!("posthog_feature_reporter")),
);
}
pub fn feature_store(&self) -> Arc<FeatureStore> {

View File

@@ -22,16 +22,6 @@ pub enum PostHogEvaluationError {
Internal(String),
}
impl PostHogEvaluationError {
pub fn as_variant_str(&self) -> &'static str {
match self {
PostHogEvaluationError::NotAvailable(_) => "not_available",
PostHogEvaluationError::NoConditionGroupMatched => "no_condition_group_matched",
PostHogEvaluationError::Internal(_) => "internal",
}
}
}
#[derive(Deserialize)]
pub struct LocalEvaluationResponse {
pub flags: Vec<LocalEvaluationFlag>,
@@ -39,9 +29,6 @@ pub struct LocalEvaluationResponse {
#[derive(Deserialize)]
pub struct LocalEvaluationFlag {
#[allow(dead_code)]
id: u64,
team_id: u64,
key: String,
filters: LocalEvaluationFlagFilters,
active: bool,
@@ -67,7 +54,7 @@ pub struct LocalEvaluationFlagFilterProperty {
operator: String,
}
#[derive(Debug, Serialize, Deserialize, Clone)]
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
pub enum PostHogFlagFilterPropertyValue {
String(String),
@@ -110,32 +97,17 @@ impl FeatureStore {
}
}
pub fn new_with_flags(
flags: Vec<LocalEvaluationFlag>,
project_id: Option<u64>,
) -> Result<Self, &'static str> {
pub fn new_with_flags(flags: Vec<LocalEvaluationFlag>) -> Self {
let mut store = Self::new();
store.set_flags(flags, project_id)?;
Ok(store)
store.set_flags(flags);
store
}
pub fn set_flags(
&mut self,
flags: Vec<LocalEvaluationFlag>,
project_id: Option<u64>,
) -> Result<(), &'static str> {
pub fn set_flags(&mut self, flags: Vec<LocalEvaluationFlag>) {
self.flags.clear();
for flag in flags {
if let Some(project_id) = project_id {
if flag.team_id != project_id {
return Err(
"Retrieved a spec with different project id, wrong config? Discarding the feature flags.",
);
}
}
self.flags.insert(flag.key.clone(), flag);
}
Ok(())
}
/// Generate a consistent hash for a user ID (e.g., tenant ID).
@@ -525,13 +497,6 @@ pub struct PostHogClient {
client: reqwest::Client,
}
#[derive(Serialize, Debug)]
pub struct CaptureEvent {
pub event: String,
pub distinct_id: String,
pub properties: serde_json::Value,
}
impl PostHogClient {
pub fn new(config: PostHogClientConfig) -> Self {
let client = reqwest::Client::new();
@@ -552,13 +517,6 @@ impl PostHogClient {
})
}
/// Check if the server API key is a feature flag secure API key. This key can only be
/// used to fetch the feature flag specs and can only be used on a undocumented API
/// endpoint.
fn is_feature_flag_secure_api_key(&self) -> bool {
self.config.server_api_key.starts_with("phs_")
}
/// Fetch the feature flag specs from the server.
///
/// This is unfortunately an undocumented API at:
@@ -572,22 +530,10 @@ impl PostHogClient {
) -> anyhow::Result<LocalEvaluationResponse> {
// BASE_URL/api/projects/:project_id/feature_flags/local_evaluation
// with bearer token of self.server_api_key
// OR
// BASE_URL/api/feature_flag/local_evaluation/
// with bearer token of feature flag specific self.server_api_key
let url = if self.is_feature_flag_secure_api_key() {
// The new feature local evaluation secure API token
format!(
"{}/api/feature_flag/local_evaluation",
self.config.private_api_url
)
} else {
// The old personal API token
format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.config.private_api_url, self.config.project_id
)
};
let url = format!(
"{}/api/projects/{}/feature_flags/local_evaluation",
self.config.private_api_url, self.config.project_id
);
let response = self
.client
.get(url)
@@ -614,12 +560,12 @@ impl PostHogClient {
&self,
event: &str,
distinct_id: &str,
properties: &serde_json::Value,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> anyhow::Result<()> {
// PUBLIC_URL/capture/
// with bearer token of self.client_api_key
let url = format!("{}/capture/", self.config.public_api_url);
let response = self
.client
self.client
.post(url)
.body(serde_json::to_string(&json!({
"api_key": self.config.client_api_key,
@@ -629,39 +575,6 @@ impl PostHogClient {
}))?)
.send()
.await?;
let status = response.status();
let body = response.text().await?;
if !status.is_success() {
return Err(anyhow::anyhow!(
"Failed to capture events: {}, {}",
status,
body
));
}
Ok(())
}
pub async fn capture_event_batch(&self, events: &[CaptureEvent]) -> anyhow::Result<()> {
// PUBLIC_URL/batch/
let url = format!("{}/batch/", self.config.public_api_url);
let response = self
.client
.post(url)
.body(serde_json::to_string(&json!({
"api_key": self.config.client_api_key,
"batch": events,
}))?)
.send()
.await?;
let status = response.status();
let body = response.text().await?;
if !status.is_success() {
return Err(anyhow::anyhow!(
"Failed to capture events: {}, {}",
status,
body
));
}
Ok(())
}
}
@@ -840,7 +753,7 @@ mod tests {
fn evaluate_multivariate() {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags, None).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant =
@@ -910,7 +823,7 @@ mod tests {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags, None).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &HashMap::new());
@@ -966,7 +879,7 @@ mod tests {
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags, None).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant =

View File

@@ -5,7 +5,7 @@ edition = "2024"
license = "MIT/Apache-2.0"
[dependencies]
base64.workspace = true
base64 = "0.20"
byteorder.workspace = true
bytes.workspace = true
fallible-iterator.workspace = true

View File

@@ -3,8 +3,6 @@
use std::fmt::Write;
use std::{io, iter, mem, str};
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use hmac::{Hmac, Mac};
use rand::{self, Rng};
use sha2::digest::FixedOutput;
@@ -228,7 +226,7 @@ impl ScramSha256 {
let (client_key, server_key) = match password {
Credentials::Password(password) => {
let salt = match BASE64_STANDARD.decode(parsed.salt) {
let salt = match base64::decode(parsed.salt) {
Ok(salt) => salt,
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
};
@@ -257,7 +255,7 @@ impl ScramSha256 {
let mut cbind_input = vec![];
cbind_input.extend(channel_binding.gs2_header().as_bytes());
cbind_input.extend(channel_binding.cbind_data());
let cbind_input = BASE64_STANDARD.encode(&cbind_input);
let cbind_input = base64::encode(&cbind_input);
self.message.clear();
write!(&mut self.message, "c={},r={}", cbind_input, parsed.nonce).unwrap();
@@ -274,12 +272,7 @@ impl ScramSha256 {
*proof ^= signature;
}
write!(
&mut self.message,
",p={}",
BASE64_STANDARD.encode(client_proof)
)
.unwrap();
write!(&mut self.message, ",p={}", base64::encode(client_proof)).unwrap();
self.state = State::Finish {
server_key,
@@ -313,7 +306,7 @@ impl ScramSha256 {
ServerFinalMessage::Verifier(verifier) => verifier,
};
let verifier = match BASE64_STANDARD.decode(verifier) {
let verifier = match base64::decode(verifier) {
Ok(verifier) => verifier,
Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidInput, e)),
};

View File

@@ -6,8 +6,6 @@
//! side. This is good because it ensures the cleartext password won't
//! end up in logs pg_stat displays, etc.
use base64::Engine as _;
use base64::prelude::BASE64_STANDARD;
use hmac::{Hmac, Mac};
use rand::RngCore;
use sha2::digest::FixedOutput;
@@ -85,8 +83,8 @@ pub(crate) async fn scram_sha_256_salt(
format!(
"SCRAM-SHA-256${}:{}${}:{}",
SCRAM_DEFAULT_ITERATIONS,
BASE64_STANDARD.encode(salt),
BASE64_STANDARD.encode(stored_key),
BASE64_STANDARD.encode(server_key)
base64::encode(salt),
base64::encode(stored_key),
base64::encode(server_key)
)
}

View File

@@ -10,7 +10,7 @@ use crate::{Error, cancel_query_raw, connect_socket};
pub(crate) async fn cancel_query<T>(
config: Option<SocketConfig>,
ssl_mode: SslMode,
tls: T,
mut tls: T,
process_id: i32,
secret_key: i32,
) -> Result<(), Error>

View File

@@ -17,6 +17,7 @@ use crate::{Client, Connection, Error};
/// TLS configuration.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[non_exhaustive]
pub enum SslMode {
/// Do not use TLS.
Disable,
@@ -230,7 +231,7 @@ impl Config {
/// Requires the `runtime` Cargo feature (enabled by default).
pub async fn connect<T>(
&self,
tls: &T,
tls: T,
) -> Result<(Client, Connection<TcpStream, T::Stream>), Error>
where
T: MakeTlsConnect<TcpStream>,

View File

@@ -13,7 +13,7 @@ use crate::tls::{MakeTlsConnect, TlsConnect};
use crate::{Client, Config, Connection, Error, RawConnection};
pub async fn connect<T>(
tls: &T,
mut tls: T,
config: &Config,
) -> Result<(Client, Connection<TcpStream, T::Stream>), Error>
where

View File

@@ -47,7 +47,7 @@ pub trait MakeTlsConnect<S> {
/// Creates a new `TlsConnect`or.
///
/// The domain name is provided for certificate verification and SNI.
fn make_tls_connect(&self, domain: &str) -> Result<Self::TlsConnect, Self::Error>;
fn make_tls_connect(&mut self, domain: &str) -> Result<Self::TlsConnect, Self::Error>;
}
/// An asynchronous function wrapping a stream in a TLS session.
@@ -85,7 +85,7 @@ impl<S> MakeTlsConnect<S> for NoTls {
type TlsConnect = NoTls;
type Error = NoTlsError;
fn make_tls_connect(&self, _: &str) -> Result<NoTls, NoTlsError> {
fn make_tls_connect(&mut self, _: &str) -> Result<NoTls, NoTlsError> {
Ok(NoTls)
}
}

View File

@@ -10,7 +10,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::{env, io};
use anyhow::{Context, Result, anyhow};
use anyhow::{Context, Result};
use azure_core::request_options::{IfMatchCondition, MaxResults, Metadata, Range};
use azure_core::{Continuable, HttpClient, RetryOptions, TransportOptions};
use azure_storage::StorageCredentials;
@@ -37,7 +37,6 @@ use crate::metrics::{AttemptOutcome, RequestKind, start_measuring_requests};
use crate::{
ConcurrencyLimiter, Download, DownloadError, DownloadKind, DownloadOpts, Listing, ListingMode,
ListingObject, RemotePath, RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel,
Version, VersionKind,
};
pub struct AzureBlobStorage {
@@ -406,39 +405,6 @@ impl AzureBlobStorage {
pub fn container_name(&self) -> &str {
&self.container_name
}
async fn list_versions_with_permit(
&self,
_permit: &tokio::sync::SemaphorePermit<'_>,
prefix: Option<&RemotePath>,
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> Result<crate::VersionListing, DownloadError> {
let customize_builder = |mut builder: ListBlobsBuilder| {
builder = builder.include_versions(true);
// We do not return this info back to `VersionListing` yet.
builder = builder.include_deleted(true);
builder
};
let kind = RequestKind::ListVersions;
let mut stream = std::pin::pin!(self.list_streaming_for_fn(
prefix,
mode,
max_keys,
cancel,
kind,
customize_builder
));
let mut combined: crate::VersionListing =
stream.next().await.expect("At least one item required")?;
while let Some(list) = stream.next().await {
let list = list?;
combined.versions.extend(list.versions.into_iter());
}
Ok(combined)
}
}
trait ListingCollector {
@@ -522,10 +488,27 @@ impl RemoteStorage for AzureBlobStorage {
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> std::result::Result<crate::VersionListing, DownloadError> {
let customize_builder = |mut builder: ListBlobsBuilder| {
builder = builder.include_versions(true);
builder
};
let kind = RequestKind::ListVersions;
let permit = self.permit(kind, cancel).await?;
self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel)
.await
let mut stream = std::pin::pin!(self.list_streaming_for_fn(
prefix,
mode,
max_keys,
cancel,
kind,
customize_builder
));
let mut combined: crate::VersionListing =
stream.next().await.expect("At least one item required")?;
while let Some(list) = stream.next().await {
let list = list?;
combined.versions.extend(list.versions.into_iter());
}
Ok(combined)
}
async fn head_object(
@@ -820,159 +803,14 @@ impl RemoteStorage for AzureBlobStorage {
async fn time_travel_recover(
&self,
prefix: Option<&RemotePath>,
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
_complexity_limit: Option<NonZeroU32>,
_prefix: Option<&RemotePath>,
_timestamp: SystemTime,
_done_if_after: SystemTime,
_cancel: &CancellationToken,
) -> Result<(), TimeTravelError> {
let msg = "PLEASE NOTE: Azure Blob storage time-travel recovery may not work as expected "
.to_string()
+ "for some specific files. If a file gets deleted but then overwritten and we want to recover "
+ "to the time during the file was not present, this functionality will recover the file. Only "
+ "use the functionality for services that can tolerate this. For example, recovering a state of the "
+ "pageserver tenants.";
tracing::error!("{}", msg);
let kind = RequestKind::TimeTravel;
let permit = self.permit(kind, cancel).await?;
let mode = ListingMode::NoDelimiter;
let version_listing = self
.list_versions_with_permit(&permit, prefix, mode, None, cancel)
.await
.map_err(|err| match err {
DownloadError::Other(e) => TimeTravelError::Other(e),
DownloadError::Cancelled => TimeTravelError::Cancelled,
other => TimeTravelError::Other(other.into()),
})?;
let versions_and_deletes = version_listing.versions;
tracing::info!(
"Built list for time travel with {} versions and deletions",
versions_and_deletes.len()
);
// Work on the list of references instead of the objects directly,
// otherwise we get lifetime errors in the sort_by_key call below.
let mut versions_and_deletes = versions_and_deletes.iter().collect::<Vec<_>>();
versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified));
let mut vds_for_key = HashMap::<_, Vec<_>>::new();
for vd in &versions_and_deletes {
let Version { key, .. } = &vd;
let version_id = vd.version_id().map(|v| v.0.as_str());
if version_id == Some("null") {
return Err(TimeTravelError::Other(anyhow!(
"Received ListVersions response for key={key} with version_id='null', \
indicating either disabled versioning, or legacy objects with null version id values"
)));
}
tracing::trace!("Parsing version key={key} kind={:?}", vd.kind);
vds_for_key.entry(key).or_default().push(vd);
}
let warn_threshold = 3;
let max_retries = 10;
let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled);
for (key, versions) in vds_for_key {
let last_vd = versions.last().unwrap();
let key = self.relative_path_to_name(key);
if last_vd.last_modified > done_if_after {
tracing::debug!("Key {key} has version later than done_if_after, skipping");
continue;
}
// the version we want to restore to.
let version_to_restore_to =
match versions.binary_search_by_key(&timestamp, |tpl| tpl.last_modified) {
Ok(v) => v,
Err(e) => e,
};
if version_to_restore_to == versions.len() {
tracing::debug!("Key {key} has no changes since timestamp, skipping");
continue;
}
let mut do_delete = false;
if version_to_restore_to == 0 {
// All versions more recent, so the key didn't exist at the specified time point.
tracing::debug!(
"All {} versions more recent for {key}, deleting",
versions.len()
);
do_delete = true;
} else {
match &versions[version_to_restore_to - 1] {
Version {
kind: VersionKind::Version(version_id),
..
} => {
let source_url = format!(
"{}/{}?versionid={}",
self.client
.url()
.map_err(|e| TimeTravelError::Other(anyhow!("{e}")))?,
key,
version_id.0
);
tracing::debug!(
"Promoting old version {} for {key} at {}...",
version_id.0,
source_url
);
backoff::retry(
|| async {
let blob_client = self.client.blob_client(key.clone());
let op = blob_client.copy(Url::from_str(&source_url).unwrap());
tokio::select! {
res = op => res.map_err(|e| TimeTravelError::Other(e.into())),
_ = cancel.cancelled() => Err(TimeTravelError::Cancelled),
}
},
is_permanent,
warn_threshold,
max_retries,
"copying object version for time_travel_recover",
cancel,
)
.await
.ok_or_else(|| TimeTravelError::Cancelled)
.and_then(|x| x)?;
tracing::info!(?version_id, %key, "Copied old version in Azure blob storage");
}
Version {
kind: VersionKind::DeletionMarker,
..
} => {
do_delete = true;
}
}
};
if do_delete {
if matches!(last_vd.kind, VersionKind::DeletionMarker) {
// Key has since been deleted (but there was some history), no need to do anything
tracing::debug!("Key {key} already deleted, skipping.");
} else {
tracing::debug!("Deleting {key}...");
self.delete(&RemotePath::from_string(&key).unwrap(), cancel)
.await
.map_err(|e| {
// delete_oid0 will use TimeoutOrCancel
if TimeoutOrCancel::caused_by_cancel(&e) {
TimeTravelError::Cancelled
} else {
TimeTravelError::Other(e)
}
})?;
}
}
}
Ok(())
// TODO use Azure point in time recovery feature for this
// https://learn.microsoft.com/en-us/azure/storage/blobs/point-in-time-restore-overview
Err(TimeTravelError::Unimplemented)
}
}

View File

@@ -440,7 +440,6 @@ pub trait RemoteStorage: Send + Sync + 'static {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError>;
}
@@ -652,23 +651,22 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
match self {
Self::LocalFs(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
Self::AwsS3(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
Self::AzureBlob(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
Self::Unreliable(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
s.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
}

View File

@@ -610,7 +610,6 @@ impl RemoteStorage for LocalFs {
_timestamp: SystemTime,
_done_if_after: SystemTime,
_cancel: &CancellationToken,
_complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
Err(TimeTravelError::Unimplemented)
}

View File

@@ -981,16 +981,22 @@ impl RemoteStorage for S3Bucket {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
let kind = RequestKind::TimeTravel;
let permit = self.permit(kind, cancel).await?;
tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}");
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
let mode = ListingMode::NoDelimiter;
let version_listing = self
.list_versions_with_permit(&permit, prefix, mode, complexity_limit, cancel)
.list_versions_with_permit(&permit, prefix, mode, COMPLEXITY_LIMIT, cancel)
.await
.map_err(|err| match err {
DownloadError::Other(e) => TimeTravelError::Other(e),
@@ -1016,7 +1022,6 @@ impl RemoteStorage for S3Bucket {
let Version { key, .. } = &vd;
let version_id = vd.version_id().map(|v| v.0.as_str());
if version_id == Some("null") {
// TODO: check the behavior of using the SDK on a non-versioned container
return Err(TimeTravelError::Other(anyhow!(
"Received ListVersions response for key={key} with version_id='null', \
indicating either disabled versioning, or legacy objects with null version id values"

View File

@@ -240,12 +240,11 @@ impl RemoteStorage for UnreliableWrapper {
timestamp: SystemTime,
done_if_after: SystemTime,
cancel: &CancellationToken,
complexity_limit: Option<NonZeroU32>,
) -> Result<(), TimeTravelError> {
self.attempt(RemoteOp::TimeTravelRecover(prefix.map(|p| p.to_owned())))
.map_err(TimeTravelError::Other)?;
self.inner
.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.time_travel_recover(prefix, timestamp, done_if_after, cancel)
.await
}
}

View File

@@ -157,7 +157,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// No changes after recovery to t2 (no-op)
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t2, t_final, &cancel, None)
.time_travel_recover(None, t2, t_final, &cancel)
.await?;
let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t2: {t2_files_recovered:?}");
@@ -173,7 +173,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// after recovery to t1: path1 is back, path2 has the old content
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t1, t_final, &cancel, None)
.time_travel_recover(None, t1, t_final, &cancel)
.await?;
let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t1: {t1_files_recovered:?}");
@@ -189,7 +189,7 @@ async fn s3_time_travel_recovery_works(ctx: &mut MaybeEnabledStorage) -> anyhow:
// after recovery to t0: everything is gone except for path1
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t0, t_final, &cancel, None)
.time_travel_recover(None, t0, t_final, &cancel)
.await?;
let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t0: {t0_files_recovered:?}");

View File

@@ -13,7 +13,7 @@ use utils::pageserver_feedback::PageserverFeedback;
use crate::membership::Configuration;
use crate::{ServerInfo, Term};
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize)]
pub struct SafekeeperStatus {
pub id: NodeId,
}

View File

@@ -439,7 +439,6 @@ pub fn empty_shmem() -> crate::bindings::WalproposerShmemState {
currentClusterSize: crate::bindings::pg_atomic_uint64 { value: 0 },
shard_ps_feedback: [empty_feedback; 128],
num_shards: 0,
replica_promote: false,
min_ps_feedback: empty_feedback,
}
}

View File

@@ -176,11 +176,9 @@ async fn main() -> anyhow::Result<()> {
let config = RemoteStorageConfig::from_toml_str(&cmd.config_toml_str)?;
let storage = remote_storage::GenericRemoteStorage::from_config(&config).await;
let cancel = CancellationToken::new();
// Complexity limit: as we are running this command locally, we should have a lot of memory available, and we do not
// need to limit the number of versions we are going to delete.
storage
.unwrap()
.time_travel_recover(Some(&prefix), timestamp, done_if_after, &cancel, None)
.time_travel_recover(Some(&prefix), timestamp, done_if_after, &cancel)
.await?;
}
Commands::Key(dkc) => dkc.execute(),

View File

@@ -9,7 +9,6 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
async-trait.workspace = true
bytes.workspace = true
camino.workspace = true
clap.workspace = true
futures.workspace = true

View File

@@ -1,4 +1,4 @@
use std::collections::{HashMap, HashSet, VecDeque};
use std::collections::{HashSet, VecDeque};
use std::future::Future;
use std::num::NonZeroUsize;
use std::pin::Pin;
@@ -8,12 +8,12 @@ use std::time::{Duration, Instant};
use anyhow::Context;
use async_trait::async_trait;
use bytes::Bytes;
use camino::Utf8PathBuf;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::reltag::RelTag;
use pageserver_api::models::{
PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamRequest,
};
use pageserver_api::shard::TenantShardId;
use pageserver_page_api::proto;
use rand::prelude::*;
@@ -77,16 +77,6 @@ pub(crate) struct Args {
#[clap(long, default_value = "1")]
queue_depth: NonZeroUsize,
/// Batch size of contiguous pages generated by each client. This is equivalent to how Postgres
/// will request page batches (e.g. prefetches or vectored reads). A batch counts as 1 RPS and
/// 1 queue depth.
///
/// The libpq protocol does not support client-side batching, and will submit batches as many
/// individual requests, in the hope that the server will batch them. Each batch still counts as
/// 1 RPS and 1 queue depth.
#[clap(long, default_value = "1")]
batch_size: NonZeroUsize,
#[clap(long)]
only_relnode: Option<u32>,
@@ -402,16 +392,7 @@ async fn run_worker(
shared_state.start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
let mut req_id = 0;
let batch_size: usize = args.batch_size.into();
// Track inflight requests by request ID and start time. This times the request duration, and
// ensures responses match requests. We don't expect responses back in any particular order.
//
// NB: this does not check that all requests received a response, because we don't wait for the
// inflight requests to complete when the duration elapses.
let mut inflight: HashMap<u64, Instant> = HashMap::new();
let mut inflight = VecDeque::new();
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
@@ -427,72 +408,36 @@ async fn run_worker(
}
while inflight.len() < args.queue_depth.get() {
req_id += 1;
let start = Instant::now();
let (req_lsn, mod_lsn, rel, blks) = {
/// Converts a compact i128 key to a relation tag and block number.
fn key_to_block(key: i128) -> (RelTag, u32) {
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
key.to_rel_block()
.expect("we filter non-rel-block keys out above")
}
// Pick a random page from a random relation.
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let (rel_tag, block_no) = key_to_block(key);
let mut blks = VecDeque::with_capacity(batch_size);
blks.push_back(block_no);
// If requested, populate a batch of sequential pages. This is how Postgres will
// request page batches (e.g. prefetches). If we hit the end of the relation, we
// grow the batch towards the start too.
for i in 1..batch_size {
let (r, b) = key_to_block(key + i as i128);
if r != rel_tag {
break; // went outside relation
}
blks.push_back(b)
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
if blks.len() < batch_size {
// Grow batch backwards if needed.
for i in 1..batch_size {
let (r, b) = key_to_block(key - i as i128);
if r != rel_tag {
break; // went outside relation
}
blks.push_front(b)
}
}
// We assume that the entire batch can fit within the relation.
assert_eq!(blks.len(), batch_size, "incomplete batch");
let req_lsn = if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
};
(req_lsn, r.timeline_lsn, rel_tag, blks.into())
};
client
.send_get_page(req_id, req_lsn, mod_lsn, rel, blks)
.await
.unwrap();
let old = inflight.insert(req_id, start);
assert!(old.is_none(), "duplicate request ID {req_id}");
client.send_get_page(req).await.unwrap();
inflight.push_back(start);
}
let (req_id, pages) = client.recv_get_page().await.unwrap();
assert_eq!(pages.len(), batch_size, "unexpected page count");
assert!(pages.iter().all(|p| !p.is_empty()), "empty page");
let start = inflight
.remove(&req_id)
.expect("response for unknown request ID");
let start = inflight.pop_front().unwrap();
client.recv_get_page().await.unwrap();
let end = Instant::now();
shared_state.live_stats.request_done();
ticks_processed += 1;
@@ -522,24 +467,15 @@ async fn run_worker(
#[async_trait]
trait Client: Send {
/// Sends an asynchronous GetPage request to the pageserver.
async fn send_get_page(
&mut self,
req_id: u64,
req_lsn: Lsn,
mod_lsn: Lsn,
rel: RelTag,
blks: Vec<u32>,
) -> anyhow::Result<()>;
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()>;
/// Receives the next GetPage response from the pageserver.
async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)>;
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse>;
}
/// A libpq-based Pageserver client.
struct LibpqClient {
inner: pageserver_client::page_service::PagestreamClient,
// Track sent batches, so we know how many responses to expect.
batch_sizes: VecDeque<usize>,
}
impl LibpqClient {
@@ -548,55 +484,18 @@ impl LibpqClient {
.await?
.pagestream(ttid.tenant_id, ttid.timeline_id)
.await?;
Ok(Self {
inner,
batch_sizes: VecDeque::new(),
})
Ok(Self { inner })
}
}
#[async_trait]
impl Client for LibpqClient {
async fn send_get_page(
&mut self,
req_id: u64,
req_lsn: Lsn,
mod_lsn: Lsn,
rel: RelTag,
blks: Vec<u32>,
) -> anyhow::Result<()> {
// libpq doesn't support client-side batches, so we send a bunch of individual requests
// instead in the hope that the server will batch them for us. We use the same request ID
// for all, because we'll return a single batch response.
self.batch_sizes.push_back(blks.len());
for blkno in blks {
let req = PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: req_id,
request_lsn: req_lsn,
not_modified_since: mod_lsn,
},
rel,
blkno,
};
self.inner.getpage_send(req).await?;
}
Ok(())
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
self.inner.getpage_send(req).await
}
async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
let batch_size = self.batch_sizes.pop_front().unwrap();
let mut batch = Vec::with_capacity(batch_size);
let mut req_id = None;
for _ in 0..batch_size {
let resp = self.inner.getpage_recv().await?;
if req_id.is_none() {
req_id = Some(resp.req.hdr.reqid);
}
assert_eq!(req_id, Some(resp.req.hdr.reqid), "request ID mismatch");
batch.push(resp.page);
}
Ok((req_id.unwrap(), batch))
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
self.inner.getpage_recv().await
}
}
@@ -633,35 +532,31 @@ impl GrpcClient {
#[async_trait]
impl Client for GrpcClient {
async fn send_get_page(
&mut self,
req_id: u64,
req_lsn: Lsn,
mod_lsn: Lsn,
rel: RelTag,
blks: Vec<u32>,
) -> anyhow::Result<()> {
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
let req = proto::GetPageRequest {
request_id: req_id,
request_id: 0,
request_class: proto::GetPageClass::Normal as i32,
read_lsn: Some(proto::ReadLsn {
request_lsn: req_lsn.0,
not_modified_since_lsn: mod_lsn.0,
request_lsn: req.hdr.request_lsn.0,
not_modified_since_lsn: req.hdr.not_modified_since.0,
}),
rel: Some(rel.into()),
block_number: blks,
rel: Some(req.rel.into()),
block_number: vec![req.blkno],
};
self.req_tx.send(req).await?;
Ok(())
}
async fn recv_get_page(&mut self) -> anyhow::Result<(u64, Vec<Bytes>)> {
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
let resp = self.resp_rx.message().await?.unwrap();
anyhow::ensure!(
resp.status_code == proto::GetPageStatusCode::Ok as i32,
"unexpected status code: {}",
resp.status_code
);
Ok((resp.request_id, resp.page_image))
Ok(PagestreamGetPageResponse {
page: resp.page_image[0].clone(),
req: PagestreamGetPageRequest::default(), // dummy
})
}
}

View File

@@ -1,6 +1,5 @@
use std::{collections::HashMap, sync::Arc};
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
use camino::{Utf8Path, Utf8PathBuf};
use metrics::core::{AtomicU64, GenericCounter};
@@ -168,17 +167,14 @@ impl BasebackupCache {
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
fn tmp_dir(&self) -> Utf8PathBuf {
self.data_dir.join("tmp")
}
fn entry_tmp_path(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
lsn: Lsn,
) -> Utf8PathBuf {
self.tmp_dir()
self.data_dir
.join("tmp")
.join(Self::entry_filename(tenant_id, timeline_id, lsn))
}
@@ -198,18 +194,15 @@ impl BasebackupCache {
Some((tenant_id, timeline_id, lsn))
}
// Recreate the tmp directory to clear all files in it.
async fn clean_tmp_dir(&self) -> anyhow::Result<()> {
let tmp_dir = self.tmp_dir();
if tmp_dir.exists() {
tokio::fs::remove_dir_all(&tmp_dir).await?;
}
tokio::fs::create_dir_all(&tmp_dir).await?;
Ok(())
}
async fn cleanup(&self) -> anyhow::Result<()> {
self.clean_tmp_dir().await?;
// Cleanup tmp directory.
let tmp_dir = self.data_dir.join("tmp");
let mut tmp_dir = tokio::fs::read_dir(&tmp_dir).await?;
while let Some(dir_entry) = tmp_dir.next_entry().await? {
if let Err(e) = tokio::fs::remove_file(dir_entry.path()).await {
tracing::warn!("Failed to remove basebackup cache tmp file: {:#}", e);
}
}
// Remove outdated entries.
let entries_old = self.entries.lock().unwrap().clone();
@@ -248,14 +241,16 @@ impl BasebackupCache {
}
async fn on_startup(&self) -> anyhow::Result<()> {
// Create data_dir if it does not exist.
tokio::fs::create_dir_all(&self.data_dir)
// Create data_dir and tmp directory if they do not exist.
tokio::fs::create_dir_all(&self.data_dir.join("tmp"))
.await
.context("Failed to create basebackup cache data directory")?;
self.clean_tmp_dir()
.await
.context("Failed to clean tmp directory")?;
.map_err(|e| {
anyhow::anyhow!(
"Failed to create basebackup cache data_dir {:?}: {:?}",
self.data_dir,
e
)
})?;
// Read existing entries from the data_dir and add them to in-memory state.
let mut entries = HashMap::new();
@@ -456,11 +451,6 @@ impl BasebackupCache {
}
// Move the tmp file to the final location atomically.
// The tmp file is fsynced, so it's guaranteed that we will not have a partial file
// in the main directory.
// It's not necessary to fsync the inode after renaming, because the worst case is that
// the rename operation will be rolled back on the disk failure, the entry will disappear
// from the main directory, and the entry access will cause a cache miss.
let entry_path = self.entry_path(tenant_shard_id.tenant_id, timeline_id, req_lsn);
tokio::fs::rename(&entry_tmp_path, &entry_path).await?;
@@ -478,17 +468,16 @@ impl BasebackupCache {
}
/// Prepares a basebackup in a temporary file.
/// Guarantees that the tmp file is fsynced before returning.
async fn prepare_basebackup_tmp(
&self,
entry_tmp_path: &Utf8Path,
emptry_tmp_path: &Utf8Path,
timeline: &Arc<Timeline>,
req_lsn: Lsn,
) -> anyhow::Result<()> {
let ctx = RequestContext::new(TaskKind::BasebackupCache, DownloadBehavior::Download);
let ctx = ctx.with_scope_timeline(timeline);
let file = tokio::fs::File::create(entry_tmp_path).await?;
let file = tokio::fs::File::create(emptry_tmp_path).await?;
let mut writer = BufWriter::new(file);
let mut encoder = GzipEncoder::with_quality(

View File

@@ -23,7 +23,6 @@ use pageserver::deletion_queue::DeletionQueue;
use pageserver::disk_usage_eviction_task::{self, launch_disk_usage_global_eviction_task};
use pageserver::feature_resolver::FeatureResolver;
use pageserver::metrics::{STARTUP_DURATION, STARTUP_IS_LOADING};
use pageserver::page_service::GrpcPageServiceHandler;
use pageserver::task_mgr::{
BACKGROUND_RUNTIME, COMPUTE_REQUEST_RUNTIME, MGMT_REQUEST_RUNTIME, WALRECEIVER_RUNTIME,
};
@@ -159,6 +158,7 @@ fn main() -> anyhow::Result<()> {
// (maybe we should automate this with a visitor?).
info!(?conf.virtual_file_io_engine, "starting with virtual_file IO engine");
info!(?conf.virtual_file_io_mode, "starting with virtual_file IO mode");
info!(?conf.wal_receiver_protocol, "starting with WAL receiver protocol");
info!(?conf.validate_wal_contiguity, "starting with WAL contiguity validation");
info!(?conf.page_service_pipelining, "starting with page service pipelining config");
info!(?conf.get_vectored_concurrent_io, "starting with get_vectored IO concurrency config");
@@ -573,8 +573,7 @@ fn start_pageserver(
tokio::sync::mpsc::unbounded_channel();
let deletion_queue_client = deletion_queue.new_client();
let background_purges = mgr::BackgroundPurges::default();
let tenant_manager = mgr::init(
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
conf,
background_purges.clone(),
TenantSharedResources {
@@ -585,10 +584,10 @@ fn start_pageserver(
basebackup_prepare_sender,
feature_resolver,
},
order,
shutdown_pageserver.clone(),
);
))?;
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(tenant_manager.clone(), order))?;
let basebackup_cache = BasebackupCache::spawn(
BACKGROUND_RUNTIME.handle(),
@@ -816,11 +815,10 @@ fn start_pageserver(
// necessary?
let mut page_service_grpc = None;
if let Some(grpc_listener) = grpc_listener {
page_service_grpc = Some(GrpcPageServiceHandler::spawn(
page_service_grpc = Some(page_service::spawn_grpc(
tenant_manager.clone(),
grpc_auth,
otel_guard.as_ref().map(|g| g.dispatch.clone()),
conf.get_vectored_concurrent_io,
grpc_listener,
)?);
}

View File

@@ -27,6 +27,7 @@ use reqwest::Url;
use storage_broker::Uri;
use utils::id::{NodeId, TimelineId};
use utils::logging::{LogFormat, SecretString};
use utils::postgres_client::PostgresClientProtocol;
use crate::tenant::storage_layer::inmemory_layer::IndexEntry;
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
@@ -210,6 +211,8 @@ pub struct PageServerConf {
/// Optionally disable disk syncs (unsafe!)
pub no_sync: bool,
pub wal_receiver_protocol: PostgresClientProtocol,
pub page_service_pipelining: pageserver_api::config::PageServicePipeliningConfig,
pub get_vectored_concurrent_io: pageserver_api::config::GetVectoredConcurrentIo,
@@ -418,6 +421,7 @@ impl PageServerConf {
virtual_file_io_engine,
tenant_config,
no_sync,
wal_receiver_protocol,
page_service_pipelining,
get_vectored_concurrent_io,
enable_read_path_debugging,
@@ -480,6 +484,7 @@ impl PageServerConf {
import_pgdata_upcall_api,
import_pgdata_upcall_api_token: import_pgdata_upcall_api_token.map(SecretString::from),
import_pgdata_aws_endpoint_url,
wal_receiver_protocol,
page_service_pipelining,
get_vectored_concurrent_io,
tracing,

View File

@@ -1,29 +1,21 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use pageserver_api::config::NodeMetadata;
use posthog_client_lite::{
CaptureEvent, FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
PostHogFlagFilterPropertyValue,
FeatureResolverBackgroundLoop, PostHogClientConfig, PostHogEvaluationError,
};
use remote_storage::RemoteStorageKind;
use serde_json::json;
use tokio_util::sync::CancellationToken;
use utils::id::TenantId;
use crate::{config::PageServerConf, metrics::FEATURE_FLAG_EVALUATION};
use crate::config::PageServerConf;
#[derive(Clone)]
pub struct FeatureResolver {
inner: Option<Arc<FeatureResolverBackgroundLoop>>,
internal_properties: Option<Arc<HashMap<String, PostHogFlagFilterPropertyValue>>>,
}
impl FeatureResolver {
pub fn new_disabled() -> Self {
Self {
inner: None,
internal_properties: None,
}
Self { inner: None }
}
pub fn spawn(
@@ -44,142 +36,14 @@ impl FeatureResolver {
shutdown_pageserver,
);
let inner = Arc::new(inner);
// The properties shared by all tenants on this pageserver.
let internal_properties = {
let mut properties = HashMap::new();
properties.insert(
"pageserver_id".to_string(),
PostHogFlagFilterPropertyValue::String(conf.id.to_string()),
);
if let Some(availability_zone) = &conf.availability_zone {
properties.insert(
"availability_zone".to_string(),
PostHogFlagFilterPropertyValue::String(availability_zone.clone()),
);
}
// Infer region based on the remote storage config.
if let Some(remote_storage) = &conf.remote_storage_config {
match &remote_storage.storage {
RemoteStorageKind::AwsS3(config) => {
properties.insert(
"region".to_string(),
PostHogFlagFilterPropertyValue::String(format!(
"aws-{}",
config.bucket_region
)),
);
}
RemoteStorageKind::AzureContainer(config) => {
properties.insert(
"region".to_string(),
PostHogFlagFilterPropertyValue::String(format!(
"azure-{}",
config.container_region
)),
);
}
RemoteStorageKind::LocalFs { .. } => {
properties.insert(
"region".to_string(),
PostHogFlagFilterPropertyValue::String("local".to_string()),
);
}
}
}
// TODO: move this to a background task so that we don't block startup in case of slow disk
let metadata_path = conf.metadata_path();
match std::fs::read_to_string(&metadata_path) {
Ok(metadata_str) => match serde_json::from_str::<NodeMetadata>(&metadata_str) {
Ok(metadata) => {
properties.insert(
"hostname".to_string(),
PostHogFlagFilterPropertyValue::String(metadata.http_host),
);
if let Some(cplane_region) = metadata.other.get("region_id") {
if let Some(cplane_region) = cplane_region.as_str() {
// This region contains the cell number
properties.insert(
"neon_region".to_string(),
PostHogFlagFilterPropertyValue::String(
cplane_region.to_string(),
),
);
}
}
}
Err(e) => {
tracing::warn!("Failed to parse metadata.json: {}", e);
}
},
Err(e) => {
tracing::warn!("Failed to read metadata.json: {}", e);
}
}
Arc::new(properties)
};
let fake_tenants = {
let mut tenants = Vec::new();
for i in 0..10 {
let distinct_id = format!(
"fake_tenant_{}_{}_{}",
conf.availability_zone.as_deref().unwrap_or_default(),
conf.id,
i
);
let properties = Self::collect_properties_inner(
distinct_id.clone(),
Some(&internal_properties),
);
tenants.push(CaptureEvent {
event: "initial_tenant_report".to_string(),
distinct_id,
properties: json!({ "$set": properties }), // use `$set` to set the person properties instead of the event properties
});
}
tenants
};
// TODO: make refresh period configurable
inner
.clone()
.spawn(handle, Duration::from_secs(60), fake_tenants);
Ok(FeatureResolver {
inner: Some(inner),
internal_properties: Some(internal_properties),
})
// TODO: make this configurable
inner.clone().spawn(handle, Duration::from_secs(60));
Ok(FeatureResolver { inner: Some(inner) })
} else {
Ok(FeatureResolver {
inner: None,
internal_properties: None,
})
Ok(FeatureResolver { inner: None })
}
}
fn collect_properties_inner(
tenant_id: String,
internal_properties: Option<&HashMap<String, PostHogFlagFilterPropertyValue>>,
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
let mut properties = HashMap::new();
if let Some(internal_properties) = internal_properties {
for (key, value) in internal_properties.iter() {
properties.insert(key.clone(), value.clone());
}
}
properties.insert(
"tenant_id".to_string(),
PostHogFlagFilterPropertyValue::String(tenant_id),
);
properties
}
/// Collect all properties availble for the feature flag evaluation.
pub(crate) fn collect_properties(
&self,
tenant_id: TenantId,
) -> HashMap<String, PostHogFlagFilterPropertyValue> {
Self::collect_properties_inner(tenant_id.to_string(), self.internal_properties.as_deref())
}
/// Evaluate a multivariate feature flag. Currently, we do not support any properties.
///
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
@@ -191,24 +55,11 @@ impl FeatureResolver {
tenant_id: TenantId,
) -> Result<String, PostHogEvaluationError> {
if let Some(inner) = &self.inner {
let res = inner.feature_store().evaluate_multivariate(
inner.feature_store().evaluate_multivariate(
flag_key,
&tenant_id.to_string(),
&self.collect_properties(tenant_id),
);
match &res {
Ok(value) => {
FEATURE_FLAG_EVALUATION
.with_label_values(&[flag_key, "ok", value])
.inc();
}
Err(e) => {
FEATURE_FLAG_EVALUATION
.with_label_values(&[flag_key, "error", e.as_variant_str()])
.inc();
}
}
res
&HashMap::new(),
)
} else {
Err(PostHogEvaluationError::NotAvailable(
"PostHog integration is not enabled".to_string(),
@@ -229,24 +80,11 @@ impl FeatureResolver {
tenant_id: TenantId,
) -> Result<(), PostHogEvaluationError> {
if let Some(inner) = &self.inner {
let res = inner.feature_store().evaluate_boolean(
inner.feature_store().evaluate_boolean(
flag_key,
&tenant_id.to_string(),
&self.collect_properties(tenant_id),
);
match &res {
Ok(()) => {
FEATURE_FLAG_EVALUATION
.with_label_values(&[flag_key, "ok", "true"])
.inc();
}
Err(e) => {
FEATURE_FLAG_EVALUATION
.with_label_values(&[flag_key, "error", e.as_variant_str()])
.inc();
}
}
res
&HashMap::new(),
)
} else {
Err(PostHogEvaluationError::NotAvailable(
"PostHog integration is not enabled".to_string(),

View File

@@ -43,7 +43,6 @@ use pageserver_api::models::{
use pageserver_api::shard::{ShardCount, TenantShardId};
use remote_storage::{DownloadError, GenericRemoteStorage, TimeTravelError};
use scopeguard::defer;
use serde_json::json;
use tenant_size_model::svg::SvgBranchKind;
use tenant_size_model::{SizeResult, StorageModel};
use tokio::time::Instant;
@@ -73,7 +72,6 @@ use crate::tenant::remote_timeline_client::{
use crate::tenant::secondary::SecondaryController;
use crate::tenant::size::ModelInputs;
use crate::tenant::storage_layer::{IoConcurrency, LayerAccessStatsReset, LayerName};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::timeline::offload::{OffloadError, offload_timeline};
use crate::tenant::timeline::{
CompactFlags, CompactOptions, CompactRequest, CompactionError, MarkInvisibleRequest, Timeline,
@@ -1452,10 +1450,7 @@ async fn timeline_layer_scan_disposable_keys(
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download)
.with_scope_timeline(&timeline);
let guard = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = timeline.layers.read().await;
let Some(layer) = guard.try_get_from_key(&layer_name.clone().into()) else {
return Err(ApiError::NotFound(
anyhow::anyhow!("Layer {tenant_shard_id}/{timeline_id}/{layer_name} not found").into(),
@@ -3684,24 +3679,23 @@ async fn tenant_evaluate_feature_flag(
let tenant = state
.tenant_manager
.get_attached_tenant_shard(tenant_shard_id)?;
let properties = tenant.feature_resolver.collect_properties(tenant_shard_id.tenant_id);
if as_type == "boolean" {
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
json_response(StatusCode::OK, result)
} else if as_type == "multivariate" {
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
json_response(StatusCode::OK, result)
} else {
// Auto infer the type of the feature flag.
let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?;
if is_boolean {
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
let result = result.map(|_| true).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
json_response(StatusCode::OK, result)
} else {
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
json_response(StatusCode::OK, json!({ "result": result, "properties": properties }))
json_response(StatusCode::OK, result)
}
}
}

View File

@@ -446,15 +446,6 @@ static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub(crate) static FEATURE_FLAG_EVALUATION: Lazy<CounterVec> = Lazy::new(|| {
register_counter_vec!(
"pageserver_feature_flag_evaluation",
"Number of times a feature flag is evaluated",
&["flag_key", "status", "value"],
)
.unwrap()
});
#[derive(IntoStaticStr)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum PageCacheErrorKind {
@@ -1053,15 +1044,6 @@ pub(crate) static TENANT_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
.expect("Failed to register pageserver_tenant_states_count metric")
});
pub(crate) static TIMELINE_STATE_METRIC: Lazy<UIntGaugeVec> = Lazy::new(|| {
register_uint_gauge_vec!(
"pageserver_timeline_states_count",
"Count of timelines per state",
&["state"]
)
.expect("Failed to register pageserver_timeline_states_count metric")
});
/// A set of broken tenants.
///
/// These are expected to be so rare that a set is fine. Set as in a new timeseries per each broken
@@ -2864,6 +2846,7 @@ pub(crate) struct WalIngestMetrics {
pub(crate) records_received: IntCounter,
pub(crate) records_observed: IntCounter,
pub(crate) records_committed: IntCounter,
pub(crate) records_filtered: IntCounter,
pub(crate) values_committed_metadata_images: IntCounter,
pub(crate) values_committed_metadata_deltas: IntCounter,
pub(crate) values_committed_data_images: IntCounter,
@@ -2919,6 +2902,11 @@ pub(crate) static WAL_INGEST: Lazy<WalIngestMetrics> = Lazy::new(|| {
"Number of WAL records which resulted in writes to pageserver storage"
)
.expect("failed to define a metric"),
records_filtered: register_int_counter!(
"pageserver_wal_ingest_records_filtered",
"Number of WAL records filtered out due to sharding"
)
.expect("failed to define a metric"),
values_committed_metadata_images: values_committed.with_label_values(&["metadata", "image"]),
values_committed_metadata_deltas: values_committed.with_label_values(&["metadata", "delta"]),
values_committed_data_images: values_committed.with_label_values(&["data", "image"]),
@@ -3334,8 +3322,6 @@ impl TimelineMetrics {
&timeline_id,
);
TIMELINE_STATE_METRIC.with_label_values(&["active"]).inc();
TimelineMetrics {
tenant_id,
shard_id,
@@ -3490,8 +3476,6 @@ impl TimelineMetrics {
return;
}
TIMELINE_STATE_METRIC.with_label_values(&["active"]).dec();
let tenant_id = &self.tenant_id;
let timeline_id = &self.timeline_id;
let shard_id = &self.shard_id;

View File

@@ -169,6 +169,96 @@ pub fn spawn(
Listener { cancel, task }
}
/// Spawns a gRPC server for the page service.
///
/// TODO: move this onto GrpcPageServiceHandler::spawn().
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
/// need to reimplement the TCP+TLS accept loop ourselves.
pub fn spawn_grpc(
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
listener: std::net::TcpListener,
) -> anyhow::Result<CancellableTask> {
let cancel = CancellationToken::new();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch)
.detached_child();
let gate = Gate::default();
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
// port early during startup.
let incoming = {
let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
listener.set_nonblocking(true)?;
tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(listener)?)
.with_nodelay(Some(GRPC_TCP_NODELAY))
.with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
};
// Set up the gRPC server.
//
// TODO: consider tuning window sizes.
let mut server = tonic::transport::Server::builder()
.http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
.http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
// Main page service stack. Uses a mix of Tonic interceptors and Tower layers:
//
// * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service.
//
// * Layers: allow async code, can run code after the service response. However, only has access
// to the raw HTTP request/response, not the gRPC types.
let page_service_handler = GrpcPageServiceHandler {
tenant_manager,
ctx,
};
let observability_layer = ObservabilityLayer;
let mut tenant_interceptor = TenantMetadataInterceptor;
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
let page_service = tower::ServiceBuilder::new()
// Create tracing span and record request start time.
.layer(observability_layer)
// Intercept gRPC requests.
.layer(tonic::service::InterceptorLayer::new(move |mut req| {
// Extract tenant metadata.
req = tenant_interceptor.call(req)?;
// Authenticate tenant JWT token.
req = auth_interceptor.call(req)?;
Ok(req)
}))
.service(proto::PageServiceServer::new(page_service_handler));
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
.build_v1()?;
let server = server.add_service(reflection_service);
// Spawn server task.
let task_cancel = cancel.clone();
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"grpc listener",
async move {
let result = server
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
.await;
if result.is_ok() {
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
}
result
},
));
Ok(CancellableTask { task, cancel })
}
impl Listener {
pub async fn stop_accepting(self) -> Connections {
self.cancel.cancel();
@@ -407,6 +497,10 @@ async fn page_service_conn_main(
}
/// Page service connection handler.
///
/// TODO: for gRPC, this will be shared by all requests from all connections.
/// Decompose it into global state and per-connection/request state, and make
/// libpq-specific options (e.g. pipelining) separate.
struct PageServerHandler {
auth: Option<Arc<SwappableJwtAuth>>,
claims: Option<Claims>,
@@ -3268,106 +3362,9 @@ where
pub struct GrpcPageServiceHandler {
tenant_manager: Arc<TenantManager>,
ctx: RequestContext,
gate_guard: GateGuard,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
}
impl GrpcPageServiceHandler {
/// Spawns a gRPC server for the page service.
///
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
/// need to reimplement the TCP+TLS accept loop ourselves.
pub fn spawn(
tenant_manager: Arc<TenantManager>,
auth: Option<Arc<SwappableJwtAuth>>,
perf_trace_dispatch: Option<Dispatch>,
get_vectored_concurrent_io: GetVectoredConcurrentIo,
listener: std::net::TcpListener,
) -> anyhow::Result<CancellableTask> {
let cancel = CancellationToken::new();
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
.download_behavior(DownloadBehavior::Download)
.perf_span_dispatch(perf_trace_dispatch)
.detached_child();
let gate = Gate::default();
// Set up the TCP socket. We take a preconfigured TcpListener to bind the
// port early during startup.
let incoming = {
let _runtime = COMPUTE_REQUEST_RUNTIME.enter(); // required by TcpListener::from_std
listener.set_nonblocking(true)?;
tonic::transport::server::TcpIncoming::from(tokio::net::TcpListener::from_std(
listener,
)?)
.with_nodelay(Some(GRPC_TCP_NODELAY))
.with_keepalive(Some(GRPC_TCP_KEEPALIVE_TIME))
};
// Set up the gRPC server.
//
// TODO: consider tuning window sizes.
let mut server = tonic::transport::Server::builder()
.http2_keepalive_interval(Some(GRPC_HTTP2_KEEPALIVE_INTERVAL))
.http2_keepalive_timeout(Some(GRPC_HTTP2_KEEPALIVE_TIMEOUT))
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
// Main page service stack. Uses a mix of Tonic interceptors and Tower layers:
//
// * Interceptors: can inspect and modify the gRPC request. Sync code only, runs before service.
//
// * Layers: allow async code, can run code after the service response. However, only has access
// to the raw HTTP request/response, not the gRPC types.
let page_service_handler = GrpcPageServiceHandler {
tenant_manager,
ctx,
gate_guard: gate.enter().expect("gate was just created"),
get_vectored_concurrent_io,
};
let observability_layer = ObservabilityLayer;
let mut tenant_interceptor = TenantMetadataInterceptor;
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
let page_service = tower::ServiceBuilder::new()
// Create tracing span and record request start time.
.layer(observability_layer)
// Intercept gRPC requests.
.layer(tonic::service::InterceptorLayer::new(move |mut req| {
// Extract tenant metadata.
req = tenant_interceptor.call(req)?;
// Authenticate tenant JWT token.
req = auth_interceptor.call(req)?;
Ok(req)
}))
// Run the page service.
.service(proto::PageServiceServer::new(page_service_handler));
let server = server.add_service(page_service);
// Reflection service for use with e.g. grpcurl.
let reflection_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(proto::FILE_DESCRIPTOR_SET)
.build_v1()?;
let server = server.add_service(reflection_service);
// Spawn server task.
let task_cancel = cancel.clone();
let task = COMPUTE_REQUEST_RUNTIME.spawn(task_mgr::exit_on_panic_or_error(
"grpc listener",
async move {
let result = server
.serve_with_incoming_shutdown(incoming, task_cancel.cancelled())
.await;
if result.is_ok() {
// TODO: revisit shutdown logic once page service is implemented.
gate.close().await;
}
result
},
));
Ok(CancellableTask { task, cancel })
}
/// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of
/// relations and their sizes, as well as SLRU segments and similar data.
#[allow(clippy::result_large_err)]
@@ -3724,14 +3721,6 @@ impl proto::PageService for GrpcPageServiceHandler {
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
.await?;
// Spawn an IoConcurrency sidecar, if enabled.
let Ok(gate_guard) = self.gate_guard.try_clone() else {
return Err(tonic::Status::unavailable("shutting down"));
};
let io_concurrency =
IoConcurrency::spawn_from_conf(self.get_vectored_concurrent_io, gate_guard);
// Spawn a task to handle the GetPageRequest stream.
let span = Span::current();
let ctx = self.ctx.attached_child();
let mut reqs = req.into_inner();
@@ -3742,7 +3731,8 @@ impl proto::PageService for GrpcPageServiceHandler {
.await?
.downgrade();
while let Some(req) = reqs.message().await? {
yield Self::get_page(&ctx, &timeline, req, io_concurrency.clone())
// TODO: implement IoConcurrency sidecar.
yield Self::get_page(&ctx, &timeline, req, IoConcurrency::Sequential)
.instrument(span.clone()) // propagate request span
.await?
}

View File

@@ -51,7 +51,6 @@ use secondary::heatmap::{HeatMapTenant, HeatMapTimeline};
use storage_broker::BrokerClientChannel;
use timeline::compaction::{CompactionOutcome, GcCompactionQueue};
use timeline::import_pgdata::ImportingTimeline;
use timeline::layer_manager::LayerManagerLockHolder;
use timeline::offload::{OffloadError, offload_timeline};
use timeline::{
CompactFlags, CompactOptions, CompactionError, PreviousHeatmap, ShutdownMode, import_pgdata,
@@ -90,8 +89,7 @@ use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::{
BROKEN_TENANTS_SET, CIRCUIT_BREAKERS_BROKEN, CIRCUIT_BREAKERS_UNBROKEN, CONCURRENT_INITDBS,
INITDB_RUN_TIME, INITDB_SEMAPHORE_ACQUISITION_TIME, TENANT, TENANT_OFFLOADED_TIMELINES,
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, TIMELINE_STATE_METRIC,
remove_tenant_metrics,
TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, remove_tenant_metrics,
};
use crate::task_mgr::TaskKind;
use crate::tenant::config::LocationMode;
@@ -546,28 +544,6 @@ pub struct OffloadedTimeline {
/// Part of the `OffloadedTimeline` object's lifecycle: this needs to be set before we drop it
pub deleted_from_ancestor: AtomicBool,
_metrics_guard: OffloadedTimelineMetricsGuard,
}
/// Increases the offloaded timeline count metric when created, and decreases when dropped.
struct OffloadedTimelineMetricsGuard;
impl OffloadedTimelineMetricsGuard {
fn new() -> Self {
TIMELINE_STATE_METRIC
.with_label_values(&["offloaded"])
.inc();
Self
}
}
impl Drop for OffloadedTimelineMetricsGuard {
fn drop(&mut self) {
TIMELINE_STATE_METRIC
.with_label_values(&["offloaded"])
.dec();
}
}
impl OffloadedTimeline {
@@ -600,8 +576,6 @@ impl OffloadedTimeline {
delete_progress: timeline.delete_progress.clone(),
deleted_from_ancestor: AtomicBool::new(false),
_metrics_guard: OffloadedTimelineMetricsGuard::new(),
})
}
fn from_manifest(tenant_shard_id: TenantShardId, manifest: &OffloadedTimelineManifest) -> Self {
@@ -621,7 +595,6 @@ impl OffloadedTimeline {
archived_at,
delete_progress: TimelineDeleteProgress::default(),
deleted_from_ancestor: AtomicBool::new(false),
_metrics_guard: OffloadedTimelineMetricsGuard::new(),
}
}
fn manifest(&self) -> OffloadedTimelineManifest {
@@ -1316,7 +1289,7 @@ impl TenantShard {
ancestor.is_some()
|| timeline
.layers
.read(LayerManagerLockHolder::LoadLayerMap)
.read()
.await
.layer_map()
.expect(
@@ -2644,7 +2617,7 @@ impl TenantShard {
}
let layer_names = tline
.layers
.read(LayerManagerLockHolder::Testing)
.read()
.await
.layer_map()
.unwrap()
@@ -3159,12 +3132,7 @@ impl TenantShard {
for timeline in &compact {
// Collect L0 counts. Can't await while holding lock above.
if let Ok(lm) = timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await
.layer_map()
{
if let Ok(lm) = timeline.layers.read().await.layer_map() {
l0_counts.insert(timeline.timeline_id, lm.level0_deltas().len());
}
}
@@ -4906,7 +4874,7 @@ impl TenantShard {
}
let layer_names = tline
.layers
.read(LayerManagerLockHolder::Testing)
.read()
.await
.layer_map()
.unwrap()
@@ -6976,7 +6944,7 @@ mod tests {
.await?;
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
let layer_map = tline.layers.read(LayerManagerLockHolder::Testing).await;
let layer_map = tline.layers.read().await;
let level0_deltas = layer_map
.layer_map()?
.level0_deltas()
@@ -7212,7 +7180,7 @@ mod tests {
let lsn = Lsn(0x10);
let inserted = bulk_insert_compact_gc(&tenant, &tline, &ctx, lsn, 50, 10000).await?;
let guard = tline.layers.read(LayerManagerLockHolder::Testing).await;
let guard = tline.layers.read().await;
let lm = guard.layer_map()?;
lm.dump(true, &ctx).await?;
@@ -8240,23 +8208,12 @@ mod tests {
tline.freeze_and_flush().await?; // force create a delta layer
}
let before_num_l0_delta_files = tline
.layers
.read(LayerManagerLockHolder::Testing)
.await
.layer_map()?
.level0_deltas()
.len();
let before_num_l0_delta_files =
tline.layers.read().await.layer_map()?.level0_deltas().len();
tline.compact(&cancel, EnumSet::default(), &ctx).await?;
let after_num_l0_delta_files = tline
.layers
.read(LayerManagerLockHolder::Testing)
.await
.layer_map()?
.level0_deltas()
.len();
let after_num_l0_delta_files = tline.layers.read().await.layer_map()?.level0_deltas().len();
assert!(
after_num_l0_delta_files < before_num_l0_delta_files,

View File

@@ -61,8 +61,8 @@ pub(crate) struct LocationConf {
/// The detailed shard identity. This structure is already scoped within
/// a TenantShardId, but we need the full ShardIdentity to enable calculating
/// key->shard mappings.
// TODO(vlad): Remove this default once all configs have a shard identity on disk.
#[serde(default = "ShardIdentity::unsharded")]
#[serde(skip_serializing_if = "ShardIdentity::is_unsharded")]
pub(crate) shard: ShardIdentity,
/// The pan-cluster tenant configuration, the same on all locations
@@ -149,12 +149,7 @@ impl LocationConf {
/// For use when attaching/re-attaching: update the generation stored in this
/// structure. If we were in a secondary state, promote to attached (posession
/// of a fresh generation implies this).
pub(crate) fn attach_in_generation(
&mut self,
mode: AttachmentMode,
generation: Generation,
stripe_size: ShardStripeSize,
) {
pub(crate) fn attach_in_generation(&mut self, mode: AttachmentMode, generation: Generation) {
match &mut self.mode {
LocationMode::Attached(attach_conf) => {
attach_conf.generation = generation;
@@ -168,8 +163,6 @@ impl LocationConf {
})
}
}
self.shard.stripe_size = stripe_size;
}
pub(crate) fn try_from(conf: &'_ models::LocationConfig) -> anyhow::Result<Self> {

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,6 @@
//! Helper functions to upload files to remote storage with a RemoteStorage
use std::io::{ErrorKind, SeekFrom};
use std::num::NonZeroU32;
use std::time::SystemTime;
use anyhow::{Context, bail};
@@ -229,25 +228,11 @@ pub(crate) async fn time_travel_recover_tenant(
let timelines_path = super::remote_timelines_path(tenant_shard_id);
prefixes.push(timelines_path);
}
// Limit the number of versions deletions, mostly so that we don't
// keep requesting forever if the list is too long, as we'd put the
// list in RAM.
// Building a list of 100k entries that reaches the limit roughly takes
// 40 seconds, and roughly corresponds to tenants of 2 TiB physical size.
const COMPLEXITY_LIMIT: Option<NonZeroU32> = NonZeroU32::new(100_000);
for prefix in &prefixes {
backoff::retry(
|| async {
storage
.time_travel_recover(
Some(prefix),
timestamp,
done_if_after,
cancel,
COMPLEXITY_LIMIT,
)
.time_travel_recover(Some(prefix), timestamp, done_if_after, cancel)
.await
},
|e| !matches!(e, TimeTravelError::Other(_)),

View File

@@ -1635,7 +1635,6 @@ pub(crate) mod test {
use crate::tenant::disk_btree::tests::TestDisk;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
use crate::tenant::storage_layer::{Layer, ResidentLayer};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::{TenantShard, Timeline};
/// Construct an index for a fictional delta layer and and then
@@ -2003,7 +2002,7 @@ pub(crate) mod test {
let initdb_layer = timeline
.layers
.read(crate::tenant::timeline::layer_manager::LayerManagerLockHolder::Testing)
.read()
.await
.likely_resident_layers()
.next()
@@ -2079,7 +2078,7 @@ pub(crate) mod test {
let new_layer = timeline
.layers
.read(LayerManagerLockHolder::Testing)
.read()
.await
.likely_resident_layers()
.find(|&x| x != &initdb_layer)

View File

@@ -10,7 +10,6 @@ use super::*;
use crate::context::DownloadBehavior;
use crate::tenant::harness::{TenantHarness, test_img};
use crate::tenant::storage_layer::{IoConcurrency, LayerVisibilityHint};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
/// Used in tests to advance a future to wanted await point, and not futher.
const ADVANCE: std::time::Duration = std::time::Duration::from_secs(3600);
@@ -60,7 +59,7 @@ async fn smoke_test() {
// there to avoid the timeline being illegally empty
let (layer, dummy_layer) = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -216,7 +215,7 @@ async fn smoke_test() {
// Simulate GC removing our test layer.
{
let mut g = timeline.layers.write(LayerManagerLockHolder::Testing).await;
let mut g = timeline.layers.write().await;
let layers = &[layer];
g.open_mut().unwrap().finish_gc_timeline(layers);
@@ -262,7 +261,7 @@ async fn evict_and_wait_on_wanted_deleted() {
let layer = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -306,7 +305,7 @@ async fn evict_and_wait_on_wanted_deleted() {
// assert that once we remove the `layer` from the layer map and drop our reference,
// the deletion of the layer in remote_storage happens.
{
let mut layers = timeline.layers.write(LayerManagerLockHolder::Testing).await;
let mut layers = timeline.layers.write().await;
layers.open_mut().unwrap().finish_gc_timeline(&[layer]);
}
@@ -348,7 +347,7 @@ fn read_wins_pending_eviction() {
let layer = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -481,7 +480,7 @@ fn multiple_pending_evictions_scenario(name: &'static str, in_order: bool) {
let layer = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -656,7 +655,7 @@ async fn cancelled_get_or_maybe_download_does_not_cancel_eviction() {
let layer = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -742,7 +741,7 @@ async fn evict_and_wait_does_not_wait_for_download() {
let layer = {
let mut layers = {
let layers = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let layers = timeline.layers.read().await;
layers.likely_resident_layers().cloned().collect::<Vec<_>>()
};
@@ -863,7 +862,7 @@ async fn eviction_cancellation_on_drop() {
let (evicted_layer, not_evicted) = {
let mut layers = {
let mut guard = timeline.layers.write(LayerManagerLockHolder::Testing).await;
let mut guard = timeline.layers.write().await;
let layers = guard.likely_resident_layers().cloned().collect::<Vec<_>>();
// remove the layers from layermap
guard.open_mut().unwrap().finish_gc_timeline(&layers);

View File

@@ -35,11 +35,7 @@ use fail::fail_point;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use handle::ShardTimelineId;
use layer_manager::{
LayerManagerLockHolder, LayerManagerReadGuard, LayerManagerWriteGuard, LockedLayerManager,
Shutdown,
};
use layer_manager::Shutdown;
use offload::OffloadError;
use once_cell::sync::Lazy;
use pageserver_api::config::tenant_conf_defaults::DEFAULT_PITR_INTERVAL;
@@ -86,6 +82,7 @@ use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};
use self::delete::DeleteTimelineFlow;
pub(super) use self::eviction_task::EvictionTaskTenantState;
use self::eviction_task::EvictionTaskTimelineState;
use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};
use super::remote_timeline_client::RemoteTimelineClient;
@@ -184,13 +181,13 @@ impl std::fmt::Display for ImageLayerCreationMode {
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_layer_manager_rlock(rlock: LayerManagerReadGuard<'_>) {
fn drop_rlock<T>(rlock: tokio::sync::RwLockReadGuard<T>) {
drop(rlock)
}
/// Temporary function for immutable storage state refactor, ensures we are dropping mutex guard instead of other things.
/// Can be removed after all refactors are done.
fn drop_layer_manager_wlock(rlock: LayerManagerWriteGuard<'_>) {
fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
drop(rlock)
}
@@ -244,7 +241,7 @@ pub struct Timeline {
///
/// In the future, we'll be able to split up the tuple of LayerMap and `LayerFileManager`,
/// so that e.g. on-demand-download/eviction, and layer spreading, can operate just on `LayerFileManager`.
pub(crate) layers: LockedLayerManager,
pub(crate) layers: tokio::sync::RwLock<LayerManager>,
last_freeze_at: AtomicLsn,
// Atomic would be more appropriate here.
@@ -1058,8 +1055,8 @@ pub(crate) enum WaitLsnWaiter<'a> {
/// Argument to [`Timeline::shutdown`].
#[derive(Debug, Clone, Copy)]
pub(crate) enum ShutdownMode {
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk. This method can
/// take multiple seconds for a busy timeline.
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
/// also to remote storage. This method can easily take multiple seconds for a busy timeline.
///
/// While we are flushing, we continue to accept read I/O for LSNs ingested before
/// the call to [`Timeline::shutdown`].
@@ -1538,10 +1535,7 @@ impl Timeline {
/// This method makes no distinction between local and remote layers.
/// Hence, the result **does not represent local filesystem usage**.
pub(crate) async fn layer_size_sum(&self) -> u64 {
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
guard.layer_size_sum()
}
@@ -1851,7 +1845,7 @@ impl Timeline {
// time, and this was missed.
// if write_guard.is_none() { return; }
let Ok(layers_guard) = self.layers.try_read(LayerManagerLockHolder::TryFreezeLayer) else {
let Ok(layers_guard) = self.layers.try_read() else {
// Don't block if the layer lock is busy
return;
};
@@ -2164,7 +2158,7 @@ impl Timeline {
if let ShutdownMode::FreezeAndFlush = mode {
let do_flush = if let Some((open, frozen)) = self
.layers
.read(LayerManagerLockHolder::Shutdown)
.read()
.await
.layer_map()
.map(|lm| (lm.open_layer.is_some(), lm.frozen_layers.len()))
@@ -2268,10 +2262,7 @@ impl Timeline {
// Allow any remaining in-memory layers to do cleanup -- until that, they hold the gate
// open.
let mut write_guard = self.write_lock.lock().await;
self.layers
.write(LayerManagerLockHolder::Shutdown)
.await
.shutdown(&mut write_guard);
self.layers.write().await.shutdown(&mut write_guard);
}
// Finally wait until any gate-holders are complete.
@@ -2374,10 +2365,7 @@ impl Timeline {
&self,
reset: LayerAccessStatsReset,
) -> Result<LayerMapInfo, layer_manager::Shutdown> {
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
let mut in_memory_layers = Vec::with_capacity(layer_map.frozen_layers.len() + 1);
if let Some(open_layer) = &layer_map.open_layer {
@@ -2518,13 +2506,6 @@ impl Timeline {
// Preparing basebackup doesn't make sense for shards other than shard zero.
return;
}
if !self.is_active() {
// May happen during initial timeline creation.
// Such timeline is not in the global timeline map yet,
// so basebackup cache will not be able to find it.
// TODO(diko): We can prepare such timelines in finish_creation().
return;
}
let res = self
.basebackup_prepare_sender
@@ -2864,6 +2845,21 @@ impl Timeline {
)
}
/// Resolve the effective WAL receiver protocol to use for this tenant.
///
/// Priority order is:
/// 1. Tenant config override
/// 2. Default value for tenant config override
/// 3. Pageserver config override
/// 4. Pageserver config default
pub fn resolve_wal_receiver_protocol(&self) -> PostgresClientProtocol {
let tenant_conf = self.tenant_conf.load().tenant_conf.clone();
tenant_conf
.wal_receiver_protocol_override
.or(self.conf.default_tenant_conf.wal_receiver_protocol_override)
.unwrap_or(self.conf.wal_receiver_protocol)
}
pub(super) fn tenant_conf_updated(&self, new_conf: &AttachedTenantConf) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
@@ -3219,16 +3215,10 @@ impl Timeline {
guard.is_none(),
"multiple launches / re-launches of WAL receiver are not supported"
);
let protocol = PostgresClientProtocol::Interpreted {
format: utils::postgres_client::InterpretedFormat::Protobuf,
compression: Some(utils::postgres_client::Compression::Zstd { level: 1 }),
};
*guard = Some(WalReceiver::start(
Arc::clone(self),
WalReceiverConf {
protocol,
protocol: self.resolve_wal_receiver_protocol(),
wal_connect_timeout,
lagging_wal_timeout,
max_lsn_wal_lag,
@@ -3244,7 +3234,7 @@ impl Timeline {
/// Initialize with an empty layer map. Used when creating a new timeline.
pub(super) fn init_empty_layer_map(&self, start_lsn: Lsn) {
let mut layers = self.layers.try_write(LayerManagerLockHolder::Init).expect(
let mut layers = self.layers.try_write().expect(
"in the context where we call this function, no other task has access to the object",
);
layers
@@ -3264,10 +3254,7 @@ impl Timeline {
use init::Decision::*;
use init::{Discovered, DismissedLayer};
let mut guard = self
.layers
.write(LayerManagerLockHolder::LoadLayerMap)
.await;
let mut guard = self.layers.write().await;
let timer = self.metrics.load_layer_map_histo.start_timer();
@@ -3884,10 +3871,7 @@ impl Timeline {
&self,
layer_name: &LayerName,
) -> Result<Option<Layer>, layer_manager::Shutdown> {
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
let layer = guard
.layer_map()?
.iter_historic_layers()
@@ -3920,10 +3904,7 @@ impl Timeline {
return None;
}
let guard = self
.layers
.read(LayerManagerLockHolder::GenerateHeatmap)
.await;
let guard = self.layers.read().await;
// Firstly, if there's any heatmap left over from when this location
// was a secondary, take that into account. Keep layers that are:
@@ -4021,10 +4002,7 @@ impl Timeline {
}
pub(super) async fn generate_unarchival_heatmap(&self, end_lsn: Lsn) -> PreviousHeatmap {
let guard = self
.layers
.read(LayerManagerLockHolder::GenerateHeatmap)
.await;
let guard = self.layers.read().await;
let now = SystemTime::now();
let mut heatmap_layers = Vec::default();
@@ -4366,7 +4344,7 @@ impl Timeline {
query: &VersionedKeySpaceQuery,
) -> Result<LayerFringe, GetVectoredError> {
let mut fringe = LayerFringe::new();
let guard = self.layers.read(LayerManagerLockHolder::GetPage).await;
let guard = self.layers.read().await;
match query {
VersionedKeySpaceQuery::Uniform { keyspace, lsn } => {
@@ -4469,7 +4447,7 @@ impl Timeline {
// required for correctness, but avoids visiting extra layers
// which turns out to be a perf bottleneck in some cases.
if !unmapped_keyspace.is_empty() {
let guard = timeline.layers.read(LayerManagerLockHolder::GetPage).await;
let guard = timeline.layers.read().await;
guard.update_search_fringe(&unmapped_keyspace, cont_lsn, &mut fringe)?;
// It's safe to drop the layer map lock after planning the next round of reads.
@@ -4579,10 +4557,7 @@ impl Timeline {
_guard: &tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
ctx: &RequestContext,
) -> anyhow::Result<Arc<InMemoryLayer>> {
let mut guard = self
.layers
.write(LayerManagerLockHolder::GetLayerForWrite)
.await;
let mut guard = self.layers.write().await;
let last_record_lsn = self.get_last_record_lsn();
ensure!(
@@ -4624,10 +4599,7 @@ impl Timeline {
write_lock: &mut tokio::sync::MutexGuard<'_, Option<TimelineWriterState>>,
) -> Result<u64, FlushLayerError> {
let frozen = {
let mut guard = self
.layers
.write(LayerManagerLockHolder::TryFreezeLayer)
.await;
let mut guard = self.layers.write().await;
guard
.open_mut()?
.try_freeze_in_memory_layer(at, &self.last_freeze_at, write_lock, &self.metrics)
@@ -4668,12 +4640,7 @@ impl Timeline {
ctx: &RequestContext,
) {
// Subscribe to L0 delta layer updates, for compaction backpressure.
let mut watch_l0 = match self
.layers
.read(LayerManagerLockHolder::FlushLoop)
.await
.layer_map()
{
let mut watch_l0 = match self.layers.read().await.layer_map() {
Ok(lm) => lm.watch_level0_deltas(),
Err(Shutdown) => return,
};
@@ -4710,7 +4677,7 @@ impl Timeline {
// Fetch the next layer to flush, if any.
let (layer, l0_count, frozen_count, frozen_size) = {
let layers = self.layers.read(LayerManagerLockHolder::FlushLoop).await;
let layers = self.layers.read().await;
let Ok(lm) = layers.layer_map() else {
info!("dropping out of flush loop for timeline shutdown");
return;
@@ -5006,10 +4973,7 @@ impl Timeline {
// in-memory layer from the map now. The flushed layer is stored in
// the mapping in `create_delta_layer`.
{
let mut guard = self
.layers
.write(LayerManagerLockHolder::FlushFrozenLayer)
.await;
let mut guard = self.layers.write().await;
guard.open_mut()?.finish_flush_l0_layer(
delta_layer_to_add.as_ref(),
@@ -5224,7 +5188,7 @@ impl Timeline {
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let guard = self.layers.read().await;
let Ok(layers) = guard.layer_map() else {
return false;
};
@@ -5642,7 +5606,7 @@ impl Timeline {
if let ImageLayerCreationMode::Force = mode {
// When forced to create image layers, we might try and create them where they already
// exist. This mode is only used in tests/debug.
let layers = self.layers.read(LayerManagerLockHolder::Compaction).await;
let layers = self.layers.read().await;
if layers.contains_key(&PersistentLayerKey {
key_range: img_range.clone(),
lsn_range: PersistentLayerDesc::image_layer_lsn_range(lsn),
@@ -5767,7 +5731,7 @@ impl Timeline {
let image_layers = batch_image_writer.finish(self, ctx).await?;
let mut guard = self.layers.write(LayerManagerLockHolder::Compaction).await;
let mut guard = self.layers.write().await;
// FIXME: we could add the images to be uploaded *before* returning from here, but right
// now they are being scheduled outside of write lock; current way is inconsistent with
@@ -5775,7 +5739,7 @@ impl Timeline {
guard
.open_mut()?
.track_new_image_layers(&image_layers, &self.metrics);
drop_layer_manager_wlock(guard);
drop_wlock(guard);
let duration = timer.stop_and_record();
// Creating image layers may have caused some previously visible layers to be covered
@@ -6145,7 +6109,7 @@ impl Timeline {
layers_to_remove: &[Layer],
) -> Result<(), CompactionError> {
let mut guard = tokio::select! {
guard = self.layers.write(LayerManagerLockHolder::Compaction) => guard,
guard = self.layers.write() => guard,
_ = self.cancel.cancelled() => {
return Err(CompactionError::ShuttingDown);
}
@@ -6194,7 +6158,7 @@ impl Timeline {
self.remote_client
.schedule_compaction_update(&remove_layers, new_deltas)?;
drop_layer_manager_wlock(guard);
drop_wlock(guard);
Ok(())
}
@@ -6204,7 +6168,7 @@ impl Timeline {
mut replace_layers: Vec<(Layer, ResidentLayer)>,
mut drop_layers: Vec<Layer>,
) -> Result<(), CompactionError> {
let mut guard = self.layers.write(LayerManagerLockHolder::Compaction).await;
let mut guard = self.layers.write().await;
// Trim our lists in case our caller (compaction) raced with someone else (GC) removing layers: we want
// to avoid double-removing, and avoid rewriting something that was removed.
@@ -6555,10 +6519,7 @@ impl Timeline {
// 5. newer on-disk image layers cover the layer's whole key range
//
// TODO holding a write lock is too agressive and avoidable
let mut guard = self
.layers
.write(LayerManagerLockHolder::GarbageCollection)
.await;
let mut guard = self.layers.write().await;
let layers = guard.layer_map()?;
'outer: for l in layers.iter_historic_layers() {
result.layers_total += 1;
@@ -6860,10 +6821,7 @@ impl Timeline {
use pageserver_api::models::DownloadRemoteLayersTaskState;
let remaining = {
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
let Ok(lm) = guard.layer_map() else {
// technically here we could look into iterating accessible layers, but downloading
// all layers of a shutdown timeline makes no sense regardless.
@@ -6969,7 +6927,7 @@ impl Timeline {
impl Timeline {
/// Returns non-remote layers for eviction.
pub(crate) async fn get_local_layers_for_disk_usage_eviction(&self) -> DiskUsageEvictionInfo {
let guard = self.layers.read(LayerManagerLockHolder::Eviction).await;
let guard = self.layers.read().await;
let mut max_layer_size: Option<u64> = None;
let resident_layers = guard
@@ -7070,7 +7028,7 @@ impl Timeline {
let image_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created image layer {}", image_layer.local_path());
{
let mut guard = self.layers.write(LayerManagerLockHolder::Testing).await;
let mut guard = self.layers.write().await;
guard
.open_mut()
.unwrap()
@@ -7133,7 +7091,7 @@ impl Timeline {
let delta_layer = Layer::finish_creating(self.conf, self, desc, &path)?;
info!("force created delta layer {}", delta_layer.local_path());
{
let mut guard = self.layers.write(LayerManagerLockHolder::Testing).await;
let mut guard = self.layers.write().await;
guard
.open_mut()
.unwrap()
@@ -7228,7 +7186,7 @@ impl Timeline {
// Link the layer to the layer map
{
let mut guard = self.layers.write(LayerManagerLockHolder::Testing).await;
let mut guard = self.layers.write().await;
let layer_map = guard.open_mut().unwrap();
layer_map.force_insert_in_memory_layer(Arc::new(layer));
}
@@ -7245,7 +7203,7 @@ impl Timeline {
io_concurrency: IoConcurrency,
) -> anyhow::Result<Vec<(Key, Bytes)>> {
let mut all_data = Vec::new();
let guard = self.layers.read(LayerManagerLockHolder::Testing).await;
let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() {
if !layer.is_delta() && layer.image_layer_lsn() == lsn {
let layer = guard.get_from_desc(&layer);
@@ -7274,7 +7232,7 @@ impl Timeline {
self: &Arc<Timeline>,
) -> anyhow::Result<Vec<super::storage_layer::PersistentLayerKey>> {
let mut layers = Vec::new();
let guard = self.layers.read(LayerManagerLockHolder::Testing).await;
let guard = self.layers.read().await;
for layer in guard.layer_map()?.iter_historic_layers() {
layers.push(layer.key());
}
@@ -7386,7 +7344,7 @@ impl TimelineWriter<'_> {
let l0_count = self
.tl
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.read()
.await
.layer_map()?
.level0_deltas()
@@ -7605,7 +7563,6 @@ mod tests {
use crate::tenant::harness::{TenantHarness, test_img};
use crate::tenant::layer_map::LayerMap;
use crate::tenant::storage_layer::{Layer, LayerName, LayerVisibilityHint};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::timeline::{DeltaLayerTestDesc, EvictionError};
use crate::tenant::{PreviousHeatmap, Timeline};
@@ -7713,7 +7670,7 @@ mod tests {
// Evict all the layers and stash the old heatmap in the timeline.
// This simulates a migration to a cold secondary location.
let guard = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let guard = timeline.layers.read().await;
let mut all_layers = Vec::new();
let forever = std::time::Duration::from_secs(120);
for layer in guard.likely_resident_layers() {
@@ -7835,7 +7792,7 @@ mod tests {
})));
// Evict all the layers in the previous heatmap
let guard = timeline.layers.read(LayerManagerLockHolder::Testing).await;
let guard = timeline.layers.read().await;
let forever = std::time::Duration::from_secs(120);
for layer in guard.likely_resident_layers() {
layer.evict_and_wait(forever).await.unwrap();
@@ -7898,10 +7855,7 @@ mod tests {
}
async fn find_some_layer(timeline: &Timeline) -> Layer {
let layers = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let layers = timeline.layers.read().await;
let desc = layers
.layer_map()
.unwrap()

View File

@@ -4,7 +4,6 @@ use std::ops::Range;
use utils::lsn::Lsn;
use super::Timeline;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
#[derive(serde::Serialize)]
pub(crate) struct RangeAnalysis {
@@ -25,10 +24,7 @@ impl Timeline {
let num_of_l0;
let all_layer_files = {
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
num_of_l0 = guard.layer_map().unwrap().level0_deltas().len();
guard.all_persistent_layers()
};

View File

@@ -9,7 +9,7 @@ use std::ops::{Deref, Range};
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::layer_manager::{LayerManagerLockHolder, LayerManagerReadGuard};
use super::layer_manager::LayerManager;
use super::{
CompactFlags, CompactOptions, CompactionError, CreateImageLayersError, DurationRecorder,
GetVectoredError, ImageLayerCreationMode, LastImageLayerCreationStatus, RecordedDuration,
@@ -62,7 +62,7 @@ use crate::tenant::storage_layer::{
use crate::tenant::tasks::log_compaction_error;
use crate::tenant::timeline::{
DeltaLayerWriter, ImageLayerCreationOutcome, ImageLayerWriter, IoConcurrency, Layer,
ResidentLayer, drop_layer_manager_rlock,
ResidentLayer, drop_rlock,
};
use crate::tenant::{DeltaLayer, MaybeOffloaded};
use crate::virtual_file::{MaybeFatalIo, VirtualFile};
@@ -314,10 +314,7 @@ impl GcCompactionQueue {
.unwrap_or(Lsn::INVALID);
let layers = {
let guard = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = timeline.layers.read().await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
@@ -411,10 +408,7 @@ impl GcCompactionQueue {
timeline: &Arc<Timeline>,
lsn: Lsn,
) -> Result<u64, CompactionError> {
let guard = timeline
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = timeline.layers.read().await;
let layer_map = guard.layer_map()?;
let layers = layer_map.iter_historic_layers().collect_vec();
let mut size = 0;
@@ -857,7 +851,7 @@ impl KeyHistoryRetention {
}
let layer_generation;
{
let guard = tline.layers.read(LayerManagerLockHolder::Compaction).await;
let guard = tline.layers.read().await;
if !guard.contains_key(key) {
return false;
}
@@ -1288,10 +1282,7 @@ impl Timeline {
// We do the repartition on the L0-L1 boundary. All data below the boundary
// are compacted by L0 with low read amplification, thus making the `repartition`
// function run fast.
let guard = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let guard = self.layers.read().await;
guard
.all_persistent_layers()
.iter()
@@ -1470,7 +1461,7 @@ impl Timeline {
let latest_gc_cutoff = self.get_applied_gc_cutoff_lsn();
let pitr_cutoff = self.gc_info.read().unwrap().cutoffs.time;
let layers = self.layers.read(LayerManagerLockHolder::Compaction).await;
let layers = self.layers.read().await;
let layers_iter = layers.layer_map()?.iter_historic_layers();
let (layers_total, mut layers_checked) = (layers_iter.len(), 0);
for layer_desc in layers_iter {
@@ -1731,10 +1722,7 @@ impl Timeline {
// are implicitly left visible, because LayerVisibilityHint's default is Visible, and we never modify it here.
// Note that L0 deltas _can_ be covered by image layers, but we consider them 'visible' because we anticipate that
// they will be subject to L0->L1 compaction in the near future.
let layer_manager = self
.layers
.read(LayerManagerLockHolder::GetLayerMapInfo)
.await;
let layer_manager = self.layers.read().await;
let layer_map = layer_manager.layer_map()?;
let readable_points = {
@@ -1787,7 +1775,7 @@ impl Timeline {
};
let begin = tokio::time::Instant::now();
let phase1_layers_locked = self.layers.read(LayerManagerLockHolder::Compaction).await;
let phase1_layers_locked = self.layers.read().await;
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
@@ -1815,7 +1803,7 @@ impl Timeline {
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
async fn compact_level0_phase1<'a>(
self: &'a Arc<Self>,
guard: LayerManagerReadGuard<'a>,
guard: tokio::sync::RwLockReadGuard<'a, LayerManager>,
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
force_compaction_ignore_threshold: bool,
@@ -2041,7 +2029,7 @@ impl Timeline {
holes
};
stats.read_lock_held_compute_holes_micros = stats.read_lock_held_key_sort_micros.till_now();
drop_layer_manager_rlock(guard);
drop_rlock(guard);
if self.cancel.is_cancelled() {
return Err(CompactionError::ShuttingDown);
@@ -2481,7 +2469,7 @@ impl Timeline {
// Find the top of the historical layers
let end_lsn = {
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let guard = self.layers.read().await;
let layers = guard.layer_map()?;
let l0_deltas = layers.level0_deltas();
@@ -3020,7 +3008,7 @@ impl Timeline {
}
split_key_ranges.sort();
let all_layers = {
let guard = self.layers.read(LayerManagerLockHolder::Compaction).await;
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
@@ -3124,12 +3112,12 @@ impl Timeline {
.await?;
let jobs_len = jobs.len();
for (idx, job) in jobs.into_iter().enumerate() {
let sub_compaction_progress = format!("{}/{}", idx + 1, jobs_len);
info!(
"running enhanced gc bottom-most compaction, sub-compaction {}/{}",
idx + 1,
jobs_len
);
self.compact_with_gc_inner(cancel, job, ctx, yield_for_l0)
.instrument(info_span!(
"sub_compaction",
sub_compaction_progress = sub_compaction_progress
))
.await?;
}
if jobs_len == 0 {
@@ -3197,10 +3185,7 @@ impl Timeline {
// 1. If a layer is in the selection, all layers below it are in the selection.
// 2. Inferred from (1), for each key in the layer selection, the value can be reconstructed only with the layers in the layer selection.
let job_desc = {
let guard = self
.layers
.read(LayerManagerLockHolder::GarbageCollection)
.await;
let guard = self.layers.read().await;
let layers = guard.layer_map()?;
let gc_info = self.gc_info.read().unwrap();
let mut retain_lsns_below_horizon = Vec::new();
@@ -3971,10 +3956,7 @@ impl Timeline {
// First, do a sanity check to ensure the newly-created layer map does not contain overlaps.
let all_layers = {
let guard = self
.layers
.read(LayerManagerLockHolder::GarbageCollection)
.await;
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
layer_map.iter_historic_layers().collect_vec()
};
@@ -4038,10 +4020,7 @@ impl Timeline {
let update_guard = self.gc_compaction_layer_update_lock.write().await;
// Acquiring the update guard ensures current read operations end and new read operations are blocked.
// TODO: can we use `latest_gc_cutoff` Rcu to achieve the same effect?
let mut guard = self
.layers
.write(LayerManagerLockHolder::GarbageCollection)
.await;
let mut guard = self.layers.write().await;
guard
.open_mut()?
.finish_gc_compaction(&layer_selection, &compact_to, &self.metrics);
@@ -4109,11 +4088,7 @@ impl TimelineAdaptor {
pub async fn flush_updates(&mut self) -> Result<(), CompactionError> {
let layers_to_delete = {
let guard = self
.timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await;
let guard = self.timeline.layers.read().await;
self.layers_to_delete
.iter()
.map(|x| guard.get_from_desc(x))
@@ -4158,11 +4133,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
) -> anyhow::Result<Vec<OwnArc<PersistentLayerDesc>>> {
self.flush_updates().await?;
let guard = self
.timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await;
let guard = self.timeline.layers.read().await;
let layer_map = guard.layer_map()?;
let result = layer_map
@@ -4201,11 +4172,7 @@ impl CompactionJobExecutor for TimelineAdaptor {
// this is a lot more complex than a simple downcast...
if layer.is_delta() {
let l = {
let guard = self
.timeline
.layers
.read(LayerManagerLockHolder::Compaction)
.await;
let guard = self.timeline.layers.read().await;
guard.get_from_desc(layer)
};
let result = l.download_and_keep_resident(ctx).await?;

View File

@@ -19,7 +19,7 @@ use utils::id::TimelineId;
use utils::lsn::Lsn;
use utils::sync::gate::GateError;
use super::layer_manager::{LayerManager, LayerManagerLockHolder};
use super::layer_manager::LayerManager;
use super::{FlushLayerError, Timeline};
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::TaskKind;
@@ -199,10 +199,7 @@ pub(crate) async fn generate_tombstone_image_layer(
let image_lsn = ancestor_lsn;
{
let layers = detached
.layers
.read(LayerManagerLockHolder::DetachAncestor)
.await;
let layers = detached.layers.read().await;
for layer in layers.all_persistent_layers() {
if !layer.is_delta
&& layer.lsn_range.start == image_lsn
@@ -426,7 +423,7 @@ pub(super) async fn prepare(
// we do not need to start from our layers, because they can only be layers that come
// *after* ancestor_lsn
let layers = tokio::select! {
guard = ancestor.layers.read(LayerManagerLockHolder::DetachAncestor) => guard,
guard = ancestor.layers.read() => guard,
_ = detached.cancel.cancelled() => {
return Err(ShuttingDown);
}
@@ -872,12 +869,7 @@ async fn remote_copy(
// Double check that the file is orphan (probably from an earlier attempt), then delete it
let key = file_name.clone().into();
if adoptee
.layers
.read(LayerManagerLockHolder::DetachAncestor)
.await
.contains_key(&key)
{
if adoptee.layers.read().await.contains_key(&key) {
// We are supposed to filter out such cases before coming to this function
return Err(Error::Prepare(anyhow::anyhow!(
"layer file {file_name} already present and inside layer map"

View File

@@ -33,7 +33,6 @@ use crate::tenant::size::CalculateSyntheticSizeError;
use crate::tenant::storage_layer::LayerVisibilityHint;
use crate::tenant::tasks::{BackgroundLoopKind, BackgroundLoopSemaphorePermit, sleep_random};
use crate::tenant::timeline::EvictionError;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
use crate::tenant::{LogicalSizeCalculationCause, TenantShard};
#[derive(Default)]
@@ -209,7 +208,7 @@ impl Timeline {
let mut js = tokio::task::JoinSet::new();
{
let guard = self.layers.read(LayerManagerLockHolder::Eviction).await;
let guard = self.layers.read().await;
guard
.likely_resident_layers()

View File

@@ -15,7 +15,6 @@ use super::{Timeline, TimelineDeleteProgress};
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
mod flow;
mod importbucket_client;
@@ -164,10 +163,7 @@ async fn prepare_import(
info!("wipe the slate clean");
{
// TODO: do we need to hold GC lock for this?
let mut guard = timeline
.layers
.write(LayerManagerLockHolder::ImportPgData)
.await;
let mut guard = timeline.layers.write().await;
assert!(
guard.layer_map()?.open_layer.is_none(),
"while importing, there should be no in-memory layer" // this just seems like a good place to assert it

View File

@@ -56,7 +56,6 @@ use crate::pgdatadir_mapping::{
};
use crate::task_mgr::TaskKind;
use crate::tenant::storage_layer::{AsLayerDesc, ImageLayerWriter, Layer};
use crate::tenant::timeline::layer_manager::LayerManagerLockHolder;
pub async fn run(
timeline: Arc<Timeline>,
@@ -101,7 +100,6 @@ async fn run_v1(
.unwrap(),
import_job_concurrency: base.import_job_concurrency,
import_job_checkpoint_threshold: base.import_job_checkpoint_threshold,
import_job_max_byte_range_size: base.import_job_max_byte_range_size,
}
}
None => timeline.conf.timeline_import_config.clone(),
@@ -443,7 +441,6 @@ impl Plan {
let mut last_completed_job_idx = start_after_job_idx.unwrap_or(0);
let checkpoint_every: usize = import_config.import_job_checkpoint_threshold.into();
let max_byte_range_size: usize = import_config.import_job_max_byte_range_size.into();
// Run import jobs concurrently up to the limit specified by the pageserver configuration.
// Note that we process completed futures in the oreder of insertion. This will be the
@@ -459,7 +456,7 @@ impl Plan {
work.push_back(tokio::task::spawn(async move {
let _permit = permit;
let res = job.run(job_timeline, max_byte_range_size, &ctx).await;
let res = job.run(job_timeline, &ctx).await;
(job_idx, res)
}));
},
@@ -682,7 +679,6 @@ trait ImportTask {
async fn doit(
self,
layer_writer: &mut ImageLayerWriter,
max_byte_range_size: usize,
ctx: &RequestContext,
) -> anyhow::Result<usize>;
}
@@ -719,7 +715,6 @@ impl ImportTask for ImportSingleKeyTask {
async fn doit(
self,
layer_writer: &mut ImageLayerWriter,
_max_byte_range_size: usize,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
layer_writer.put_image(self.key, self.buf, ctx).await?;
@@ -773,9 +768,10 @@ impl ImportTask for ImportRelBlocksTask {
async fn doit(
self,
layer_writer: &mut ImageLayerWriter,
max_byte_range_size: usize,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
const MAX_BYTE_RANGE_SIZE: usize = 4 * 1024 * 1024;
debug!("Importing relation file");
let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
@@ -800,7 +796,7 @@ impl ImportTask for ImportRelBlocksTask {
assert_eq!(key.len(), 1);
assert!(!acc.is_empty());
assert!(acc_end > acc_start);
if acc_end == start && end - acc_start <= max_byte_range_size {
if acc_end == start && end - acc_start <= MAX_BYTE_RANGE_SIZE {
acc.push(key.pop().unwrap());
Ok((acc, acc_start, end))
} else {
@@ -864,7 +860,6 @@ impl ImportTask for ImportSlruBlocksTask {
async fn doit(
self,
layer_writer: &mut ImageLayerWriter,
_max_byte_range_size: usize,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
debug!("Importing SLRU segment file {}", self.path);
@@ -911,13 +906,12 @@ impl ImportTask for AnyImportTask {
async fn doit(
self,
layer_writer: &mut ImageLayerWriter,
max_byte_range_size: usize,
ctx: &RequestContext,
) -> anyhow::Result<usize> {
match self {
Self::SingleKey(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
Self::RelBlocks(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
Self::SlruBlocks(t) => t.doit(layer_writer, max_byte_range_size, ctx).await,
Self::SingleKey(t) => t.doit(layer_writer, ctx).await,
Self::RelBlocks(t) => t.doit(layer_writer, ctx).await,
Self::SlruBlocks(t) => t.doit(layer_writer, ctx).await,
}
}
}
@@ -958,12 +952,7 @@ impl ChunkProcessingJob {
}
}
async fn run(
self,
timeline: Arc<Timeline>,
max_byte_range_size: usize,
ctx: &RequestContext,
) -> anyhow::Result<()> {
async fn run(self, timeline: Arc<Timeline>, ctx: &RequestContext) -> anyhow::Result<()> {
let mut writer = ImageLayerWriter::new(
timeline.conf,
timeline.timeline_id,
@@ -978,17 +967,14 @@ impl ChunkProcessingJob {
let mut nimages = 0;
for task in self.tasks {
nimages += task.doit(&mut writer, max_byte_range_size, ctx).await?;
nimages += task.doit(&mut writer, ctx).await?;
}
let resident_layer = if nimages > 0 {
let (desc, path) = writer.finish(ctx).await?;
{
let guard = timeline
.layers
.read(LayerManagerLockHolder::ImportPgData)
.await;
let guard = timeline.layers.read().await;
let existing_layer = guard.try_get_from_key(&desc.key());
if let Some(layer) = existing_layer {
if layer.metadata().generation == timeline.generation {
@@ -1011,10 +997,7 @@ impl ChunkProcessingJob {
// certain that the existing layer is identical to the new one, so in that case
// we replace the old layer with the one we just generated.
let mut guard = timeline
.layers
.write(LayerManagerLockHolder::ImportPgData)
.await;
let mut guard = timeline.layers.write().await;
let existing_layer = guard
.try_get_from_key(&resident_layer.layer_desc().key())
@@ -1043,7 +1026,7 @@ impl ChunkProcessingJob {
}
}
crate::tenant::timeline::drop_layer_manager_wlock(guard);
crate::tenant::timeline::drop_wlock(guard);
timeline
.remote_client

View File

@@ -1,8 +1,5 @@
use std::collections::HashMap;
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, bail, ensure};
use itertools::Itertools;
@@ -23,155 +20,6 @@ use crate::tenant::storage_layer::{
PersistentLayerKey, ReadableLayerWeak, ResidentLayer,
};
/// Warn if the lock was held for longer than this threshold.
/// It's very generous and we should bring this value down over time.
const LAYER_MANAGER_LOCK_WARN_THRESHOLD: Duration = Duration::from_secs(5);
const LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD: Duration = Duration::from_secs(30);
/// Describes the operation that is holding the layer manager lock
#[derive(Debug, Clone, Copy, strum_macros::Display)]
#[strum(serialize_all = "kebab_case")]
pub(crate) enum LayerManagerLockHolder {
GetLayerMapInfo,
GenerateHeatmap,
GetPage,
Init,
LoadLayerMap,
GetLayerForWrite,
TryFreezeLayer,
FlushFrozenLayer,
FlushLoop,
Compaction,
GarbageCollection,
Shutdown,
ImportPgData,
DetachAncestor,
Eviction,
#[cfg(test)]
Testing,
}
/// Wrapper for the layer manager that tracks the amount of time during which
/// it was held under read or write lock
#[derive(Default)]
pub(crate) struct LockedLayerManager {
locked: tokio::sync::RwLock<LayerManager>,
}
pub(crate) struct LayerManagerReadGuard<'a> {
guard: ManuallyDrop<tokio::sync::RwLockReadGuard<'a, LayerManager>>,
acquired_at: std::time::Instant,
holder: LayerManagerLockHolder,
}
pub(crate) struct LayerManagerWriteGuard<'a> {
guard: ManuallyDrop<tokio::sync::RwLockWriteGuard<'a, LayerManager>>,
acquired_at: std::time::Instant,
holder: LayerManagerLockHolder,
}
impl Drop for LayerManagerReadGuard<'_> {
fn drop(&mut self) {
// Drop the lock first, before potentially warning if it was held for too long.
// SAFETY: ManuallyDrop in Drop implementation
unsafe { ManuallyDrop::drop(&mut self.guard) };
let held_for = self.acquired_at.elapsed();
if held_for >= LAYER_MANAGER_LOCK_READ_WARN_THRESHOLD {
tracing::warn!(
holder=%self.holder,
"Layer manager read lock held for {}s",
held_for.as_secs_f64(),
);
}
}
}
impl Drop for LayerManagerWriteGuard<'_> {
fn drop(&mut self) {
// Drop the lock first, before potentially warning if it was held for too long.
// SAFETY: ManuallyDrop in Drop implementation
unsafe { ManuallyDrop::drop(&mut self.guard) };
let held_for = self.acquired_at.elapsed();
if held_for >= LAYER_MANAGER_LOCK_WARN_THRESHOLD {
tracing::warn!(
holder=%self.holder,
"Layer manager write lock held for {}s",
held_for.as_secs_f64(),
);
}
}
}
impl Deref for LayerManagerReadGuard<'_> {
type Target = LayerManager;
fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}
impl Deref for LayerManagerWriteGuard<'_> {
type Target = LayerManager;
fn deref(&self) -> &Self::Target {
self.guard.deref()
}
}
impl DerefMut for LayerManagerWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.guard.deref_mut()
}
}
impl LockedLayerManager {
pub(crate) async fn read(&self, holder: LayerManagerLockHolder) -> LayerManagerReadGuard {
let guard = ManuallyDrop::new(self.locked.read().await);
LayerManagerReadGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
}
}
pub(crate) fn try_read(
&self,
holder: LayerManagerLockHolder,
) -> Result<LayerManagerReadGuard, tokio::sync::TryLockError> {
let guard = ManuallyDrop::new(self.locked.try_read()?);
Ok(LayerManagerReadGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
})
}
pub(crate) async fn write(&self, holder: LayerManagerLockHolder) -> LayerManagerWriteGuard {
let guard = ManuallyDrop::new(self.locked.write().await);
LayerManagerWriteGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
}
}
pub(crate) fn try_write(
&self,
holder: LayerManagerLockHolder,
) -> Result<LayerManagerWriteGuard, tokio::sync::TryLockError> {
let guard = ManuallyDrop::new(self.locked.try_write()?);
Ok(LayerManagerWriteGuard {
guard,
acquired_at: std::time::Instant::now(),
holder,
})
}
}
/// Provides semantic APIs to manipulate the layer map.
pub(crate) enum LayerManager {
/// Open as in not shutdown layer manager; we still have in-memory layers and we can manipulate

View File

@@ -32,7 +32,9 @@ use utils::backoff::{
};
use utils::id::{NodeId, TenantTimelineId};
use utils::lsn::Lsn;
use utils::postgres_client::{ConnectionConfigArgs, wal_stream_connection_config};
use utils::postgres_client::{
ConnectionConfigArgs, PostgresClientProtocol, wal_stream_connection_config,
};
use super::walreceiver_connection::{WalConnectionStatus, WalReceiverError};
use super::{TaskEvent, TaskHandle, TaskStateUpdate, WalReceiverConf};
@@ -989,12 +991,19 @@ impl ConnectionManagerState {
return None; // no connection string, ignore sk
}
let shard_identity = self.timeline.get_shard_identity();
let (shard_number, shard_count, shard_stripe_size) = (
Some(shard_identity.number.0),
Some(shard_identity.count.0),
Some(shard_identity.stripe_size.0),
);
let (shard_number, shard_count, shard_stripe_size) = match self.conf.protocol {
PostgresClientProtocol::Vanilla => {
(None, None, None)
},
PostgresClientProtocol::Interpreted { .. } => {
let shard_identity = self.timeline.get_shard_identity();
(
Some(shard_identity.number.0),
Some(shard_identity.count.0),
Some(shard_identity.stripe_size.0),
)
}
};
let connection_conf_args = ConnectionConfigArgs {
protocol: self.conf.protocol,
@@ -1111,8 +1120,8 @@ impl ReconnectReason {
#[cfg(test)]
mod tests {
use pageserver_api::config::defaults::DEFAULT_WAL_RECEIVER_PROTOCOL;
use url::Host;
use utils::postgres_client::PostgresClientProtocol;
use super::*;
use crate::tenant::harness::{TIMELINE_ID, TenantHarness};
@@ -1543,11 +1552,6 @@ mod tests {
.await
.expect("Failed to create an empty timeline for dummy wal connection manager");
let protocol = PostgresClientProtocol::Interpreted {
format: utils::postgres_client::InterpretedFormat::Protobuf,
compression: Some(utils::postgres_client::Compression::Zstd { level: 1 }),
};
ConnectionManagerState {
id: TenantTimelineId {
tenant_id: harness.tenant_shard_id.tenant_id,
@@ -1556,7 +1560,7 @@ mod tests {
timeline,
cancel: CancellationToken::new(),
conf: WalReceiverConf {
protocol,
protocol: DEFAULT_WAL_RECEIVER_PROTOCOL,
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),

View File

@@ -15,7 +15,7 @@ use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::WAL_SEGMENT_SIZE;
use postgres_ffi::v14::xlog_utils::normalize_lsn;
use postgres_ffi::waldecoder::WalDecodeError;
use postgres_ffi::waldecoder::{WalDecodeError, WalStreamDecoder};
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use tokio::sync::watch;
@@ -31,7 +31,7 @@ use utils::lsn::Lsn;
use utils::pageserver_feedback::PageserverFeedback;
use utils::postgres_client::PostgresClientProtocol;
use utils::sync::gate::GateError;
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecords};
use wal_decoder::models::{FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords};
use wal_decoder::wire_format::FromWireFormat;
use super::TaskStateUpdate;
@@ -275,6 +275,8 @@ pub(super) async fn handle_walreceiver_connection(
let copy_stream = replication_client.copy_both_simple(&query).await?;
let mut physical_stream = pin!(ReplicationStream::new(copy_stream));
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint, &ctx)
.await
.map_err(|e| match e.kind {
@@ -282,16 +284,14 @@ pub(super) async fn handle_walreceiver_connection(
_ => WalReceiverError::Other(e.into()),
})?;
let (format, compression) = match protocol {
let shard = vec![*timeline.get_shard_identity()];
let interpreted_proto_config = match protocol {
PostgresClientProtocol::Vanilla => None,
PostgresClientProtocol::Interpreted {
format,
compression,
} => (format, compression),
PostgresClientProtocol::Vanilla => {
return Err(WalReceiverError::Other(anyhow!(
"Vanilla WAL receiver protocol is no longer supported for ingest"
)));
}
} => Some((format, compression)),
};
let mut expected_wal_start = startpoint;
@@ -313,6 +313,16 @@ pub(super) async fn handle_walreceiver_connection(
// Update the connection status before processing the message. If the message processing
// fails (e.g. in walingest), we still want to know latests LSNs from the safekeeper.
match &replication_message {
ReplicationMessage::XLogData(xlog_data) => {
connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(xlog_data.wal_end()));
connection_status.streaming_lsn = Some(Lsn::from(
xlog_data.wal_start() + xlog_data.data().len() as u64,
));
if !xlog_data.data().is_empty() {
connection_status.latest_wal_update = now;
}
}
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
connection_status.latest_connection_update = now;
connection_status.commit_lsn = Some(Lsn::from(keepalive.wal_end()));
@@ -343,6 +353,7 @@ pub(super) async fn handle_walreceiver_connection(
// were interpreted.
let streaming_lsn = Lsn::from(raw.streaming_lsn());
let (format, compression) = interpreted_proto_config.unwrap();
let batch = InterpretedWalRecords::from_wire(raw.data(), format, compression)
.await
.with_context(|| {
@@ -498,6 +509,138 @@ pub(super) async fn handle_walreceiver_connection(
Some(streaming_lsn)
}
ReplicationMessage::XLogData(xlog_data) => {
async fn commit(
modification: &mut DatadirModification<'_>,
uncommitted: &mut u64,
filtered: &mut u64,
ctx: &RequestContext,
) -> anyhow::Result<()> {
let stats = modification.stats();
modification.commit(ctx).await?;
WAL_INGEST
.records_committed
.inc_by(*uncommitted - *filtered);
WAL_INGEST.inc_values_committed(&stats);
*uncommitted = 0;
*filtered = 0;
Ok(())
}
// Pass the WAL data to the decoder, and see if we can decode
// more records as a result.
let data = xlog_data.data();
let startlsn = Lsn::from(xlog_data.wal_start());
let endlsn = startlsn + data.len() as u64;
trace!("received XLogData between {startlsn} and {endlsn}");
WAL_INGEST.bytes_received.inc_by(data.len() as u64);
waldecoder.feed_bytes(data);
{
let mut modification = timeline.begin_modification(startlsn);
let mut uncommitted_records = 0;
let mut filtered_records = 0;
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
if !next_record_lsn.is_aligned() {
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
}
// Deserialize and interpret WAL record
let interpreted = InterpretedWalRecord::from_bytes_filtered(
recdata,
&shard,
next_record_lsn,
modification.tline.pg_version,
)?
.remove(timeline.get_shard_identity())
.unwrap();
if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes)
&& uncommitted_records > 0
{
// Special case: legacy PG database creations operate by reading pages from a 'template' database:
// these are the only kinds of WAL record that require reading data blocks while ingesting. Ensure
// all earlier writes of data blocks are visible by committing any modification in flight.
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
// Ingest the records without immediately committing them.
timeline.metrics.wal_records_received.inc();
let ingested = walingest
.ingest_record(interpreted, &mut modification, &ctx)
.await
.with_context(|| {
format!("could not ingest record at {next_record_lsn}")
})
.inspect_err(|err| {
// TODO: we can't differentiate cancellation errors with
// anyhow::Error, so just ignore it if we're cancelled.
if !cancellation.is_cancelled() && !timeline.is_stopping() {
critical!("{err:?}")
}
})?;
if !ingested {
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
WAL_INGEST.records_filtered.inc();
filtered_records += 1;
}
// FIXME: this cannot be made pausable_failpoint without fixing the
// failpoint library; in tests, the added amount of debugging will cause us
// to timeout the tests.
fail_point!("walreceiver-after-ingest");
last_rec_lsn = next_record_lsn;
// Commit every ingest_batch_size records. Even if we filtered out
// all records, we still need to call commit to advance the LSN.
uncommitted_records += 1;
if uncommitted_records >= ingest_batch_size
|| modification.approx_pending_bytes()
> DatadirModification::MAX_PENDING_BYTES
{
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
}
// Commit the remaining records.
if uncommitted_records > 0 {
commit(
&mut modification,
&mut uncommitted_records,
&mut filtered_records,
&ctx,
)
.await?;
}
}
if !caught_up && endlsn >= end_of_wal {
info!("caught up at LSN {endlsn}");
caught_up = true;
}
Some(endlsn)
}
ReplicationMessage::PrimaryKeepAlive(keepalive) => {
let wal_end = keepalive.wal_end();
let timestamp = keepalive.timestamp();

View File

@@ -1092,15 +1092,13 @@ communicator_prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
MyPState->ring_last <= ring_index);
}
/* Internal version. Returns the ring index of the last block (result of this function is used only
* when nblocks==1)
*/
/* internal version. Returns the ring index */
static uint64
prefetch_register_bufferv(BufferTag tag, neon_request_lsns *frlsns,
BlockNumber nblocks, const bits8 *mask,
bool is_prefetch)
{
uint64 last_ring_index;
uint64 min_ring_index;
PrefetchRequest hashkey;
#ifdef USE_ASSERT_CHECKING
bool any_hits = false;
@@ -1124,12 +1122,13 @@ Retry:
MyPState->ring_unused - MyPState->ring_receive;
MyNeonCounters->getpage_prefetches_buffered =
MyPState->n_responses_buffered;
last_ring_index = UINT64_MAX;
min_ring_index = UINT64_MAX;
for (int i = 0; i < nblocks; i++)
{
PrefetchRequest *slot = NULL;
PrfHashEntry *entry = NULL;
uint64 ring_index;
neon_request_lsns *lsns;
if (PointerIsValid(mask) && BITMAP_ISSET(mask, i))
@@ -1153,12 +1152,12 @@ Retry:
if (entry != NULL)
{
slot = entry->slot;
last_ring_index = slot->my_ring_index;
Assert(slot == GetPrfSlot(last_ring_index));
ring_index = slot->my_ring_index;
Assert(slot == GetPrfSlot(ring_index));
Assert(slot->status != PRFS_UNUSED);
Assert(MyPState->ring_last <= last_ring_index &&
last_ring_index < MyPState->ring_unused);
Assert(MyPState->ring_last <= ring_index &&
ring_index < MyPState->ring_unused);
Assert(BufferTagsEqual(&slot->buftag, &hashkey.buftag));
/*
@@ -1170,9 +1169,9 @@ Retry:
if (!neon_prefetch_response_usable(lsns, slot))
{
/* Wait for the old request to finish and discard it */
if (!prefetch_wait_for(last_ring_index))
if (!prefetch_wait_for(ring_index))
goto Retry;
prefetch_set_unused(last_ring_index);
prefetch_set_unused(ring_index);
entry = NULL;
slot = NULL;
pgBufferUsage.prefetch.expired += 1;
@@ -1189,12 +1188,13 @@ Retry:
*/
if (slot->status == PRFS_TAG_REMAINS)
{
prefetch_set_unused(last_ring_index);
prefetch_set_unused(ring_index);
entry = NULL;
slot = NULL;
}
else
{
min_ring_index = Min(min_ring_index, ring_index);
/* The buffered request is good enough, return that index */
if (is_prefetch)
pgBufferUsage.prefetch.duplicates++;
@@ -1283,12 +1283,12 @@ Retry:
* The next buffer pointed to by `ring_unused` is now definitely empty, so
* we can insert the new request to it.
*/
last_ring_index = MyPState->ring_unused;
ring_index = MyPState->ring_unused;
Assert(MyPState->ring_last <= last_ring_index &&
last_ring_index <= MyPState->ring_unused);
Assert(MyPState->ring_last <= ring_index &&
ring_index <= MyPState->ring_unused);
slot = GetPrfSlotNoCheck(last_ring_index);
slot = GetPrfSlotNoCheck(ring_index);
Assert(slot->status == PRFS_UNUSED);
@@ -1298,9 +1298,11 @@ Retry:
*/
slot->buftag = hashkey.buftag;
slot->shard_no = get_shard_number(&tag);
slot->my_ring_index = last_ring_index;
slot->my_ring_index = ring_index;
slot->flags = 0;
min_ring_index = Min(min_ring_index, ring_index);
if (is_prefetch)
MyNeonCounters->getpage_prefetch_requests_total++;
else
@@ -1313,12 +1315,11 @@ Retry:
MyPState->ring_unused - MyPState->ring_receive;
Assert(any_hits);
Assert(last_ring_index != UINT64_MAX);
Assert(GetPrfSlot(last_ring_index)->status == PRFS_REQUESTED ||
GetPrfSlot(last_ring_index)->status == PRFS_RECEIVED);
Assert(MyPState->ring_last <= last_ring_index &&
last_ring_index < MyPState->ring_unused);
Assert(GetPrfSlot(min_ring_index)->status == PRFS_REQUESTED ||
GetPrfSlot(min_ring_index)->status == PRFS_RECEIVED);
Assert(MyPState->ring_last <= min_ring_index &&
min_ring_index < MyPState->ring_unused);
if (flush_every_n_requests > 0 &&
MyPState->ring_unused - MyPState->ring_flush >= flush_every_n_requests)
@@ -1334,7 +1335,7 @@ Retry:
MyPState->ring_flush = MyPState->ring_unused;
}
return last_ring_index;
return min_ring_index;
}
static bool

View File

@@ -2,6 +2,6 @@ DROP FUNCTION IF EXISTS get_prewarm_info(out total_pages integer, out prewarmed_
DROP FUNCTION IF EXISTS get_local_cache_state(max_chunks integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer);
DROP FUNCTION IF EXISTS prewarm_local_cache(state bytea, n_workers integer default 1);

View File

@@ -16,7 +16,6 @@
#if PG_MAJORVERSION_NUM >= 15
#include "access/xlogrecovery.h"
#endif
#include "executor/instrument.h"
#include "replication/logical.h"
#include "replication/logicallauncher.h"
#include "replication/slot.h"
@@ -34,7 +33,6 @@
#include "file_cache.h"
#include "neon.h"
#include "neon_lwlsncache.h"
#include "neon_perf_counters.h"
#include "control_plane_connector.h"
#include "logical_replication_monitor.h"
#include "unstable_extensions.h"
@@ -48,13 +46,6 @@ void _PG_init(void);
static int running_xacts_overflow_policy;
static bool monitor_query_exec_time = false;
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
static void neon_ExecutorStart(QueryDesc *queryDesc, int eflags);
static void neon_ExecutorEnd(QueryDesc *queryDesc);
#if PG_MAJORVERSION_NUM >= 16
static shmem_startup_hook_type prev_shmem_startup_hook;
@@ -479,16 +470,6 @@ _PG_init(void)
0,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"neon.monitor_query_exec_time",
"Collect infortmation about query execution time",
NULL,
&monitor_query_exec_time,
false,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"neon.allow_replica_misconfig",
"Allow replica startup when some critical GUCs have smaller value than on primary node",
@@ -527,11 +508,6 @@ _PG_init(void)
EmitWarningsOnPlaceholders("neon");
ReportSearchPath();
prev_ExecutorStart = ExecutorStart_hook;
ExecutorStart_hook = neon_ExecutorStart;
prev_ExecutorEnd = ExecutorEnd_hook;
ExecutorEnd_hook = neon_ExecutorEnd;
}
PG_FUNCTION_INFO_V1(pg_cluster_size);
@@ -605,55 +581,3 @@ neon_shmem_startup_hook(void)
#endif
}
#endif
/*
* ExecutorStart hook: start up tracking if needed
*/
static void
neon_ExecutorStart(QueryDesc *queryDesc, int eflags)
{
if (prev_ExecutorStart)
prev_ExecutorStart(queryDesc, eflags);
else
standard_ExecutorStart(queryDesc, eflags);
if (monitor_query_exec_time)
{
/*
* Set up to track total elapsed time in ExecutorRun. Make sure the
* space is allocated in the per-query context so it will go away at
* ExecutorEnd.
*/
if (queryDesc->totaltime == NULL)
{
MemoryContext oldcxt;
oldcxt = MemoryContextSwitchTo(queryDesc->estate->es_query_cxt);
queryDesc->totaltime = InstrAlloc(1, INSTRUMENT_TIMER, false);
MemoryContextSwitchTo(oldcxt);
}
}
}
/*
* ExecutorEnd hook: store results if needed
*/
static void
neon_ExecutorEnd(QueryDesc *queryDesc)
{
if (monitor_query_exec_time && queryDesc->totaltime)
{
/*
* Make sure stats accumulation is done. (Note: it's okay if several
* levels of hook all do this.)
*/
InstrEndLoop(queryDesc->totaltime);
inc_query_time(queryDesc->totaltime->total*1000000); /* convert to usec */
}
if (prev_ExecutorEnd)
prev_ExecutorEnd(queryDesc);
else
standard_ExecutorEnd(queryDesc);
}

View File

@@ -71,27 +71,6 @@ inc_iohist(IOHistogram hist, uint64 latency_us)
hist->wait_us_count++;
}
static inline void
inc_qthist(QTHistogram hist, uint64 elapsed_us)
{
int lo = 0;
int hi = NUM_QT_BUCKETS - 1;
/* Find the right bucket with binary search */
while (lo < hi)
{
int mid = (lo + hi) / 2;
if (elapsed_us < qt_bucket_thresholds[mid])
hi = mid;
else
lo = mid + 1;
}
hist->elapsed_us_bucket[lo]++;
hist->elapsed_us_sum += elapsed_us;
hist->elapsed_us_count++;
}
/*
* Count a GetPage wait operation.
*/
@@ -119,13 +98,6 @@ inc_page_cache_write_wait(uint64 latency)
inc_iohist(&MyNeonCounters->file_cache_write_hist, latency);
}
void
inc_query_time(uint64 elapsed)
{
inc_qthist(&MyNeonCounters->query_time_hist, elapsed);
}
/*
* Support functions for the views, neon_backend_perf_counters and
* neon_perf_counters.
@@ -140,11 +112,11 @@ typedef struct
} metric_t;
static int
io_histogram_to_metrics(IOHistogram histogram,
metric_t *metrics,
const char *count,
const char *sum,
const char *bucket)
histogram_to_metrics(IOHistogram histogram,
metric_t *metrics,
const char *count,
const char *sum,
const char *bucket)
{
int i = 0;
uint64 bucket_accum = 0;
@@ -173,44 +145,10 @@ io_histogram_to_metrics(IOHistogram histogram,
return i;
}
static int
qt_histogram_to_metrics(QTHistogram histogram,
metric_t *metrics,
const char *count,
const char *sum,
const char *bucket)
{
int i = 0;
uint64 bucket_accum = 0;
metrics[i].name = count;
metrics[i].is_bucket = false;
metrics[i].value = (double) histogram->elapsed_us_count;
i++;
metrics[i].name = sum;
metrics[i].is_bucket = false;
metrics[i].value = (double) histogram->elapsed_us_sum / 1000000.0;
i++;
for (int bucketno = 0; bucketno < NUM_QT_BUCKETS; bucketno++)
{
uint64 threshold = qt_bucket_thresholds[bucketno];
bucket_accum += histogram->elapsed_us_bucket[bucketno];
metrics[i].name = bucket;
metrics[i].is_bucket = true;
metrics[i].bucket_le = (threshold == UINT64_MAX) ? INFINITY : ((double) threshold) / 1000000.0;
metrics[i].value = (double) bucket_accum;
i++;
}
return i;
}
static metric_t *
neon_perf_counters_to_metrics(neon_per_backend_counters *counters)
{
#define NUM_METRICS ((2 + NUM_IO_WAIT_BUCKETS) * 3 + (2 + NUM_QT_BUCKETS) + 12)
#define NUM_METRICS ((2 + NUM_IO_WAIT_BUCKETS) * 3 + 12)
metric_t *metrics = palloc((NUM_METRICS + 1) * sizeof(metric_t));
int i = 0;
@@ -221,10 +159,10 @@ neon_perf_counters_to_metrics(neon_per_backend_counters *counters)
i++; \
} while (false)
i += io_histogram_to_metrics(&counters->getpage_hist, &metrics[i],
"getpage_wait_seconds_count",
"getpage_wait_seconds_sum",
"getpage_wait_seconds_bucket");
i += histogram_to_metrics(&counters->getpage_hist, &metrics[i],
"getpage_wait_seconds_count",
"getpage_wait_seconds_sum",
"getpage_wait_seconds_bucket");
APPEND_METRIC(getpage_prefetch_requests_total);
APPEND_METRIC(getpage_sync_requests_total);
@@ -240,19 +178,14 @@ neon_perf_counters_to_metrics(neon_per_backend_counters *counters)
APPEND_METRIC(file_cache_hits_total);
i += io_histogram_to_metrics(&counters->file_cache_read_hist, &metrics[i],
"file_cache_read_wait_seconds_count",
"file_cache_read_wait_seconds_sum",
"file_cache_read_wait_seconds_bucket");
i += io_histogram_to_metrics(&counters->file_cache_write_hist, &metrics[i],
"file_cache_write_wait_seconds_count",
"file_cache_write_wait_seconds_sum",
"file_cache_write_wait_seconds_bucket");
i += qt_histogram_to_metrics(&counters->query_time_hist, &metrics[i],
"query_time_seconds_count",
"query_time_seconds_sum",
"query_time_seconds_bucket");
i += histogram_to_metrics(&counters->file_cache_read_hist, &metrics[i],
"file_cache_read_wait_seconds_count",
"file_cache_read_wait_seconds_sum",
"file_cache_read_wait_seconds_bucket");
i += histogram_to_metrics(&counters->file_cache_write_hist, &metrics[i],
"file_cache_write_wait_seconds_count",
"file_cache_write_wait_seconds_sum",
"file_cache_write_wait_seconds_bucket");
Assert(i == NUM_METRICS);
@@ -324,7 +257,7 @@ neon_get_backend_perf_counters(PG_FUNCTION_ARGS)
}
static inline void
io_histogram_merge_into(IOHistogram into, IOHistogram from)
histogram_merge_into(IOHistogram into, IOHistogram from)
{
into->wait_us_count += from->wait_us_count;
into->wait_us_sum += from->wait_us_sum;
@@ -332,15 +265,6 @@ io_histogram_merge_into(IOHistogram into, IOHistogram from)
into->wait_us_bucket[bucketno] += from->wait_us_bucket[bucketno];
}
static inline void
qt_histogram_merge_into(QTHistogram into, QTHistogram from)
{
into->elapsed_us_count += from->elapsed_us_count;
into->elapsed_us_sum += from->elapsed_us_sum;
for (int bucketno = 0; bucketno < NUM_QT_BUCKETS; bucketno++)
into->elapsed_us_bucket[bucketno] += from->elapsed_us_bucket[bucketno];
}
PG_FUNCTION_INFO_V1(neon_get_perf_counters);
Datum
neon_get_perf_counters(PG_FUNCTION_ARGS)
@@ -359,7 +283,7 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
{
neon_per_backend_counters *counters = &neon_per_backend_counters_shared[procno];
io_histogram_merge_into(&totals.getpage_hist, &counters->getpage_hist);
histogram_merge_into(&totals.getpage_hist, &counters->getpage_hist);
totals.getpage_prefetch_requests_total += counters->getpage_prefetch_requests_total;
totals.getpage_sync_requests_total += counters->getpage_sync_requests_total;
totals.getpage_prefetch_misses_total += counters->getpage_prefetch_misses_total;
@@ -370,13 +294,13 @@ neon_get_perf_counters(PG_FUNCTION_ARGS)
totals.pageserver_open_requests += counters->pageserver_open_requests;
totals.getpage_prefetches_buffered += counters->getpage_prefetches_buffered;
totals.file_cache_hits_total += counters->file_cache_hits_total;
histogram_merge_into(&totals.file_cache_read_hist, &counters->file_cache_read_hist);
histogram_merge_into(&totals.file_cache_write_hist, &counters->file_cache_write_hist);
totals.compute_getpage_stuck_requests_total += counters->compute_getpage_stuck_requests_total;
totals.compute_getpage_max_inflight_stuck_time_ms = Max(
totals.compute_getpage_max_inflight_stuck_time_ms,
counters->compute_getpage_max_inflight_stuck_time_ms);
io_histogram_merge_into(&totals.file_cache_read_hist, &counters->file_cache_read_hist);
io_histogram_merge_into(&totals.file_cache_write_hist, &counters->file_cache_write_hist);
qt_histogram_merge_into(&totals.query_time_hist, &counters->query_time_hist);
}
metrics = neon_perf_counters_to_metrics(&totals);

View File

@@ -36,28 +36,6 @@ typedef struct IOHistogramData
typedef IOHistogramData *IOHistogram;
static const uint64 qt_bucket_thresholds[] = {
2, 3, 6, 10, /* 0 us - 10 us */
20, 30, 60, 100, /* 10 us - 100 us */
200, 300, 600, 1000, /* 100 us - 1 ms */
2000, 3000, 6000, 10000, /* 1 ms - 10 ms */
20000, 30000, 60000, 100000, /* 10 ms - 100 ms */
200000, 300000, 600000, 1000000, /* 100 ms - 1 s */
2000000, 3000000, 6000000, 10000000, /* 1 s - 10 s */
20000000, 30000000, 60000000, 100000000, /* 10 s - 100 s */
UINT64_MAX,
};
#define NUM_QT_BUCKETS (lengthof(qt_bucket_thresholds))
typedef struct QTHistogramData
{
uint64 elapsed_us_count;
uint64 elapsed_us_sum;
uint64 elapsed_us_bucket[NUM_QT_BUCKETS];
} QTHistogramData;
typedef QTHistogramData *QTHistogram;
typedef struct
{
/*
@@ -149,11 +127,6 @@ typedef struct
/* LFC I/O time buckets */
IOHistogramData file_cache_read_hist;
IOHistogramData file_cache_write_hist;
/*
* Histogram of query execution time.
*/
QTHistogramData query_time_hist;
} neon_per_backend_counters;
/* Pointer to the shared memory array of neon_per_backend_counters structs */
@@ -176,7 +149,6 @@ extern neon_per_backend_counters *neon_per_backend_counters_shared;
extern void inc_getpage_wait(uint64 latency);
extern void inc_page_cache_read_wait(uint64 latency);
extern void inc_page_cache_write_wait(uint64 latency);
extern void inc_query_time(uint64 elapsed);
extern Size NeonPerfCountersShmemSize(void);
extern void NeonPerfCountersShmemInit(void);

View File

@@ -5,7 +5,6 @@
#include "funcapi.h"
#include "miscadmin.h"
#include "access/xlog.h"
#include "utils/tuplestore.h"
#include "neon_pgversioncompat.h"
@@ -42,12 +41,5 @@ InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags)
rsinfo->setDesc = stored_tupdesc;
MemoryContextSwitchTo(old_context);
}
TimeLineID GetWALInsertionTimeLine(void)
{
return ThisTimeLineID + 1;
}
#endif

View File

@@ -162,7 +162,6 @@ InitBufferTag(BufferTag *tag, const RelFileNode *rnode,
#if PG_MAJORVERSION_NUM < 15
extern void InitMaterializedSRF(FunctionCallInfo fcinfo, bits32 flags);
extern TimeLineID GetWALInsertionTimeLine(void);
#endif
#endif /* NEON_PGVERSIONCOMPAT_H */

View File

@@ -69,7 +69,6 @@ struct NeonWALReader
WALSegmentContext segcxt;
WALOpenSegment seg;
int wre_errno;
TimeLineID local_active_tlid;
/* Explains failure to read, static for simplicity. */
char err_msg[NEON_WALREADER_ERR_MSG_LEN];
@@ -107,7 +106,7 @@ struct NeonWALReader
/* palloc and initialize NeonWALReader */
NeonWALReader *
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix, TimeLineID tlid)
NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix)
{
NeonWALReader *reader;
@@ -119,7 +118,6 @@ NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_
MemoryContextAllocZero(TopMemoryContext, sizeof(NeonWALReader));
reader->available_lsn = available_lsn;
reader->local_active_tlid = tlid;
reader->seg.ws_file = -1;
reader->seg.ws_segno = 0;
reader->seg.ws_tli = 0;
@@ -579,17 +577,6 @@ NeonWALReaderIsRemConnEstablished(NeonWALReader *state)
return state->rem_state == RS_ESTABLISHED;
}
/*
* Whether remote connection is established. Once this is done, until successful
* local read or error socket is stable and user can update socket events
* instead of readding it each time.
*/
TimeLineID
NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state)
{
return state->local_active_tlid;
}
/*
* Returns events user should wait on connection socket or 0 if remote
* connection is not active.

View File

@@ -19,10 +19,9 @@ typedef enum
NEON_WALREAD_ERROR,
} NeonWALReadResult;
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix, TimeLineID tlid);
extern NeonWALReader *NeonWALReaderAllocate(int wal_segment_size, XLogRecPtr available_lsn, char *log_prefix);
extern void NeonWALReaderFree(NeonWALReader *state);
extern void NeonWALReaderResetRemote(NeonWALReader *state);
extern TimeLineID NeonWALReaderLocalActiveTimeLineID(NeonWALReader *state);
extern NeonWALReadResult NeonWALRead(NeonWALReader *state, char *buf, XLogRecPtr startptr, Size count, TimeLineID tli);
extern pgsocket NeonWALReaderSocket(NeonWALReader *state);
extern uint32 NeonWALReaderEvents(NeonWALReader *state);

View File

@@ -98,7 +98,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
wp = palloc0(sizeof(WalProposer));
wp->config = config;
wp->api = api;
wp->localTimeLineID = config->pgTimeline;
wp->state = WPS_COLLECTING_TERMS;
wp->mconf.generation = INVALID_GENERATION;
wp->mconf.members.len = 0;
@@ -120,10 +119,6 @@ WalProposerCreate(WalProposerConfig *config, walproposer_api api)
{
wp_log(FATAL, "failed to parse neon.safekeepers generation number: %m");
}
if (*endptr != ':')
{
wp_log(FATAL, "failed to parse neon.safekeepers: no colon after generation");
}
/* Skip past : to the first hostname. */
host = endptr + 1;
}
@@ -1135,7 +1130,7 @@ VotesCollectedMset(WalProposer *wp, MemberSet *mset, Safekeeper **msk, StringInf
wp->propTermStartLsn = sk->voteResponse.flushLsn;
wp->donor = sk;
}
wp->truncateLsn = Max(sk->voteResponse.truncateLsn, wp->truncateLsn);
wp->truncateLsn = Max(wp->safekeeper[i].voteResponse.truncateLsn, wp->truncateLsn);
if (n_votes > 0)
appendStringInfoString(s, ", ");
@@ -1385,7 +1380,7 @@ ProcessPropStartPos(WalProposer *wp)
* we must bail out, as clog and other non rel data is inconsistent.
*/
walprop_shared = wp->api.get_shmem_state(wp);
if (!wp->config->syncSafekeepers && !walprop_shared->replica_promote)
if (!wp->config->syncSafekeepers)
{
/*
* Basebackup LSN always points to the beginning of the record (not

View File

@@ -391,7 +391,6 @@ typedef struct WalproposerShmemState
/* last feedback from each shard */
PageserverFeedback shard_ps_feedback[MAX_SHARDS];
int num_shards;
bool replica_promote;
/* aggregated feedback with min LSNs across shards */
PageserverFeedback min_ps_feedback;
@@ -807,9 +806,6 @@ typedef struct WalProposer
/* Safekeepers walproposer is connecting to. */
Safekeeper safekeeper[MAX_SAFEKEEPERS];
/* Current local TimeLineId in use */
TimeLineID localTimeLineID;
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;

View File

@@ -35,7 +35,6 @@
#include "storage/proc.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/pg_shmem.h"
#include "storage/shmem.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
@@ -160,19 +159,12 @@ WalProposerMain(Datum main_arg)
{
WalProposer *wp;
if (*wal_acceptors_list == '\0')
{
wpg_log(WARNING, "Safekeepers list is empty");
return;
}
init_walprop_config(false);
walprop_pg_init_bgworker();
am_walproposer = true;
walprop_pg_load_libpqwalreceiver();
wp = WalProposerCreate(&walprop_config, walprop_pg);
wp->localTimeLineID = GetWALInsertionTimeLine();
wp->last_reconnect_attempt = walprop_pg_get_current_timestamp(wp);
walprop_pg_init_walsender();
@@ -280,30 +272,6 @@ split_safekeepers_list(char *safekeepers_list, char *safekeepers[])
return n_safekeepers;
}
static char *split_off_safekeepers_generation(char *safekeepers_list, uint32 *generation)
{
char *endptr;
if (strncmp(safekeepers_list, "g#", 2) != 0)
{
return safekeepers_list;
}
else
{
errno = 0;
*generation = strtoul(safekeepers_list + 2, &endptr, 10);
if (errno != 0)
{
wp_log(FATAL, "failed to parse neon.safekeepers generation number: %m");
}
if (*endptr != ':')
{
wp_log(FATAL, "failed to parse neon.safekeepers: no colon after generation");
}
return endptr + 1;
}
}
/*
* Accept two coma-separated strings with list of safekeeper host:port addresses.
* Split them into arrays and return false if two sets do not match, ignoring the order.
@@ -315,16 +283,6 @@ safekeepers_cmp(char *old, char *new)
char *safekeepers_new[MAX_SAFEKEEPERS];
int len_old = 0;
int len_new = 0;
uint32 gen_old = INVALID_GENERATION;
uint32 gen_new = INVALID_GENERATION;
old = split_off_safekeepers_generation(old, &gen_old);
new = split_off_safekeepers_generation(new, &gen_new);
if (gen_old != gen_new)
{
return false;
}
len_old = split_safekeepers_list(old, safekeepers_old);
len_new = split_safekeepers_list(new, safekeepers_new);
@@ -358,9 +316,6 @@ assign_neon_safekeepers(const char *newval, void *extra)
char *newval_copy;
char *oldval;
if (newval && *newval != '\0' && UsedShmemSegAddr && walprop_shared && RecoveryInProgress())
walprop_shared->replica_promote = true;
if (!am_walproposer)
return;
@@ -551,15 +506,16 @@ BackpressureThrottlingTime(void)
/*
* Register a background worker proposing WAL to wal acceptors.
* We start walproposer bgworker even for replicas in order to support possible replica promotion.
* When pg_promote() function is called, then walproposer bgworker registered with BgWorkerStart_RecoveryFinished
* is automatically launched when promotion is completed.
*/
static void
walprop_register_bgworker(void)
{
BackgroundWorker bgw;
/* If no wal acceptors are specified, don't start the background worker. */
if (*wal_acceptors_list == '\0')
return;
memset(&bgw, 0, sizeof(bgw));
bgw.bgw_flags = BGWORKER_SHMEM_ACCESS;
bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
@@ -1336,7 +1292,9 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
#if PG_VERSION_NUM < 150000
if (ThisTimeLineID == 0)
ThisTimeLineID = 1;
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
#endif
/*
@@ -1550,7 +1508,7 @@ walprop_pg_wal_reader_allocate(Safekeeper *sk)
snprintf(log_prefix, sizeof(log_prefix), WP_LOG_PREFIX "sk %s:%s nwr: ", sk->host, sk->port);
Assert(!sk->xlogreader);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix, sk->wp->localTimeLineID);
sk->xlogreader = NeonWALReaderAllocate(wal_segment_size, sk->wp->propTermStartLsn, log_prefix);
if (sk->xlogreader == NULL)
wpg_log(FATAL, "failed to allocate xlog reader");
}
@@ -1564,7 +1522,7 @@ walprop_pg_wal_read(Safekeeper *sk, char *buf, XLogRecPtr startptr, Size count,
buf,
startptr,
count,
sk->wp->localTimeLineID);
walprop_pg_get_timeline_id());
if (res == NEON_WALREAD_SUCCESS)
{

View File

@@ -111,7 +111,7 @@ NeonWALPageRead(
readBuf,
targetPagePtr,
count,
NeonWALReaderLocalActiveTimeLineID(wal_reader));
walprop_pg_get_timeline_id());
if (res == NEON_WALREAD_SUCCESS)
{
@@ -202,7 +202,7 @@ NeonOnDemandXLogReaderRoutines(XLogReaderRoutine *xlr)
{
elog(ERROR, "unable to start walsender when basebackupLsn is 0");
}
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn, "[walsender] ", 1);
wal_reader = NeonWALReaderAllocate(wal_segment_size, basebackupLsn, "[walsender] ");
}
xlr->page_read = NeonWALPageRead;
xlr->segment_open = NeonWALReadSegmentOpen;

10
poetry.lock generated
View File

@@ -3051,19 +3051,19 @@ files = [
[[package]]
name = "requests"
version = "2.32.4"
version = "2.32.3"
description = "Python HTTP for Humans."
optional = false
python-versions = ">=3.8"
groups = ["main"]
files = [
{file = "requests-2.32.4-py3-none-any.whl", hash = "sha256:27babd3cda2a6d50b30443204ee89830707d396671944c998b5975b031ac2b2c"},
{file = "requests-2.32.4.tar.gz", hash = "sha256:27d0316682c8a29834d3264820024b62a36942083d52caf2f14c0591336d3422"},
{file = "requests-2.32.3-py3-none-any.whl", hash = "sha256:70761cfe03c773ceb22aa2f671b4757976145175cdfca038c02654d061d6dcc6"},
{file = "requests-2.32.3.tar.gz", hash = "sha256:55365417734eb18255590a9ff9eb97e9e1da868d4ccd6402399eaf68af20a760"},
]
[package.dependencies]
certifi = ">=2017.4.17"
charset_normalizer = ">=2,<4"
charset-normalizer = ">=2,<4"
idna = ">=2.5,<4"
urllib3 = ">=1.21.1,<3"
@@ -3846,4 +3846,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = "^3.11"
content-hash = "bd93313f110110aa53b24a3ed47ba2d7f60e2c658a79cdff7320fed1bb1b57b5"
content-hash = "7ab1e7b975af34b3271b7c6018fa22a261d3f73c7c0a0403b6b2bb86b5fbd36e"

Some files were not shown because too many files have changed in this diff Show More