Mirror of https://github.com/neondatabase/neon.git (synced 2026-05-15 12:10:37 +00:00)

Compare commits: `problame/t…` → `hotfix/202…` (107 commits)
| SHA1 |
|---|
| 9655709739 |
| bd87245abc |
| fd73e138b4 |
| 9217e7cce7 |
| d646ddcd07 |
| c47af7ea9a |
| 1167aee661 |
| f63cb18155 |
| 000eb1b069 |
| 0de603d88e |
| 240913912a |
| 91a4ea0de2 |
| f51b48fa49 |
| 9f906ff236 |
| c79dd8d458 |
| ec4ecdd543 |
| 20a4d817ce |
| 5ebf7e5619 |
| 8608704f49 |
| efef68ce99 |
| 0692fffbf3 |
| 093570af20 |
| eb403da814 |
| f3ad635911 |
| a8d7360881 |
| b0311cfdeb |
| 8daefd24da |
| 46cc8b7982 |
| 38cd90dd0c |
| a51b269f15 |
| 43bf6d0a0f |
| 15273a9b66 |
| 78aca668d0 |
| acbf4148ea |
| 6508540561 |
| a41b5244a8 |
| 2b3189be95 |
| 248563c595 |
| 14cd6ca933 |
| eb36403e71 |
| 3c6f779698 |
| f67f0c1c11 |
| edb02d3299 |
| 664a69e65b |
| 478322ebf9 |
| 802f174072 |
| 47f9890bae |
| 262265daad |
| 300da5b872 |
| 7b22b5c433 |
| ffca97bc1e |
| cb356f3259 |
| c85374295f |
| 4992160677 |
| bd535b3371 |
| d90c5a03af |
| 2d02cc9079 |
| 49ad94b99f |
| 948a217398 |
| 125381eae7 |
| cd01bbc715 |
| d8b5e3b88d |
| 06d25f2186 |
| f759b561f3 |
| ece0555600 |
| 73ea0a0b01 |
| d8f6d6fd6f |
| d24de169a7 |
| 0816168296 |
| 277b44d57a |
| 68c2c3880e |
| 49da498f65 |
| 2c76ba3dd7 |
| dbe3dc69ad |
| 8e5bb3ed49 |
| ab0be7b8da |
| b4c55f5d24 |
| ede70d833c |
| 70c3d18bb0 |
| 7a491f52c4 |
| 323c4ecb4f |
| 3d2466607e |
| ed478b39f4 |
| 91585a558d |
| 93467eae1f |
| f3aac81d19 |
| 979ad60c19 |
| 9316cb1b1f |
| e7939a527a |
| 36d26665e1 |
| 873347f977 |
| e814ac16f9 |
| ad3055d386 |
| 94e03eb452 |
| 380f26ef79 |
| 3c5b7f59d7 |
| fee89f80b5 |
| 41cce8eaf1 |
| f88fe0218d |
| cc856eca85 |
| cf350c6002 |
| 0ce6b6a0a3 |
| 73f247d537 |
| 960be82183 |
| 806e5a6c19 |
| 8d5df07cce |
| df7a9d1407 |
.github/ansible/prod.us-east-2.hosts.yaml (vendored, 2 changes)

@@ -27,6 +27,8 @@ storage:
       ansible_host: i-062227ba7f119eb8c
     pageserver-1.us-east-2.aws.neon.tech:
       ansible_host: i-0b3ec0afab5968938
+    pageserver-2.us-east-2.aws.neon.tech:
+      ansible_host: i-0d7a1c4325e71421d

 safekeepers:
   hosts:
@@ -1,6 +1,22 @@
 # Helm chart values for neon-proxy-scram.
 # This is a YAML-formatted file.

+deploymentStrategy:
+  type: RollingUpdate
+  rollingUpdate:
+    maxSurge: 100%
+    maxUnavailable: 50%
+
+# Delay the kill signal by 7 days (7 * 24 * 60 * 60)
+# The pod(s) will stay in Terminating, keeps the existing connections
+# but doesn't receive new ones
+containerLifecycle:
+  preStop:
+    exec:
+      command: ["/bin/sh", "-c", "sleep 604800"]
+terminationGracePeriodSeconds: 604800
+
 image:
   repository: neondatabase/neon
The same hunk is applied, verbatim, to three more neon-proxy-scram values files in this comparison.
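A quick check of the 7-day arithmetic in the chart comment, as a runnable sketch (editorial illustration, not part of the diff):

```python
# Sketch: verify the preStop sleep matches the "7 days" the chart comment claims.
DAYS = 7
grace_seconds = DAYS * 24 * 60 * 60
assert grace_seconds == 604800  # used for both the sleep and the grace period
print(grace_seconds)
```

Kubernetes only sends SIGKILL once `terminationGracePeriodSeconds` elapses, so pairing the two values keeps draining proxy pods alive for the full week.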
Cargo.lock (generated, 19 changes)

@@ -854,6 +854,7 @@ dependencies = [
  "opentelemetry",
  "postgres",
  "regex",
+ "reqwest",
  "serde",
  "serde_json",
  "tar",

@@ -3066,15 +3067,6 @@ dependencies = [
  "workspace_hack",
 ]

-[[package]]
-name = "remove_dir_all"
-version = "0.5.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
-dependencies = [
- "winapi",
-]
-
 [[package]]
 name = "reqwest"
 version = "0.11.14"

@@ -3848,16 +3840,15 @@ dependencies = [

 [[package]]
 name = "tempfile"
-version = "3.3.0"
+version = "3.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5cdb1ef4eaeeaddc8fbd371e5017057064af0911902ef36b39801f67cc6d79e4"
+checksum = "af18f7ae1acd354b992402e9ec5864359d693cd8a79dcbef59f76891701c1e95"
 dependencies = [
  "cfg-if",
  "fastrand",
  "libc",
  "redox_syscall",
- "remove_dir_all",
- "winapi",
+ "rustix",
+ "windows-sys 0.42.0",
 ]

 [[package]]
@@ -150,7 +150,7 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
 criterion = "0.4"
 rcgen = "0.10"
 rstest = "0.16"
-tempfile = "3.2"
+tempfile = "3.4"
 tonic-build = "0.8"

 # This is only needed for proxy's tests.
@@ -32,11 +32,15 @@ RUN cd postgres && \
     make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/include install && \
     make MAKELEVEL=0 -j $(getconf _NPROCESSORS_ONLN) -s -C src/interfaces/libpq install && \
     # Enable some of contrib extensions
     echo 'trusted = true' >> /usr/local/pgsql/share/extension/autoinc.control && \
     echo 'trusted = true' >> /usr/local/pgsql/share/extension/bloom.control && \
-    echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
-    echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
-    echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/earthdistance.control && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/insert_username.control && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/intagg.control && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/moddatetime.control && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrowlocks.control && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgstattuple.control && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/refint.control && \
     echo 'trusted = true' >> /usr/local/pgsql/share/extension/xml2.control

 #########################################################################################

@@ -60,10 +64,11 @@ RUN wget https://gitlab.com/Oslandia/SFCGAL/-/archive/v1.3.10/SFCGAL-v1.3.10.tar
     DESTDIR=/sfcgal make install -j $(getconf _NPROCESSORS_ONLN) && \
     make clean && cp -R /sfcgal/* /

+ENV PATH "/usr/local/pgsql/bin:$PATH"
+
 RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postgis.tar.gz && \
     mkdir postgis-src && cd postgis-src && tar xvzf ../postgis.tar.gz --strip-components=1 -C . && \
     ./autogen.sh && \
-    export PATH="/usr/local/pgsql/bin:$PATH" && \
     ./configure --with-sfcgal=/usr/local/bin/sfcgal-config && \
     make -j $(getconf _NPROCESSORS_ONLN) install && \
     cd extensions/postgis && \

@@ -77,6 +82,15 @@ RUN wget https://download.osgeo.org/postgis/source/postgis-3.3.2.tar.gz -O postg
     echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer.control && \
     echo 'trusted = true' >> /usr/local/pgsql/share/extension/address_standardizer_data_us.control

+RUN wget https://github.com/pgRouting/pgrouting/archive/v3.4.2.tar.gz -O pgrouting.tar.gz && \
+    mkdir pgrouting-src && cd pgrouting-src && tar xvzf ../pgrouting.tar.gz --strip-components=1 -C . && \
+    mkdir build && \
+    cd build && \
+    cmake .. && \
+    make -j $(getconf _NPROCESSORS_ONLN) && \
+    make -j $(getconf _NPROCESSORS_ONLN) install && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgrouting.control
+
 #########################################################################################
 #
 # Layer "plv8-build"
@@ -181,6 +195,36 @@ RUN wget https://github.com/michelp/pgjwt/archive/9742dab1b2f297ad3811120db7b214
     make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
     echo 'trusted = true' >> /usr/local/pgsql/share/extension/pgjwt.control

+#########################################################################################
+#
+# Layer "hypopg-pg-build"
+# compile hypopg extension
+#
+#########################################################################################
+FROM build-deps AS hypopg-pg-build
+COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
+
+RUN wget https://github.com/HypoPG/hypopg/archive/refs/tags/1.3.1.tar.gz -O hypopg.tar.gz && \
+    mkdir hypopg-src && cd hypopg-src && tar xvzf ../hypopg.tar.gz --strip-components=1 -C . && \
+    make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
+    make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/hypopg.control
+
+#########################################################################################
+#
+# Layer "pg-hashids-pg-build"
+# compile pg_hashids extension
+#
+#########################################################################################
+FROM build-deps AS pg-hashids-pg-build
+COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
+
+RUN wget https://github.com/iCyberon/pg_hashids/archive/refs/tags/v1.2.1.tar.gz -O pg_hashids.tar.gz && \
+    mkdir pg_hashids-src && cd pg_hashids-src && tar xvzf ../pg_hashids.tar.gz --strip-components=1 -C . && \
+    make -j $(getconf _NPROCESSORS_ONLN) PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
+    make -j $(getconf _NPROCESSORS_ONLN) install PG_CONFIG=/usr/local/pgsql/bin/pg_config USE_PGXS=1 && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_hashids.control
+
 #########################################################################################
 #
 # Layer "rust extensions"

@@ -221,6 +265,8 @@ FROM rust-extensions-build AS pg-jsonschema-pg-build
 RUN git clone --depth=1 --single-branch --branch neon_abi_v0.1.4 https://github.com/vadim2404/pg_jsonschema/ && \
     cd pg_jsonschema && \
     cargo pgx install --release && \
+    # it's needed to enable extension because it uses untrusted C language
+    sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_jsonschema.control && \
     echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control

 #########################################################################################

@@ -235,6 +281,8 @@ FROM rust-extensions-build AS pg-graphql-pg-build
 RUN git clone --depth=1 --single-branch --branch neon_abi_v1.1.0 https://github.com/vadim2404/pg_graphql && \
     cd pg_graphql && \
     cargo pgx install --release && \
+    # it's needed to enable extension because it uses untrusted C language
+    sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
     echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_graphql.control

 #########################################################################################

@@ -254,6 +302,8 @@ COPY --from=vector-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pgjwt-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-jsonschema-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-graphql-pg-build /usr/local/pgsql/ /usr/local/pgsql/
+COPY --from=hypopg-pg-build /usr/local/pgsql/ /usr/local/pgsql/
+COPY --from=pg-hashids-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY pgxn/ pgxn/

 RUN make -j $(getconf _NPROCESSORS_ONLN) \
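What `trusted = true` (plus the `superuser = true` rewrite for the pgx extensions) buys at runtime: a plain role can install the extension. A hedged sketch; the DSN and `app_user` role are hypothetical, and it assumes psycopg2 plus a reachable compute built from this image:

```python
import psycopg2

# Hypothetical DSN; the role only needs CREATE on the database, not superuser,
# because the extension's .control file now says trusted = true.
conn = psycopg2.connect("dbname=neondb user=app_user")
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute("CREATE EXTENSION IF NOT EXISTS hypopg")
    cur.execute("SELECT extname, extversion FROM pg_extension WHERE extname = 'hypopg'")
    print(cur.fetchone())
conn.close()
```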
@@ -17,6 +17,7 @@ regex.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 tar.workspace = true
+reqwest = { workspace = true, features = ["json"] }
 tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
 tokio-postgres.workspace = true
 tracing.workspace = true
@@ -65,6 +65,9 @@ fn main() -> Result<()> {
     let spec = matches.get_one::<String>("spec");
     let spec_path = matches.get_one::<String>("spec-path");

+    let compute_id = matches.get_one::<String>("compute-id");
+    let control_plane_uri = matches.get_one::<String>("control-plane-uri");
+
     // Try to use just 'postgres' if no path is provided
     let pgbin = matches.get_one::<String>("pgbin").unwrap();

@@ -77,8 +80,27 @@ fn main() -> Result<()> {
             let path = Path::new(sp);
             let file = File::open(path)?;
             serde_json::from_reader(file)?
+        } else if let Some(id) = compute_id {
+            if let Some(cp_base) = control_plane_uri {
+                let cp_uri = format!("{cp_base}/management/api/v1/{id}/spec");
+                let jwt: String = match std::env::var("NEON_CONSOLE_JWT") {
+                    Ok(v) => v,
+                    Err(_) => "".to_string(),
+                };
+
+                reqwest::blocking::Client::new()
+                    .get(cp_uri)
+                    .header("Authorization", jwt)
+                    .send()?
+                    .json()?
+            } else {
+                panic!(
+                    "must specify --control-plane-uri \"{:#?}\" and --compute-id \"{:#?}\"",
+                    control_plane_uri, compute_id
+                );
+            }
         } else {
-            panic!("cluster spec should be provided via --spec or --spec-path argument");
+            panic!("compute spec should be provided via --spec or --spec-path argument");
         }
     };

@@ -227,6 +249,18 @@ fn cli() -> clap::Command {
             .long("spec-path")
             .value_name("SPEC_PATH"),
     )
+    .arg(
+        Arg::new("compute-id")
+            .short('i')
+            .long("compute-id")
+            .value_name("COMPUTE_ID"),
+    )
+    .arg(
+        Arg::new("control-plane-uri")
+            .short('p')
+            .long("control-plane-uri")
+            .value_name("CONTROL_PLANE"),
+    )
 }

 #[test]
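For clarity, the same control-plane fetch restated as a hedged Python sketch. `fetch_spec` is illustrative only; the URL shape and the `NEON_CONSOLE_JWT` environment variable come from the hunk above:

```python
import os
import requests

def fetch_spec(control_plane_uri: str, compute_id: str) -> dict:
    # compute_ctl builds {base}/management/api/v1/{id}/spec and sends the raw
    # NEON_CONSOLE_JWT value (empty string if unset) as the Authorization header.
    url = f"{control_plane_uri}/management/api/v1/{compute_id}/spec"
    jwt = os.environ.get("NEON_CONSOLE_JWT", "")
    resp = requests.get(url, headers={"Authorization": jwt})
    resp.raise_for_status()
    return resp.json()
```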
@@ -98,6 +98,15 @@ impl RelTag {

         name
     }

+    pub fn with_forknum(&self, forknum: u8) -> Self {
+        RelTag {
+            forknum,
+            spcnode: self.spcnode,
+            dbnode: self.dbnode,
+            relnode: self.relnode,
+        }
+    }
 }

 ///
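A hedged Python stand-in for what `with_forknum` is for. The `RelTag` here is a toy dataclass, not the Rust type; the fork numbers (main=0, fsm=1, vm=2, init=3) are the standard Postgres values:

```python
from dataclasses import dataclass, replace

MAIN_FORKNUM, FSM_FORKNUM, VISIBILITYMAP_FORKNUM, INIT_FORKNUM = 0, 1, 2, 3

@dataclass(frozen=True)
class RelTag:
    spcnode: int
    dbnode: int
    relnode: int
    forknum: int

    def with_forknum(self, forknum: int) -> "RelTag":
        # Same relation, different fork: mirrors the new Rust method.
        return replace(self, forknum=forknum)

init_fork = RelTag(1663, 16384, 16385, INIT_FORKNUM)
main_fork = init_fork.with_forknum(MAIN_FORKNUM)
assert main_fork.relnode == init_fork.relnode and main_fork.forknum == MAIN_FORKNUM
```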
@@ -4,13 +4,14 @@ use anyhow::{anyhow, Context};
 use hyper::header::{HeaderName, AUTHORIZATION};
 use hyper::http::HeaderValue;
 use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server};
+use hyper::{Method, StatusCode};
 use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
 use once_cell::sync::Lazy;
 use routerify::ext::RequestExt;
 use routerify::RequestInfo;
 use routerify::{Middleware, Router, RouterBuilder, RouterService};
 use tokio::task::JoinError;
-use tracing::info;
+use tracing;

 use std::future::Future;
 use std::net::TcpListener;

@@ -27,7 +28,14 @@ static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
 });

 async fn logger(res: Response<Body>, info: RequestInfo) -> Result<Response<Body>, ApiError> {
-    info!("{} {} {}", info.method(), info.uri().path(), res.status(),);
+    // cannot factor out the Level to avoid the repetition
+    // because tracing can only work with const Level
+    // which is not the case here
+    if info.method() == Method::GET && res.status() == StatusCode::OK {
+        tracing::debug!("{} {} {}", info.method(), info.uri().path(), res.status());
+    } else {
+        tracing::info!("{} {} {}", info.method(), info.uri().path(), res.status());
+    }
     Ok(res)
 }

@@ -203,7 +211,7 @@ pub fn serve_thread_main<S>(
 where
     S: Future<Output = ()> + Send + Sync,
 {
-    info!("Starting an HTTP endpoint at {}", listener.local_addr()?);
+    tracing::info!("Starting an HTTP endpoint at {}", listener.local_addr()?);

     // Create a Service from the router above to handle incoming requests.
     let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap();
@@ -33,6 +33,7 @@ use pageserver_api::reltag::{RelTag, SlruKind};

 use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
 use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
+use postgres_ffi::relfile_utils::{INIT_FORKNUM, MAIN_FORKNUM};
 use postgres_ffi::TransactionId;
 use postgres_ffi::XLogFileName;
 use postgres_ffi::PG_TLI;

@@ -190,14 +191,31 @@ where
 {
     self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;

-    // Gather and send relational files in each database if full backup is requested.
-    if self.full_backup {
-        for rel in self
-            .timeline
-            .list_rels(spcnode, dbnode, self.lsn, self.ctx)
-            .await?
-        {
-            self.add_rel(rel).await?;
+    // If full backup is requested, include all relation files.
+    // Otherwise only include init forks of unlogged relations.
+    let rels = self
+        .timeline
+        .list_rels(spcnode, dbnode, self.lsn, self.ctx)
+        .await?;
+    for &rel in rels.iter() {
+        // Send init fork as main fork to provide well formed empty
+        // contents of UNLOGGED relations. Postgres copies it in
+        // `reinit.c` during recovery.
+        if rel.forknum == INIT_FORKNUM {
+            // I doubt we need _init fork itself, but having it at least
+            // serves as a marker relation is unlogged.
+            self.add_rel(rel, rel).await?;
+            self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
+            continue;
+        }
+
+        if self.full_backup {
+            if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
+            {
+                // skip this, will include it when we reach the init fork
+                continue;
+            }
+            self.add_rel(rel, rel).await?;
         }
     }

@@ -220,15 +238,16 @@ where
         Ok(())
     }

-    async fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
+    /// Add contents of relfilenode `src`, naming it as `dst`.
+    async fn add_rel(&mut self, src: RelTag, dst: RelTag) -> anyhow::Result<()> {
         let nblocks = self
             .timeline
-            .get_rel_size(tag, self.lsn, false, self.ctx)
+            .get_rel_size(src, self.lsn, false, self.ctx)
             .await?;

         // If the relation is empty, create an empty file
         if nblocks == 0 {
-            let file_name = tag.to_segfile_name(0);
+            let file_name = dst.to_segfile_name(0);
             let header = new_tar_header(&file_name, 0)?;
             self.ar.append(&header, &mut io::empty()).await?;
             return Ok(());

@@ -244,12 +263,12 @@ where
         for blknum in startblk..endblk {
             let img = self
                 .timeline
-                .get_rel_page_at_lsn(tag, blknum, self.lsn, false, self.ctx)
+                .get_rel_page_at_lsn(src, blknum, self.lsn, false, self.ctx)
                 .await?;
             segment_data.extend_from_slice(&img[..]);
         }

-        let file_name = tag.to_segfile_name(seg as u32);
+        let file_name = dst.to_segfile_name(seg as u32);
         let header = new_tar_header(&file_name, segment_data.len() as u64)?;
         self.ar.append(&header, segment_data.as_slice()).await?;
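The selection rule the rewritten loop implements, restated as a self-contained sketch. `files_to_send` and its tuple encoding are illustrative, not pageserver API:

```python
MAIN_FORKNUM, INIT_FORKNUM = 0, 3

def files_to_send(rels, full_backup):
    """rels: set of (relnode, forknum). Returns (src, dst) pairs, mirroring add_rel(src, dst)."""
    out = []
    for relnode, forknum in sorted(rels):
        if forknum == INIT_FORKNUM:
            # Unlogged relation: ship the init fork itself, and again renamed
            # as the main fork, so recovery sees well-formed empty contents.
            out.append(((relnode, forknum), (relnode, forknum)))
            out.append(((relnode, forknum), (relnode, MAIN_FORKNUM)))
        elif full_backup:
            if forknum == MAIN_FORKNUM and (relnode, INIT_FORKNUM) in rels:
                continue  # replaced by the renamed init fork above
            out.append(((relnode, forknum), (relnode, forknum)))
    return out

# Unlogged table 42 (has an init fork), ordinary table 7.
rels = {(7, MAIN_FORKNUM), (42, MAIN_FORKNUM), (42, INIT_FORKNUM)}
pairs = files_to_send(rels, full_backup=True)
assert ((42, INIT_FORKNUM), (42, MAIN_FORKNUM)) in pairs
assert not any(src == dst == (42, MAIN_FORKNUM) for src, dst in pairs)
```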
@@ -971,19 +971,22 @@ async fn timeline_checkpoint_handler(request: Request<Body>) -> Result<Response<
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
     let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
     check_permission(&request, Some(tenant_id))?;
-    let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
-    let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
-    timeline
-        .freeze_and_flush()
-        .await
-        .map_err(ApiError::InternalServerError)?;
-    timeline
-        .compact(&ctx)
-        .await
-        .map_err(ApiError::InternalServerError)?;
-
-    json_response(StatusCode::OK, ())
+    async {
+        let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
+        let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
+        timeline
+            .freeze_and_flush()
+            .await
+            .map_err(ApiError::InternalServerError)?;
+        timeline
+            .compact(&ctx)
+            .await
+            .map_err(ApiError::InternalServerError)?;
+
+        json_response(StatusCode::OK, ())
+    }
+    .instrument(info_span!("manual_checkpoint", tenant_id = %tenant_id, timeline_id = %timeline_id))
+    .await
 }

 async fn timeline_download_remote_layers_handler_post(
@@ -103,6 +103,7 @@ pub struct TenantConfOpt {
     pub checkpoint_distance: Option<u64>,

     #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(with = "humantime_serde")]
     #[serde(default)]
     pub checkpoint_timeout: Option<Duration>,
@@ -364,7 +364,7 @@ pub trait PersistentLayer: Layer {
     }

     /// Permanently remove this layer from disk.
-    fn delete(&self) -> Result<()>;
+    fn delete_resident_layer_file(&self) -> Result<()>;

     fn downcast_remote_layer(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
         None

@@ -438,7 +438,7 @@ impl PersistentLayer for DeltaLayer {
         ))
     }

-    fn delete(&self) -> Result<()> {
+    fn delete_resident_layer_file(&self) -> Result<()> {
         // delete underlying file
         fs::remove_file(self.path())?;
         Ok(())

@@ -252,7 +252,7 @@ impl PersistentLayer for ImageLayer {
         unimplemented!();
     }

-    fn delete(&self) -> Result<()> {
+    fn delete_resident_layer_file(&self) -> Result<()> {
         // delete underlying file
         fs::remove_file(self.path())?;
         Ok(())

@@ -155,8 +155,8 @@ impl PersistentLayer for RemoteLayer {
         bail!("cannot iterate a remote layer");
     }

-    fn delete(&self) -> Result<()> {
-        Ok(())
+    fn delete_resident_layer_file(&self) -> Result<()> {
+        bail!("remote layer has no layer file");
     }

     fn downcast_remote_layer<'a>(self: Arc<Self>) -> Option<std::sync::Arc<RemoteLayer>> {
@@ -662,8 +662,8 @@ impl Timeline {
         // update the index file on next flush iteration too. But it
         // could take a while until that happens.
         //
-        // Additionally, only do this on the terminal round before sleeping.
-        if last_round {
+        // Additionally, only do this once before we return from this function.
+        if last_round || res.is_ok() {
             if let Some(remote_client) = &self.remote_client {
                 remote_client.schedule_index_upload_for_file_changes()?;
             }
@@ -1047,11 +1047,12 @@ impl Timeline {
             return Ok(false);
         }

-        let layer_metadata = LayerFileMetadata::new(
-            local_layer
-                .file_size()
-                .expect("Local layer should have a file size"),
-        );
+        let layer_file_size = local_layer
+            .file_size()
+            .expect("Local layer should have a file size");
+
+        let layer_metadata = LayerFileMetadata::new(layer_file_size);

         let new_remote_layer = Arc::new(match local_layer.filename() {
             LayerFileName::Image(image_name) => RemoteLayer::new_img(
                 self.tenant_id,

@@ -1075,15 +1076,22 @@ impl Timeline {

         let replaced = match batch_updates.replace_historic(local_layer, new_remote_layer)? {
             Replacement::Replaced { .. } => {
-                let layer_size = local_layer.file_size();
-
-                if let Err(e) = local_layer.delete() {
+                if let Err(e) = local_layer.delete_resident_layer_file() {
                     error!("failed to remove layer file on evict after replacement: {e:#?}");
                 }

-                if let Some(layer_size) = layer_size {
-                    self.metrics.resident_physical_size_gauge.sub(layer_size);
-                }
+                // Always decrement the physical size gauge, even if we failed to delete the file.
+                // Rationale: we already replaced the layer with a remote layer in the layer map,
+                // and any subsequent download_remote_layer will
+                // 1. overwrite the file on disk and
+                // 2. add the downloaded size to the resident size gauge.
+                //
+                // If there is no re-download, and we restart the pageserver, then load_layer_map
+                // will treat the file as a local layer again, count it towards resident size,
+                // and it'll be like the layer removal never happened.
+                // The bump in resident size is perhaps unexpected but overall a robust behavior.
+                self.metrics
+                    .resident_physical_size_gauge
+                    .sub(layer_file_size);

                 true
             }
@@ -1942,11 +1950,14 @@ impl Timeline {
         layer: Arc<dyn PersistentLayer>,
         updates: &mut BatchedUpdates<'_, dyn PersistentLayer>,
     ) -> anyhow::Result<()> {
-        let layer_size = layer.file_size();
-
-        layer.delete()?;
-        if let Some(layer_size) = layer_size {
-            self.metrics.resident_physical_size_gauge.sub(layer_size);
+        if !layer.is_remote_layer() {
+            layer.delete_resident_layer_file()?;
+            let layer_file_size = layer
+                .file_size()
+                .expect("Local layer should have a file size");
+            self.metrics
+                .resident_physical_size_gauge
+                .sub(layer_file_size);
         }

         // TODO Removing from the bottom of the layer map is expensive.
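The accounting rule shared by this hunk and the eviction hunk above, as a hedged, self-contained sketch (`Gauge` is a stand-in for the Prometheus gauge):

```python
import os

class Gauge:
    def __init__(self):
        self.value = 0

    def sub(self, n):
        self.value -= n

resident_physical_size = Gauge()

def evict(path: str, size: int):
    # At this point the layer map already points at a RemoteLayer.
    try:
        os.remove(path)
    except OSError as e:
        print(f"failed to remove layer file on evict after replacement: {e}")
    # Decrement unconditionally: a later download overwrites the file and
    # re-adds its size, so the gauge stays a robust account of resident bytes.
    resident_physical_size.sub(size)

evict("/tmp/nonexistent-layer-file", size=8192)
assert resident_physical_size.value == -8192
```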
@@ -37,7 +37,7 @@ use crate::walrecord::*;
 use crate::ZERO_PAGE;
 use pageserver_api::reltag::{RelTag, SlruKind};
 use postgres_ffi::pg_constants;
-use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
+use postgres_ffi::relfile_utils::{FSM_FORKNUM, INIT_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
 use postgres_ffi::v14::xlog_utils::*;
 use postgres_ffi::v14::CheckPoint;

@@ -762,7 +762,7 @@ impl<'a> WalIngest<'a> {
     )?;

     for xnode in &parsed.xnodes {
-        for forknum in MAIN_FORKNUM..=VISIBILITYMAP_FORKNUM {
+        for forknum in MAIN_FORKNUM..=INIT_FORKNUM {
             let rel = RelTag {
                 forknum,
                 spcnode: xnode.spcnode,
@@ -1,126 +0,0 @@
-import csv
-import logging
-import sys
-import textwrap
-import requests
-import argparse
-import json
-
-
-class Client:
-    def __init__(self, endpoint) -> None:
-        self.endpoint = endpoint
-
-    def get(self, rel_url, **kwargs):
-        resp = requests.get(self.endpoint + rel_url, **kwargs)
-        try:
-            resp.raise_for_status()
-        except requests.exceptions.HTTPError:
-            print("API ERROR: " + resp.text)
-            raise
-        return resp.json()
-
-    def put(self, rel_url, **kwargs):
-        resp = requests.put(self.endpoint + rel_url, **kwargs)
-        try:
-            resp.raise_for_status()
-        except requests.exceptions.HTTPError:
-            print("API ERROR: " + resp.text)
-            raise
-        return resp.json()
-
-
-class AppException(RuntimeError):
-    pass
-
-
-def do_one(tenant, endpoint, merge_existing_with, check_tenant_exists=True):
-    global verbose
-    client = Client(endpoint)
-
-    if check_tenant_exists:
-        tenants = client.get(f"/v1/tenant")
-        matching_tenant = [ t for t in tenants if t['id'] == tenant ]
-        if len(matching_tenant) == 0:
-            raise AppException(f"no tenant {tenant} on pageserver {endpoint}")
-        elif len(matching_tenant) > 1:
-            raise AppException(f"multiple ({len(matching_tenant)}) tenants with id {tenant} on pageserver {endpoint}")
-        else:
-            pass
-
-    config = client.get(f"/v1/tenant/{tenant}/config")
-
-    def comparable_json(obj):
-        j = json.dumps(obj, indent=' ', sort_keys=True)
-        return textwrap.indent(j, ' ')
-
-    if verbose:
-        before = comparable_json(config)
-        print(f"BEFORE:\n{before}")
-
-    overrides = config['tenant_specific_overrides']
-
-    updated = {**overrides, **merge_existing_with}
-
-    client.put("/v1/tenant/config", json={**updated, "tenant_id": tenant})
-
-    if verbose:
-        new_config = client.get(f"/v1/tenant/{tenant}/config")
-        after = comparable_json(new_config)
-        print(f"AFTER:\n{after}")
-
-
-def do_csv(csv_file, merge_existing_with):
-    succeeded = []
-    failed = []
-    for n, line in enumerate(csv.reader(csv_file)):
-        if n == 0:
-            # skip header row
-            continue
-        if len(line) != 2:
-            logging.warn(f"skipping line {n+1}: {line}")
-            continue
-        tenant_id = line[0]
-        pageserver = line[1]
-        try:
-            do_one(tenant_id, f"http://{pageserver}:9898", merge_existing_with, check_tenant_exists=False)
-            logging.info(f"succeeded to configure tenant {tenant_id}")
-            succeeded += [tenant_id]
-        except Exception as e:
-            logging.exception(f"failed to configure tenant {tenant_id}")
-            failed += [tenant_id]
-
-    print(json.dumps({
-        "succeeded": succeeded,
-        "failed": failed,
-    }, indent=' ', sort_keys=True))
-
-
-verbose = False
-
-
-def main():
-    global verbose
-
-    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
-
-    p = argparse.ArgumentParser()
-    p.add_argument("--merge-existing-with", type=str)
-    p.add_argument("--verbose", action='store_true')
-    subcommands = p.add_subparsers(dest="subcommand")
-    one_tenant_parser = subcommands.add_parser("one", help='change config of one tenant, specified via CLI flags')
-    one_tenant_parser.add_argument("--tenant", required=True)
-    one_tenant_parser.add_argument("--endpoint", type=str, default='http://localhost:9898')
-    csv_parser = subcommands.add_parser("csv", help='batch reconfigure tenants specified in a csv file')
-    csv_parser.add_argument('csv_file', type=argparse.FileType())
-    args = p.parse_args()
-
-    verbose = args.verbose
-
-    merge_existing_with = {}
-    if args.merge_existing_with is not None:
-        merge_existing_with = json.loads(args.merge_existing_with)
-        assert isinstance(merge_existing_with, dict)
-
-    ({
-        'one': lambda: do_one(args.tenant, args.endpoint, merge_existing_with),
-        'csv': lambda: do_csv(args.csv_file, merge_existing_with),
-    }[args.subcommand])()
-
-
-if __name__ == '__main__':
-    main()
@@ -366,17 +366,9 @@ class NeonBenchmarker:

     def get_int_counter_value(self, pageserver: NeonPageserver, metric_name: str) -> int:
         """Fetch the value of given int counter from pageserver metrics."""
-        # TODO: If we start to collect more of the prometheus metrics in the
-        # performance test suite like this, we should refactor this to load and
-        # parse all the metrics into a more convenient structure in one go.
-        #
-        # The metric should be an integer, as it's a number of bytes. But in general
-        # all prometheus metrics are floats. So to be pedantic, read it as a float
-        # and round to integer.
         all_metrics = pageserver.http_client().get_metrics()
-        matches = re.search(rf"^{metric_name} (\S+)$", all_metrics, re.MULTILINE)
-        assert matches, f"metric {metric_name} not found"
-        return int(round(float(matches.group(1))))
+        sample = all_metrics.query_one(metric_name)
+        return int(round(sample.value))

     def get_timeline_size(
         self, repo_dir: Path, tenant_id: TenantId, timeline_id: TimelineId
@@ -13,7 +13,8 @@ class Metrics:
         self.metrics = defaultdict(list)
         self.name = name

-    def query_all(self, name: str, filter: Dict[str, str]) -> List[Sample]:
+    def query_all(self, name: str, filter: Optional[Dict[str, str]] = None) -> List[Sample]:
+        filter = filter or {}
         res = []
         for sample in self.metrics[name]:
             try:
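A standalone re-statement of the new optional-filter behavior, runnable without the test fixtures; the dict-based sample encoding is illustrative:

```python
from typing import Dict, List, Optional

def query_all(samples: List[dict], filter: Optional[Dict[str, str]] = None) -> List[dict]:
    filter = filter or {}  # the new default: no filter means "match everything"
    return [s for s in samples if all(s["labels"].get(k) == v for k, v in filter.items())]

samples = [
    {"labels": {"tenant_id": "a"}, "value": 1.0},
    {"labels": {"tenant_id": "b"}, "value": 2.0},
]
assert len(query_all(samples)) == 2  # previously required an explicit filter
assert query_all(samples, {"tenant_id": "b"})[0]["value"] == 2.0
```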
@@ -14,6 +14,7 @@ import tempfile
 import textwrap
 import time
 import uuid
+from collections import defaultdict
 from contextlib import closing, contextmanager
 from dataclasses import dataclass, field
 from enum import Flag, auto

@@ -28,7 +29,6 @@ import asyncpg
 import backoff  # type: ignore
 import boto3
 import jwt
-import prometheus_client
 import psycopg2
 import pytest
 import requests

@@ -36,7 +36,7 @@ from _pytest.config import Config
 from _pytest.config.argparsing import Parser
 from _pytest.fixtures import FixtureRequest
 from fixtures.log_helper import log
-from fixtures.metrics import parse_metrics
+from fixtures.metrics import Metrics, parse_metrics
 from fixtures.types import Lsn, TenantId, TimelineId
 from fixtures.utils import (
     ATTACHMENT_NAME_REGEX,

@@ -45,7 +45,6 @@ from fixtures.utils import (
     get_self_dir,
     subprocess_capture,
 )
-from prometheus_client.parser import text_string_to_metric_families

 # Type-related stuff
 from psycopg2.extensions import connection as PgConnection
@@ -1436,22 +1435,27 @@ class PageserverHttpClient(requests.Session):
         assert completed["successful_download_count"] > 0
         return completed

-    def get_metrics(self) -> str:
+    def get_metrics_str(self) -> str:
+        """You probably want to use get_metrics() instead."""
         res = self.get(f"http://localhost:{self.port}/metrics")
         self.verbose_error(res)
         return res.text

-    def get_timeline_metric(self, tenant_id: TenantId, timeline_id: TimelineId, metric_name: str):
-        raw = self.get_metrics()
-        family: List[prometheus_client.Metric] = list(text_string_to_metric_families(raw))
-        [metric] = [m for m in family if m.name == metric_name]
-        [sample] = [
-            s
-            for s in metric.samples
-            if s.labels["tenant_id"] == str(tenant_id)
-            and s.labels["timeline_id"] == str(timeline_id)
-        ]
-        return sample.value
+    def get_metrics(self) -> Metrics:
+        res = self.get_metrics_str()
+        return parse_metrics(res)
+
+    def get_timeline_metric(
+        self, tenant_id: TenantId, timeline_id: TimelineId, metric_name: str
+    ) -> float:
+        metrics = self.get_metrics()
+        return metrics.query_one(
+            metric_name,
+            filter={
+                "tenant_id": str(tenant_id),
+                "timeline_id": str(timeline_id),
+            },
+        ).value

     def get_remote_timeline_client_metric(
         self,

@@ -1461,7 +1465,7 @@ class PageserverHttpClient(requests.Session):
         file_kind: str,
         op_kind: str,
     ) -> Optional[float]:
-        metrics = parse_metrics(self.get_metrics(), "pageserver")
+        metrics = self.get_metrics()
         matches = metrics.query_all(
             name=metric_name,
             filter={

@@ -1480,14 +1484,16 @@ class PageserverHttpClient(requests.Session):
         assert len(matches) < 2, "above filter should uniquely identify metric"
         return value

-    def get_metric_value(self, name: str) -> Optional[str]:
+    def get_metric_value(
+        self, name: str, filter: Optional[Dict[str, str]] = None
+    ) -> Optional[float]:
         metrics = self.get_metrics()
-        relevant = [line for line in metrics.splitlines() if line.startswith(name)]
-        if len(relevant) == 0:
+        results = metrics.query_all(name, filter=filter)
+        if not results:
             log.info(f'could not find metric "{name}"')
             return None
-        assert len(relevant) == 1
-        return relevant[0].lstrip(name).strip()
+        assert len(results) == 1, f"metric {name} with given filters is not unique, got: {results}"
+        return results[0].value

     def layer_map_info(
         self,

@@ -1516,6 +1522,11 @@ class PageserverHttpClient(requests.Session):

         assert res.status_code == 200

+    def evict_all_layers(self, tenant_id: TenantId, timeline_id: TimelineId):
+        info = self.layer_map_info(tenant_id, timeline_id)
+        for layer in info.historic_layers:
+            self.evict_layer(tenant_id, timeline_id, layer.layer_file_name)
+

 @dataclass
 class TenantConfig:
@@ -1551,6 +1562,14 @@ class LayerMapInfo:

         return info

+    def kind_count(self) -> Dict[str, int]:
+        counts: Dict[str, int] = defaultdict(int)
+        for inmem_layer in self.in_memory_layers:
+            counts[inmem_layer.kind] += 1
+        for hist_layer in self.historic_layers:
+            counts[hist_layer.kind] += 1
+        return counts
+

 @dataclass
 class InMemoryLayerInfo:

@@ -1567,7 +1586,7 @@ class InMemoryLayerInfo:
         )


-@dataclass
+@dataclass(frozen=True)
 class HistoricLayerInfo:
     kind: str
     layer_file_name: str
@@ -1669,7 +1688,7 @@ class AbstractNeonCli(abc.ABC):
             timeout=timeout,
         )
         if not res.returncode:
-            log.info(f"Run success: {res.stdout}")
+            log.info(f"Run {res.args} success: {res.stdout}")
         elif check_return_code:
             # this way command output will be in recorded and shown in CI in failure message
             msg = f"""\
@@ -3463,6 +3482,14 @@ def wait_for_last_flush_lsn(
     return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)


+def wait_for_wal_insert_lsn(
+    env: NeonEnv, pg: Postgres, tenant: TenantId, timeline: TimelineId
+) -> Lsn:
+    """Wait for pageserver to catch up the latest flush LSN, returns the last observed lsn."""
+    last_flush_lsn = Lsn(pg.safe_psql("SELECT pg_current_wal_insert_lsn()")[0][0])
+    return wait_for_last_record_lsn(env.pageserver.http_client(), tenant, timeline, last_flush_lsn)
+
+
 def fork_at_current_lsn(
     env: NeonEnv,
     pg: Postgres,
@@ -3508,3 +3535,23 @@ def wait_for_sk_commit_lsn_to_reach_remote_storage(
     ps_http.timeline_checkpoint(tenant_id, timeline_id)
     wait_for_upload(ps_http, tenant_id, timeline_id, lsn)
     return lsn
+
+
+def wait_for_upload_queue_empty(
+    pageserver: NeonPageserver, tenant_id: TenantId, timeline_id: TimelineId
+):
+    ps_http = pageserver.http_client()
+    while True:
+        all_metrics = ps_http.get_metrics()
+        tl = all_metrics.query_all(
+            "pageserver_remote_timeline_client_calls_unfinished",
+            {
+                "tenant_id": str(tenant_id),
+                "timeline_id": str(timeline_id),
+            },
+        )
+        assert len(tl) > 0
+        log.info(f"upload queue for {tenant_id}/{timeline_id}: {tl}")
+        if all(m.value == 0 for m in tl):
+            return
+        time.sleep(0.2)
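One design note: this helper polls forever. A bounded variant of the same idiom (a sketch, not in the diff) fails fast instead of hanging the test run:

```python
import time

def poll_until(condition, timeout=60.0, interval=0.2):
    # Generic form of the loop above, with a deadline so a stuck upload
    # queue raises instead of blocking CI indefinitely.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return
        time.sleep(interval)
    raise TimeoutError("condition not met within timeout")

poll_until(lambda: True)  # trivially satisfied example
```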
@@ -8,7 +8,7 @@ def test_build_info_metric(neon_env_builder: NeonEnvBuilder, link_proxy: NeonPro

     parsed_metrics = {}

-    parsed_metrics["pageserver"] = parse_metrics(env.pageserver.http_client().get_metrics())
+    parsed_metrics["pageserver"] = parse_metrics(env.pageserver.http_client().get_metrics_str())
     parsed_metrics["safekeeper"] = parse_metrics(env.safekeepers[0].http_client().get_metrics_str())
     parsed_metrics["proxy"] = parse_metrics(link_proxy.get_metrics())
@@ -4,7 +4,6 @@ import random

 import pytest
 from fixtures.log_helper import log
-from fixtures.metrics import parse_metrics
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,

@@ -134,7 +133,7 @@ def test_gc_index_upload(neon_env_builder: NeonEnvBuilder, remote_storage_kind:

     # Helper function that gets the number of given kind of remote ops from the metrics
     def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
-        ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
+        ps_metrics = env.pageserver.http_client().get_metrics()
         total = 0.0
         for sample in ps_metrics.query_all(
             name="pageserver_remote_operation_seconds_count",
@@ -1,8 +1,13 @@
 import time

 import pytest
 from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnvBuilder,
     RemoteStorageKind,
     wait_for_last_flush_lsn,
     wait_for_last_record_lsn,
     wait_for_sk_commit_lsn_to_reach_remote_storage,
     wait_for_upload,
 )
 from fixtures.types import Lsn, TenantId, TimelineId

@@ -138,3 +143,160 @@ def test_basic_eviction(
     assert (
         redownloaded_layer_map_info == initial_layer_map_info
     ), "Should have the same layer map after redownloading the evicted layers"
+
+
+def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.enable_remote_storage(
+        remote_storage_kind=RemoteStorageKind.LOCAL_FS,
+        test_name="test_gc_of_remote_layers",
+    )
+
+    env = neon_env_builder.init_start()
+
+    tenant_config = {
+        "pitr_interval": "1s",  # set to non-zero, so GC actually does something
+        "gc_period": "0s",  # we want to control when GC runs
+        "compaction_period": "0s",  # we want to control when compaction runs
+        "checkpoint_timeout": "24h",  # something we won't reach
+        "checkpoint_distance": f"{50 * (1024**2)}",  # something we won't reach, we checkpoint manually
+        "compaction_threshold": "3",
+        # "image_creation_threshold": set at runtime
+        "compaction_target_size": f"{128 * (1024**2)}",  # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
+    }
+
+    def tenant_update_config(changes):
+        tenant_config.update(changes)
+        env.neon_cli.config_tenant(tenant_id, tenant_config)
+
+    tenant_id, timeline_id = env.neon_cli.create_tenant(conf=tenant_config)
+    log.info("tenant id is %s", tenant_id)
+    env.initial_tenant = tenant_id  # update_and_gc relies on this
+    ps_http = env.pageserver.http_client()
+
+    pg = env.postgres.create_start("main")
+
+    log.info("fill with data, creating delta & image layers, some of which are GC'able after")
+    # no particular reason to create the layers like this, but we are sure
+    # not to hit the image_creation_threshold here.
+    with pg.cursor() as cur:
+        cur.execute("create table a (id bigserial primary key, some_value bigint not null)")
+        cur.execute("insert into a(some_value) select i from generate_series(1, 10000) s(i)")
+    wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
+    ps_http.timeline_checkpoint(tenant_id, timeline_id)
+
+    # Create delta layers, then turn them into image layers.
+    # Do it multiple times so that there's something to GC.
+    for k in range(0, 2):
+        # produce delta layers => disable image layer creation by setting high threshold
+        tenant_update_config({"image_creation_threshold": "100"})
+        for i in range(0, 2):
+            for j in range(0, 3):
+                # create a minimal amount of "delta difficulty" for this table
+                with pg.cursor() as cur:
+                    cur.execute("update a set some_value = -some_value + %s", (j,))
+
+                with pg.cursor() as cur:
+                    # vacuuming should aid to reuse keys, though it's not really important
+                    # with image_creation_threshold=1 which we will use on the last compaction
+                    cur.execute("vacuum")
+
+                wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
+
+                if i == 1 and j == 2 and k == 1:
+                    # last iteration; stop before checkpoint to avoid leaving an inmemory layer
+                    pg.stop_and_destroy()
+
+                ps_http.timeline_checkpoint(tenant_id, timeline_id)
+
+            # images should not yet be created, because threshold is too high,
+            # but these will be reshuffled to L1 layers
+            ps_http.timeline_compact(tenant_id, timeline_id)
+
+        for _ in range(0, 20):
+            # loop in case flushing is still in progress
+            layers = ps_http.layer_map_info(tenant_id, timeline_id)
+            if not layers.in_memory_layers:
+                break
+            time.sleep(0.2)
+
+        # now that we've grown some delta layers, turn them into image layers
+        tenant_update_config({"image_creation_threshold": "1"})
+        ps_http.timeline_compact(tenant_id, timeline_id)
+
+    # wait for all uploads to finish
+    wait_for_sk_commit_lsn_to_reach_remote_storage(
+        tenant_id, timeline_id, env.safekeepers, env.pageserver
+    )
+
+    # shutdown safekeepers to avoid on-demand downloads from walreceiver
+    for sk in env.safekeepers:
+        sk.stop()
+
+    ps_http.timeline_checkpoint(tenant_id, timeline_id)
+
+    log.info("ensure the code above produced image and delta layers")
+    pre_evict_info = ps_http.layer_map_info(tenant_id, timeline_id)
+    log.info("layer map dump: %s", pre_evict_info)
+    by_kind = pre_evict_info.kind_count()
+    log.info("by kind: %s", by_kind)
+    assert by_kind["Image"] > 0
+    assert by_kind["Delta"] > 0
+    assert by_kind["InMemory"] == 0
+    resident_layers = list(env.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
+    log.info("resident layers count before eviction: %s", len(resident_layers))
+
+    log.info("evict all layers")
+    ps_http.evict_all_layers(tenant_id, timeline_id)
+
+    def ensure_resident_and_remote_size_metrics():
+        log.info("ensure that all the layers are gone")
+        resident_layers = list(env.timeline_dir(tenant_id, timeline_id).glob("*-*_*"))
+        # we have disabled all background loops, so, this should hold
+        assert len(resident_layers) == 0
+
+        info = ps_http.layer_map_info(tenant_id, timeline_id)
+        log.info("layer map dump: %s", info)
+
+        log.info("ensure that resident_physical_size metric is zero")
+        resident_physical_size_metric = ps_http.get_timeline_metric(
+            tenant_id, timeline_id, "pageserver_resident_physical_size"
+        )
+        assert resident_physical_size_metric == 0
+        log.info("ensure that resident_physical_size metric corresponds to layer map dump")
+        assert resident_physical_size_metric == sum(
+            [layer.layer_file_size or 0 for layer in info.historic_layers if not layer.remote]
+        )
+
+        log.info("ensure that remote_physical_size metric matches layer map")
+        remote_physical_size_metric = ps_http.get_timeline_metric(
+            tenant_id, timeline_id, "pageserver_remote_physical_size"
+        )
+        log.info("ensure that remote_physical_size metric corresponds to layer map dump")
+        assert remote_physical_size_metric == sum(
+            layer.layer_file_size or 0 for layer in info.historic_layers if layer.remote
+        )
+
+    log.info("before runnning GC, ensure that remote_physical size is zero")
+    ensure_resident_and_remote_size_metrics()
+
+    log.info("run GC")
+    time.sleep(2)  # let pitr_interval + 1 second pass
+    ps_http.timeline_gc(tenant_id, timeline_id, 0)
+    time.sleep(1)
+    assert not env.pageserver.log_contains("Nothing to GC")
+
+    log.info("ensure GC deleted some layers, otherwise this test is pointless")
+    post_gc_info = ps_http.layer_map_info(tenant_id, timeline_id)
+    log.info("layer map dump: %s", post_gc_info)
+    log.info("by kind: %s", post_gc_info.kind_count())
+    pre_evict_layers = set([layer.layer_file_name for layer in pre_evict_info.historic_layers])
+    post_gc_layers = set([layer.layer_file_name for layer in post_gc_info.historic_layers])
+    assert post_gc_layers.issubset(pre_evict_layers)
+    assert len(post_gc_layers) < len(pre_evict_layers)
+
+    log.info("update_gc_info might download some layers. Evict them again.")
+    ps_http.evict_all_layers(tenant_id, timeline_id)
+
+    log.info("after running GC, ensure that resident size is still zero")
+    ensure_resident_and_remote_size_metrics()
@@ -9,7 +9,6 @@ from typing import Iterator

 import pytest
 from fixtures.log_helper import log
-from fixtures.metrics import parse_metrics
 from fixtures.neon_fixtures import (
     PSQL,
     NeonEnvBuilder,

@@ -143,7 +142,7 @@ def test_metric_collection(

     # Helper function that gets the number of given kind of remote ops from the metrics
     def get_num_remote_ops(file_kind: str, op_kind: str) -> int:
-        ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
+        ps_metrics = env.pageserver.http_client().get_metrics()
         total = 0.0
         for sample in ps_metrics.query_all(
             name="pageserver_remote_operation_seconds_count",
@@ -11,6 +11,7 @@ from fixtures.log_helper import log
 from fixtures.neon_fixtures import (
     NeonEnvBuilder,
     PageserverApiException,
+    PageserverHttpClient,
     RemoteStorageKind,
     assert_tenant_status,
     available_remote_storages,

@@ -25,9 +26,16 @@ from fixtures.types import Lsn
 from fixtures.utils import query_scalar


-def get_num_downloaded_layers(client, tenant_id, timeline_id):
+def get_num_downloaded_layers(client: PageserverHttpClient, tenant_id, timeline_id):
     value = client.get_metric_value(
-        f'pageserver_remote_operation_seconds_count{{file_kind="layer",op_kind="download",status="success",tenant_id="{tenant_id}",timeline_id="{timeline_id}"}}'
+        "pageserver_remote_operation_seconds_count",
+        {
+            "file_kind": "layer",
+            "op_kind": "download",
+            "status": "success",
+            "tenant_id": tenant_id,
+            "timeline_id": timeline_id,
+        },
     )
     if value is None:
         return 0
@@ -6,7 +6,6 @@ from threading import Thread

 import asyncpg
 import pytest
 from fixtures.log_helper import log
-from fixtures.metrics import parse_metrics
 from fixtures.neon_fixtures import (
     NeonEnv,
     NeonEnvBuilder,

@@ -79,7 +78,7 @@ def test_tenant_reattach(
         ".*failed to perform remote task UploadMetadata.*, will retry.*"
     )

-    ps_metrics = parse_metrics(pageserver_http.get_metrics(), "pageserver")
+    ps_metrics = pageserver_http.get_metrics()
     tenant_metric_filter = {
         "tenant_id": str(tenant_id),
         "timeline_id": str(timeline_id),

@@ -93,7 +92,7 @@ def test_tenant_reattach(

     time.sleep(1)  # for metrics propagation

-    ps_metrics = parse_metrics(pageserver_http.get_metrics(), "pageserver")
+    ps_metrics = pageserver_http.get_metrics()
     pageserver_last_record_lsn = int(
         ps_metrics.query_one("pageserver_last_record_lsn", filter=tenant_metric_filter).value
     )
@@ -3,8 +3,15 @@ from typing import List, Tuple

 import pytest
 from fixtures.log_helper import log
-from fixtures.neon_fixtures import NeonEnv, NeonEnvBuilder, wait_for_last_flush_lsn
-from fixtures.types import Lsn
+from fixtures.neon_fixtures import (
+    NeonEnv,
+    NeonEnvBuilder,
+    PageserverHttpClient,
+    Postgres,
+    wait_for_last_flush_lsn,
+    wait_for_wal_insert_lsn,
+)
+from fixtures.types import Lsn, TenantId, TimelineId


 def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):

@@ -324,7 +331,7 @@ def test_single_branch_get_tenant_size_grows(
     # inserts is larger than gc_horizon. for example 0x20000 here hid the fact
     # that there next_gc_cutoff could be smaller than initdb_lsn, which will
     # obviously lead to issues when calculating the size.
-    gc_horizon = 0x30000
+    gc_horizon = 0x38000
     neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"

     env = neon_env_builder.init_start()
@@ -334,29 +341,75 @@ def test_single_branch_get_tenant_size_grows(

     http_client = env.pageserver.http_client()

-    collected_responses: List[Tuple[Lsn, int]] = []
+    collected_responses: List[Tuple[str, Lsn, int]] = []

     size_debug_file = open(test_output_dir / "size_debug.html", "w")

-    def check_size_change(current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev: int):
-        if current_lsn - initdb_lsn > gc_horizon:
+    def check_size_change(
+        current_lsn: Lsn, initdb_lsn: Lsn, gc_horizon: int, size: int, prev_size: int
+    ):
+        if current_lsn - initdb_lsn >= gc_horizon:
             assert (
-                size >= prev
+                size >= prev_size
             ), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size"
         else:
             assert (
-                size > prev
+                size > prev_size
             ), "tenant_size should grow, because we continue to add WAL to initial snapshot size"

-    with env.postgres.create_start(branch_name, tenant_id=tenant_id) as pg:
-        initdb_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
+    def get_current_consistent_size(
+        env: NeonEnv,
+        pg: Postgres,
+        size_debug_file,  # apparently there is no public signature for open()...
+        http_client: PageserverHttpClient,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+    ) -> Tuple[Lsn, int]:
+        consistent = False
+        size_debug = None
+
+        current_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
+        # We want to make sure we have a self-consistent set of values.
+        # Size changes with WAL, so only if both before and after getting
+        # the size of the tenant reports the same WAL insert LSN, we're OK
+        # to use that (size, LSN) combination.
+        # Note that 'wait_for_wal_flush_lsn' is not accurate enough: There
+        # can be more wal after the flush LSN that can arrive on the
+        # pageserver before we're requesting the page size.
+        # Anyway, in general this is only one iteration, so in general
+        # this is fine.
+        while not consistent:
+            size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
+            size_debug = http_client.tenant_size_debug(tenant_id)
+
+            after_lsn = wait_for_wal_insert_lsn(env, pg, tenant_id, timeline_id)
+            consistent = current_lsn == after_lsn
+            current_lsn = after_lsn
+        size_debug_file.write(size_debug)
+        return (current_lsn, size)
+
+    with env.postgres.create_start(
+        branch_name,
+        tenant_id=tenant_id,
+        # autovacuum is disabled to limit WAL logging.
+        config_lines=["autovacuum=off"],
+    ) as pg:
+        (initdb_lsn, size) = get_current_consistent_size(
+            env, pg, size_debug_file, http_client, tenant_id, timeline_id
+        )
+        collected_responses.append(("INITDB", initdb_lsn, size))

         with pg.cursor() as cur:
-            cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL)")
+            cur.execute("CREATE TABLE t0 (i BIGINT NOT NULL) WITH (fillfactor = 40)")

+        (current_lsn, size) = get_current_consistent_size(
+            env, pg, size_debug_file, http_client, tenant_id, timeline_id
+        )
+        collected_responses.append(("CREATE", current_lsn, size))

         batch_size = 100

-        i = 0
-        while True:
+        for i in range(3):
             with pg.cursor() as cur:
                 cur.execute(
                     f"INSERT INTO t0(i) SELECT i FROM generate_series({batch_size} * %s, ({batch_size} * (%s + 1)) - 1) s(i)",
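The new get_current_consistent_size helper is a read-until-stable pattern: it brackets the size request between two WAL-insert-LSN reads and only trusts the result when both reads agree. A minimal generic sketch of the same idea, where read_cursor and read_value are hypothetical stand-ins for the LSN probe and the size request:

from typing import Callable, Tuple, TypeVar

C = TypeVar("C")
V = TypeVar("V")


def read_consistent(read_cursor: Callable[[], C], read_value: Callable[[], V]) -> Tuple[C, V]:
    # Sample the cursor, take the measurement, then re-sample; only a
    # measurement bracketed by two identical cursor reads is trusted.
    cursor = read_cursor()
    while True:
        value = read_value()
        after = read_cursor()
        if after == cursor:
            return (cursor, value)
        cursor = after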
@@ -365,27 +418,24 @@ def test_single_branch_get_tenant_size_grows(

-            i += 1
-
-        current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
-
-        size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
-
-        size_debug = http_client.tenant_size_debug(tenant_id)
-        size_debug_file.write(size_debug)
-
-        if len(collected_responses) > 0:
-            prev = collected_responses[-1][1]
-            if size == 0:
-                assert prev == 0
-            else:
-                # branch start shouldn't be past gc_horizon yet
-                # thus the size should grow as we insert more data
-                assert current_lsn - initdb_lsn <= gc_horizon
-                assert size > prev
-
-        collected_responses.append((current_lsn, size))
-
-        if len(collected_responses) > 2:
-            break
+        (current_lsn, size) = get_current_consistent_size(
+            env, pg, size_debug_file, http_client, tenant_id, timeline_id
+        )
+
+        prev_size = collected_responses[-1][2]
+        if size == 0:
+            assert prev_size == 0
+        else:
+            # branch start shouldn't be past gc_horizon yet
+            # thus the size should grow as we insert more data
+            # "gc_horizon" is tuned so that it kicks in _after_ the
+            # insert phase, but before the update phase ends.
+            assert (
+                current_lsn - initdb_lsn <= gc_horizon
+            ), "Tuning of GC window is likely out-of-date"
+            assert size > prev_size
+
+        collected_responses.append(("INSERT", current_lsn, size))

         while True:
             with pg.cursor() as cur:
@@ -397,18 +447,15 @@ def test_single_branch_get_tenant_size_grows(
                 if updated == 0:
                     break

-            current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
+            (current_lsn, size) = get_current_consistent_size(
+                env, pg, size_debug_file, http_client, tenant_id, timeline_id
+            )

-            size, sizes = http_client.tenant_size_and_modelinputs(tenant_id)
-
-            size_debug = http_client.tenant_size_debug(tenant_id)
-            size_debug_file.write(size_debug)
-
-            prev = collected_responses[-1][1]
-
-            check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
-
-            collected_responses.append((current_lsn, size))
+            prev_size = collected_responses[-1][2]
+
+            check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)
+
+            collected_responses.append(("UPDATE", current_lsn, size))

         while True:
             with pg.cursor() as cur:
@@ -418,40 +465,47 @@ def test_single_branch_get_tenant_size_grows(
                 if deleted == 0:
                     break

-            current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
+            (current_lsn, size) = get_current_consistent_size(
+                env, pg, size_debug_file, http_client, tenant_id, timeline_id
+            )

-            size = http_client.tenant_size(tenant_id)
-            prev = collected_responses[-1][1]
+            prev_size = collected_responses[-1][2]

-            check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
+            check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)

-            collected_responses.append((current_lsn, size))
+            collected_responses.append(("DELETE", current_lsn, size))

         with pg.cursor() as cur:
             cur.execute("DROP TABLE t0")

-        current_lsn = wait_for_last_flush_lsn(env, pg, tenant_id, timeline_id)
+        # The size of the tenant should still be as large as before we dropped
+        # the table, because the drop operation can still be undone in the PITR
+        # defined by gc_horizon.
+        (current_lsn, size) = get_current_consistent_size(
+            env, pg, size_debug_file, http_client, tenant_id, timeline_id
+        )

-        size = http_client.tenant_size(tenant_id)
-        prev = collected_responses[-1][1]
+        prev_size = collected_responses[-1][2]

-        check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev)
+        check_size_change(current_lsn, initdb_lsn, gc_horizon, size, prev_size)

-        collected_responses.append((current_lsn, size))
+        collected_responses.append(("DROP", current_lsn, size))

     # this isn't too many lines to forget for a while. observed while
     # developing these tests that locally the value is a bit more than what we
     # get in the ci.
-    for lsn, size in collected_responses:
-        log.info(f"collected: {lsn}, {size}")
+    for phase, lsn, size in collected_responses:
+        log.info(f"collected: {phase}, {lsn}, {size}")

     env.pageserver.stop()
     env.pageserver.start()

-    size_after = http_client.tenant_size(tenant_id)
-    prev = collected_responses[-1][1]
+    size_debug = http_client.tenant_size_debug(tenant_id)
+    size_debug_file.write(size_debug)
+    size_debug_file.close()
+
+    size_after = http_client.tenant_size(tenant_id)
+    prev = collected_responses[-1][2]

     assert size_after == prev, "size after restarting pageserver should not have changed"
@@ -50,16 +50,22 @@ def test_tenant_tasks(neon_env_builder: NeonEnvBuilder):
     wait_until(10, 0.2, lambda: assert_active(tenant_id))

     # Assert that all tasks finish quickly after tenant is detached
-    task_starts = client.get_metric_value('pageserver_tenant_task_events{event="start"}')
+    task_starts = client.get_metric_value("pageserver_tenant_task_events_total", {"event": "start"})
     assert task_starts is not None
     assert int(task_starts) > 0
-    client.tenant_detach(tenant)
+    client.tenant_detach(env.initial_tenant)

     def assert_tasks_finish():
-        tasks_started = client.get_metric_value('pageserver_tenant_task_events{event="start"}')
-        tasks_ended = client.get_metric_value('pageserver_tenant_task_events{event="stop"}')
-        tasks_panicked = client.get_metric_value('pageserver_tenant_task_events{event="panic"}')
+        tasks_started = client.get_metric_value(
+            "pageserver_tenant_task_events_total", {"event": "start"}
+        )
+        tasks_ended = client.get_metric_value(
+            "pageserver_tenant_task_events_total", {"event": "stop"}
+        )
+        tasks_panicked = client.get_metric_value(
+            "pageserver_tenant_task_events_total", {"event": "panic"}
+        )
         log.info(f"started {tasks_started}, ended {tasks_ended}, panicked {tasks_panicked}")
         assert tasks_started == tasks_ended
         assert tasks_panicked is None or int(tasks_panicked) == 0
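These hunks switch get_metric_value from a hand-assembled name{label="value"} string to a metric name plus a label dict (and the counter gains the conventional _total suffix). A sketch of what such a label-filtered lookup can do over Prometheus exposition text, using the real prometheus_client parser; the fixtures' own implementation may differ:

from typing import Dict, Optional

from prometheus_client.parser import text_string_to_metric_families


def get_metric_value(metrics_text: str, name: str, labels: Dict[str, str]) -> Optional[float]:
    # Scan the exposition-format text for a sample whose metric name and
    # label set match; return its value, or None if no sample matches.
    for family in text_string_to_metric_families(metrics_text):
        for sample in family.samples:
            if sample.name == name and all(sample.labels.get(k) == v for k, v in labels.items()):
                return sample.value
    return None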
@@ -107,7 +107,7 @@ def test_metrics_normal_work(neon_env_builder: NeonEnvBuilder):
             assert cur.fetchone() == (5000050000,)

     collected_metrics = {
-        "pageserver": env.pageserver.http_client().get_metrics(),
+        "pageserver": env.pageserver.http_client().get_metrics_str(),
     }
     for sk in env.safekeepers:
         collected_metrics[f"safekeeper{sk.id}"] = sk.http_client().get_metrics_str()
@@ -207,7 +207,7 @@ def test_pageserver_metrics_removed_after_detach(
             assert cur.fetchone() == (5000050000,)

     def get_ps_metric_samples_for_tenant(tenant_id: TenantId) -> List[Sample]:
-        ps_metrics = parse_metrics(env.pageserver.http_client().get_metrics(), "pageserver")
+        ps_metrics = env.pageserver.http_client().get_metrics()
         samples = []
         for metric_name in ps_metrics.metrics:
             for sample in ps_metrics.query_all(
@@ -307,7 +307,7 @@ def test_pageserver_with_empty_tenants(

     time.sleep(1)  # to allow metrics propagation

-    ps_metrics = parse_metrics(client.get_metrics(), "pageserver")
+    ps_metrics = client.get_metrics()
     broken_tenants_metric_filter = {
         "tenant_id": str(tenant_without_timelines_dir),
         "state": "broken",
@@ -1,11 +1,11 @@
 import math
-import queue
 import random
 import re
-import threading
 import time
 from contextlib import closing
 from pathlib import Path
+from typing import Optional

 import psycopg2.errors
 import psycopg2.extras
@@ -19,9 +19,11 @@ from fixtures.neon_fixtures import (
     PgBin,
     PortDistributor,
     Postgres,
+    RemoteStorageKind,
     VanillaPostgres,
     assert_tenant_status,
     wait_for_last_flush_lsn,
+    wait_for_upload_queue_empty,
     wait_until,
 )
 from fixtures.types import TenantId, TimelineId
@@ -302,8 +304,18 @@ def test_timeline_initial_logical_size_calculation_cancellation(
     # message emitted by the code behind failpoint "timeline-calculate-logical-size-check-dir-exists"


-def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
+def test_timeline_physical_size_init(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
+):
+    if remote_storage_kind is not None:
+        neon_env_builder.enable_remote_storage(
+            remote_storage_kind, "test_timeline_physical_size_init"
+        )
+
+    env = neon_env_builder.init_start()

     new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_init")
     pg = env.postgres.create_start("test_timeline_physical_size_init")
@@ -331,12 +343,22 @@ def test_timeline_physical_size_init(neon_simple_env: NeonEnv):
     )

     assert_physical_size_invariants(
-        get_physical_size_values(env, env.initial_tenant, new_timeline_id)
+        get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
+        remote_storage_kind,
     )


-def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
-    env = neon_simple_env
+@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
+def test_timeline_physical_size_post_checkpoint(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
+):
+    if remote_storage_kind is not None:
+        neon_env_builder.enable_remote_storage(
+            remote_storage_kind, "test_timeline_physical_size_init"
+        )
+
+    env = neon_env_builder.init_start()
+
     pageserver_http = env.pageserver.http_client()
     new_timeline_id = env.neon_cli.create_branch("test_timeline_physical_size_post_checkpoint")
     pg = env.postgres.create_start("test_timeline_physical_size_post_checkpoint")
@@ -354,11 +376,21 @@ def test_timeline_physical_size_post_checkpoint(neon_simple_env: NeonEnv):
     pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)

     assert_physical_size_invariants(
-        get_physical_size_values(env, env.initial_tenant, new_timeline_id)
+        get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
+        remote_storage_kind,
     )


-def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder):
+@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
+def test_timeline_physical_size_post_compaction(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
+):
+    if remote_storage_kind is not None:
+        neon_env_builder.enable_remote_storage(
+            remote_storage_kind, "test_timeline_physical_size_init"
+        )
+
     # Disable background compaction as we don't want it to happen after `get_physical_size` request
     # and before checking the expected size on disk, which makes the assertion failed
     neon_env_builder.pageserver_config_override = (
@@ -387,15 +419,33 @@ def test_timeline_physical_size_post_compaction(neon_env_builder: NeonEnvBuilder
     )

     wait_for_last_flush_lsn(env, pg, env.initial_tenant, new_timeline_id)

+    # shutdown safekeepers to prevent new data from coming in
+    for sk in env.safekeepers:
+        sk.stop()
+
     pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
     pageserver_http.timeline_compact(env.initial_tenant, new_timeline_id)

+    if remote_storage_kind is not None:
+        wait_for_upload_queue_empty(env.pageserver, env.initial_tenant, new_timeline_id)
+
     assert_physical_size_invariants(
-        get_physical_size_values(env, env.initial_tenant, new_timeline_id)
+        get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
+        remote_storage_kind,
     )


-def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
+@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
+def test_timeline_physical_size_post_gc(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
+):
+    if remote_storage_kind is not None:
+        neon_env_builder.enable_remote_storage(
+            remote_storage_kind, "test_timeline_physical_size_init"
+        )
+
     # Disable background compaction and GC as we don't want it to happen after `get_physical_size` request
     # and before checking the expected size on disk, which makes the assertion failed
     neon_env_builder.pageserver_config_override = "tenant_config={checkpoint_distance=100000, compaction_period='0s', gc_period='0s', pitr_interval='1s'}"
@@ -431,8 +481,12 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
     pageserver_http.timeline_checkpoint(env.initial_tenant, new_timeline_id)
     pageserver_http.timeline_gc(env.initial_tenant, new_timeline_id, gc_horizon=None)

+    if remote_storage_kind is not None:
+        wait_for_upload_queue_empty(env.pageserver, env.initial_tenant, new_timeline_id)
+
     assert_physical_size_invariants(
-        get_physical_size_values(env, env.initial_tenant, new_timeline_id)
+        get_physical_size_values(env, env.initial_tenant, new_timeline_id, remote_storage_kind),
+        remote_storage_kind,
     )
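The upload-queue drain matters for the invariant check: until every layer written by the checkpoint/compaction/GC has reached remote storage, pageserver_remote_physical_size lags the resident size and the comparison would be flaky. A generic sketch of such a drain as a polling loop, with queue_depth as a hypothetical probe (the real fixture inspects pageserver state):

import time
from typing import Callable


def wait_until_zero(queue_depth: Callable[[], int], timeout: float = 30.0, interval: float = 0.5) -> None:
    # Poll the queue-depth probe until it reports empty or the timeout expires.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if queue_depth() == 0:
            return
        time.sleep(interval)
    raise TimeoutError("upload queue did not drain in time")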
@@ -465,26 +519,26 @@ def test_timeline_size_metrics(

     # get the metrics and parse the metric for the current timeline's physical size
     metrics = env.pageserver.http_client().get_metrics()
-    matches = re.search(
-        f'^pageserver_resident_physical_size{{tenant_id="{env.initial_tenant}",timeline_id="{new_timeline_id}"}} (\\S+)$',
-        metrics,
-        re.MULTILINE,
-    )
-    assert matches
-    tl_physical_size_metric = int(matches.group(1))
+    tl_physical_size_metric = metrics.query_one(
+        name="pageserver_resident_physical_size",
+        filter={
+            "tenant_id": str(env.initial_tenant),
+            "timeline_id": str(new_timeline_id),
+        },
+    ).value

     # assert that the physical size metric matches the actual physical size on disk
     timeline_path = env.timeline_dir(env.initial_tenant, new_timeline_id)
     assert tl_physical_size_metric == get_timeline_dir_size(timeline_path)

     # Check that the logical size metric is sane, and matches
-    matches = re.search(
-        f'^pageserver_current_logical_size{{tenant_id="{env.initial_tenant}",timeline_id="{new_timeline_id}"}} (\\S+)$',
-        metrics,
-        re.MULTILINE,
-    )
-    assert matches
-    tl_logical_size_metric = int(matches.group(1))
+    tl_logical_size_metric = metrics.query_one(
+        name="pageserver_current_logical_size",
+        filter={
+            "tenant_id": str(env.initial_tenant),
+            "timeline_id": str(new_timeline_id),
+        },
+    ).value

     pgdatadir = test_output_dir / "pgdata-vanilla"
     pg_bin = PgBin(test_output_dir, pg_distrib_dir, pg_version)
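Replacing the multiline regexes with query_one also tightens the check: a regex quietly takes the first match, while query_one can insist that the (name, labels) pair selects exactly one sample. A sketch of that exactly-one semantics over already-parsed samples; Sample here is a hypothetical record, not the fixtures' class:

from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Sample:
    labels: Dict[str, str]
    value: float


def query_one(samples: List[Sample], filter: Dict[str, str]) -> Sample:
    # Keep only samples whose labels are a superset of the filter,
    # and insist that the filter is unambiguous.
    matches = [s for s in samples if all(s.labels.get(k) == v for k, v in filter.items())]
    assert len(matches) == 1, f"expected exactly one sample, got {len(matches)}"
    return matches[0]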
@@ -516,18 +570,29 @@ def test_timeline_size_metrics(
     assert math.isclose(dbsize_sum, tl_logical_size_metric, abs_tol=2 * 1024 * 1024)


-def test_tenant_physical_size(neon_simple_env: NeonEnv):
+@pytest.mark.parametrize("remote_storage_kind", [None, RemoteStorageKind.LOCAL_FS])
+def test_tenant_physical_size(
+    neon_env_builder: NeonEnvBuilder, remote_storage_kind: Optional[RemoteStorageKind]
+):
     random.seed(100)

-    env = neon_simple_env
+    if remote_storage_kind is not None:
+        neon_env_builder.enable_remote_storage(
+            remote_storage_kind, "test_timeline_physical_size_init"
+        )
+
+    env = neon_env_builder.init_start()
+
     pageserver_http = env.pageserver.http_client()
+    client = env.pageserver.http_client()

     tenant, timeline = env.neon_cli.create_tenant()
+    if remote_storage_kind is not None:
+        wait_for_upload_queue_empty(env.pageserver, tenant, timeline)

     def get_timeline_resident_physical_size(timeline: TimelineId):
-        sizes = get_physical_size_values(env, tenant, timeline)
-        assert_physical_size_invariants(sizes)
+        sizes = get_physical_size_values(env, tenant, timeline, remote_storage_kind)
+        assert_physical_size_invariants(sizes, remote_storage_kind)
         return sizes.prometheus_resident_physical

     timeline_total_resident_physical_size = get_timeline_resident_physical_size(timeline)
@@ -547,6 +612,9 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv):
         wait_for_last_flush_lsn(env, pg, tenant, timeline)
         pageserver_http.timeline_checkpoint(tenant, timeline)

+        if remote_storage_kind is not None:
+            wait_for_upload_queue_empty(env.pageserver, tenant, timeline)
+
         timeline_total_resident_physical_size += get_timeline_resident_physical_size(timeline)

         pg.stop()
@@ -564,21 +632,39 @@ def test_tenant_physical_size(neon_simple_env: NeonEnv):

 class TimelinePhysicalSizeValues:
     api_current_physical: int
-    prometheus_resident_physical: int
+    prometheus_resident_physical: float
+    prometheus_remote_physical: Optional[float] = None
     python_timelinedir_layerfiles_physical: int
+    layer_map_file_size_sum: int


 def get_physical_size_values(
-    env: NeonEnv, tenant_id: TenantId, timeline_id: TimelineId
+    env: NeonEnv,
+    tenant_id: TenantId,
+    timeline_id: TimelineId,
+    remote_storage_kind: Optional[RemoteStorageKind],
 ) -> TimelinePhysicalSizeValues:
     res = TimelinePhysicalSizeValues()

     client = env.pageserver.http_client()

-    res.prometheus_resident_physical = client.get_timeline_metric(
-        tenant_id, timeline_id, "pageserver_resident_physical_size"
+    res.layer_map_file_size_sum = sum(
+        layer.layer_file_size or 0
+        for layer in client.layer_map_info(tenant_id, timeline_id).historic_layers
     )

+    metrics = client.get_metrics()
+    metrics_filter = {"tenant_id": str(tenant_id), "timeline_id": str(timeline_id)}
+    res.prometheus_resident_physical = metrics.query_one(
+        "pageserver_resident_physical_size", metrics_filter
+    ).value
+    if remote_storage_kind is not None:
+        res.prometheus_remote_physical = metrics.query_one(
+            "pageserver_remote_physical_size", metrics_filter
+        ).value
+    else:
+        res.prometheus_remote_physical = None
+
     detail = client.timeline_detail(
         tenant_id, timeline_id, include_timeline_dir_layer_file_size_sum=True
     )
@@ -590,11 +676,20 @@ def get_physical_size_values(
     return res


-def assert_physical_size_invariants(sizes: TimelinePhysicalSizeValues):
+def assert_physical_size_invariants(
+    sizes: TimelinePhysicalSizeValues, remote_storage_kind: Optional[RemoteStorageKind]
+):
     # resident phyiscal size is defined as
     assert sizes.python_timelinedir_layerfiles_physical == sizes.prometheus_resident_physical
+    assert sizes.python_timelinedir_layerfiles_physical == sizes.layer_map_file_size_sum

     # we don't do layer eviction, so, all layers are resident
     assert sizes.api_current_physical == sizes.prometheus_resident_physical
+    if remote_storage_kind is not None:
+        assert sizes.prometheus_resident_physical == sizes.prometheus_remote_physical
+        # XXX would be nice to assert layer file physical storage utilization here as well, but we can only do that for LocalFS
+    else:
+        assert sizes.prometheus_remote_physical is None


 # Timeline logical size initialization is an asynchronous background task that runs once,
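Taken together, the invariants form one equality chain: layer files on disk == resident-size metric == layer-map sum == API physical size, and, once uploads have drained, == remote-size metric. A condensed restatement (not the fixture code) using the attribute names from the helpers above:

def physical_size_chain_holds(sizes, remote_storage_kind) -> bool:
    # Every view of physical size must agree; remote size only participates
    # once remote storage is configured and the upload queue has drained.
    local_agree = (
        sizes.python_timelinedir_layerfiles_physical
        == sizes.prometheus_resident_physical
        == sizes.layer_map_file_size_sum
        == sizes.api_current_physical
    )
    if remote_storage_kind is None:
        return local_agree and sizes.prometheus_remote_physical is None
    return local_agree and sizes.prometheus_resident_physical == sizes.prometheus_remote_physical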
test_runner/regress/test_unlogged.py (new file, 34 lines)
@@ -0,0 +1,34 @@
+from fixtures.neon_fixtures import NeonEnv, fork_at_current_lsn
+
+
+#
+# Test UNLOGGED tables/relations. Postgres copies init fork contents to main
+# fork to reset them during recovery. In Neon, pageserver directly sends init
+# fork contents as main fork during basebackup.
+#
+def test_unlogged(neon_simple_env: NeonEnv):
+    env = neon_simple_env
+    env.neon_cli.create_branch("test_unlogged", "empty")
+    pg = env.postgres.create_start("test_unlogged")
+
+    conn = pg.connect()
+    cur = conn.cursor()
+
+    cur.execute("CREATE UNLOGGED TABLE iut (id int);")
+    # create index to test unlogged index relation as well
+    cur.execute("CREATE UNIQUE INDEX iut_idx ON iut (id);")
+    cur.execute("INSERT INTO iut values (42);")
+
+    # create another compute to fetch inital empty contents from pageserver
+    fork_at_current_lsn(env, pg, "test_unlogged_basebackup", "test_unlogged")
+    pg2 = env.postgres.create_start(
+        "test_unlogged_basebackup",
+    )
+
+    conn2 = pg2.connect()
+    cur2 = conn2.cursor()
+    # after restart table should be empty but valid
+    cur2.execute("PREPARE iut_plan (int) AS INSERT INTO iut VALUES ($1)")
+    cur2.execute("EXECUTE iut_plan (43);")
+    cur2.execute("SELECT * FROM iut")
+    assert cur2.fetchall() == [(43,)]
vendor/postgres-v14 (vendored submodule)
Submodule vendor/postgres-v14 updated: f210ac524b...5fb2e0bba0

vendor/postgres-v15 (vendored submodule)
Submodule vendor/postgres-v15 updated: 33f9763454...919851e781