Compare commits

..

20 Commits

Author SHA1 Message Date
Christian Schwarz
9742e253ee pagebench: add a 'basebackup' benchmark 2023-12-13 13:54:56 +00:00
Christian Schwarz
e253101727 pagebench: scaffold 2023-12-13 13:34:14 +00:00
Christian Schwarz
96ab3d67a7 pageserver client for mgmt_api and page_service 2023-12-13 13:31:01 +00:00
Christian Schwarz
4fb1a7a35a debug spans 2023-12-13 13:17:01 +00:00
Christian Schwarz
800e0802ea implement tracing_chrome & tracing_flame support for utils::logging 2023-12-13 13:03:10 +00:00
Anastasia Lubennikova
2a12e9c46b Add documentation for our sample pre-commit hook (#5868) 2023-11-22 12:04:36 +00:00
Christian Schwarz
9e3c07611c logging: support output to stderr (#5896)
(part of the getpage benchmarking epic #5771)

The plan is to make the benchmarking tool log on stderr and emit results
as JSON on stdout. That way, the test suite can simply take captures
stdout and json.loads() it, while interactive users of the benchmarking
tool have a reasonable experience as well.

Existing logging users continue to print to stdout, so, this change
should be a no-op functionally and performance-wise.
2023-11-22 11:08:35 +00:00
Christian Schwarz
d353fa1998 refer to our rust-postgres.git fork by branch name (#5894)
This way, `cargo update -p tokio-postgres` just works. The `Cargo.toml`
communicates more clearly that we're referring to the `main` branch. And
the git revision is still pinned in `Cargo.lock`.
2023-11-22 10:58:27 +00:00
Joonas Koivunen
0d10992e46 Cleanup compact_level0_phase1 fsyncing (#5852)
While reviewing code noticed a scary `layer_paths.pop().unwrap()` then
realized this should be further asyncified, something I forgot to do
when I switched the `compact_level0_phase1` back to async in #4938.

This keeps the double-fsync for new deltas as #4749 is still unsolved.
2023-11-21 15:30:40 +02:00
Arpad Müller
3e131bb3d7 Update Rust to 1.74.0 (#5873)
[Release notes](https://github.com/rust-lang/rust/releases/tag/1.74.0).
2023-11-21 11:41:41 +01:00
Sasha Krassovsky
81b2cefe10 Disallow CREATE DATABASE WITH OWNER neon_superuser (#5887)
## Problem
Currently, control plane doesn't know about neon_superuser, so if a user
creates a database with owner neon_superuser it causes an exception when
it tries to forward it. It is also currently possible to ALTER ROLE
neon_superuser.

## Summary of changes
Disallow creating database with owner neon_superuser. This is probably
fine, since I don't think you can create a database with owner normal
superuser. Also forbids altering neon_superuser
2023-11-20 22:39:47 +00:00
Christian Schwarz
d2ca410919 build: back to opt-level=0 in debug builds, for faster compile times (#5751)
This change brings down incremental compilation for me
from > 1min to 10s (and this is a pretty old Ryzen 1700X).

More details: "incremental compilation" here means to change one
character
in the `failed to read value from offset` string in `image_layer.rs`.
The command for incremental compilation is `cargo build_testing`.
The system on which I got these numbers uses `mold` via
`~/.cargo/config.toml`.

As a bonus, `rust-gdb` is now at least a little fun again.

Some tests are timing out in debug builds due to these changes.
This PR makes them skip for debug builds.
We run both with debug and release build, so, the loss of coverage is
marginal.

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2023-11-20 15:41:37 +01:00
Joonas Koivunen
d98ac04136 chore(background_tasks): missed allowed_error change, logging change (#5883)
- I am always confused by the log for the error wait time, now it will
be `2s` or `2.0s` not `2.0`
- fix missed string change introduced in #5881 [evidence]

[evidence]:
https://neon-github-public-dev.s3.amazonaws.com/reports/main/6921062837/index.html#suites/f9eba3cfdb71aa6e2b54f6466222829b/87897fe1ddee3825
2023-11-20 07:33:17 +00:00
Joonas Koivunen
ac08072d2e fix(layer): VirtualFile opening and read errors can be caused by contention (#5880)
A very low number of layer loads have been marked wrongly as permanent,
as I did not remember that `VirtualFile::open` or reading could fail
transiently for contention. Return separate errors for transient and
persistent errors from `{Delta,Image}LayerInner::load`.

Includes drive-by comment changes.

The implementation looks quite ugly because having the same type be both
the inner (operation error) and outer (critical error), but with the
alternatives I tried I did not find a better way.
2023-11-19 14:57:39 +00:00
John Spray
d22dce2e31 pageserver: shut down idle walredo processes (#5877)
The longer a pageserver runs, the more walredo processes it accumulates
from tenants that are touched intermittently (e.g. by availability
checks). This can lead to getting OOM killed.

Changes:
- Add an Instant recording the last use of the walredo process for a
tenant
- After compaction iteration in the background task, check for idleness
and stop the walredo process if idle for more than 10x compaction
period.

Cc: #3620

Co-authored-by: Joonas Koivunen <joonas@neon.tech>
Co-authored-by: Shany Pozin <shany@neon.tech>
2023-11-19 14:21:16 +00:00
Joonas Koivunen
3b3f040be3 fix(background_tasks): first backoff, compaction error stacktraces (#5881)
First compaction/gc error backoff starts from 0 which is less than 2s
what it was before #5672. This is now fixed to be the intended 2**n.

Additionally noticed the `compaction_iteration` creating an
`anyhow::Error` via `into()` always captures a stacktrace even if we had
a stacktraceful anyhow error within the CompactionError because there is
no stable api for querying that.
2023-11-19 14:16:31 +00:00
Em Sharnoff
cad0dca4b8 compute_ctl: Remove deprecated flag --file-cache-on-disk (#5622)
See neondatabase/cloud#7516 for more.
2023-11-18 12:43:54 +01:00
Em Sharnoff
5d13a2e426 Improve error message when neon.max_cluster_size reached (#4173)
Changes the error message encountered when the `neon.max_cluster_size`
limit is reached. Reasoning is that this is user-visible, and so should
*probably* use language that's closer to what users are familiar with.
2023-11-16 21:51:26 +00:00
khanova
0c243faf96 Proxy log pid hack (#5869)
## Problem

Improve observability for the compute node.

## Summary of changes

Log pid from the compute node. Doesn't work with pgbouncer.
2023-11-16 20:46:23 +00:00
Em Sharnoff
d0a842a509 Update vm-builder to v0.19.0 and move its customization here (#5783)
ref neondatabase/autoscaling#600 for more
2023-11-16 18:17:42 +01:00
61 changed files with 1527 additions and 314 deletions

View File

@@ -1,17 +1,3 @@
# The binaries are really slow, if you compile them in 'dev' mode with the defaults.
# Enable some optimizations even in 'dev' mode, to make tests faster. The basic
# optimizations enabled by "opt-level=1" don't affect debuggability too much.
#
# See https://www.reddit.com/r/rust/comments/gvrgca/this_is_a_neat_trick_for_getting_good_runtime/
#
[profile.dev.package."*"]
# Set the default for dependencies in Development mode.
opt-level = 3
[profile.dev]
# Turn on a small amount of optimization in Development mode.
opt-level = 1
[build]
# This is only present for local builds, as it will be overridden
# by the RUSTDOCFLAGS env var in CI.

View File

@@ -9,6 +9,24 @@ refactoring, additional comments, and so forth. Let's try to raise the
bar, and clean things up as we go. Try to leave code in a better shape
than it was before.
## Pre-commit hook
We have a sample pre-commit hook in `pre-commit.py`.
To set it up, run:
```bash
ln -s ../../pre-commit.py .git/hooks/pre-commit
```
This will run following checks on staged files before each commit:
- `rustfmt`
- checks for python files, see [obligatory checks](/docs/sourcetree.md#obligatory-checks).
There is also a separate script `./run_clippy.sh` that runs `cargo clippy` on the whole project
and `./scripts/reformat` that runs all formatting tools to ensure the project is up to date.
If you want to skip the hook, run `git commit` with `--no-verify` option.
## Submitting changes
1. Get at least one +1 on your PR before you push.

85
Cargo.lock generated
View File

@@ -1955,6 +1955,20 @@ dependencies = [
"hashbrown 0.13.2",
]
[[package]]
name = "hdrhistogram"
version = "7.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "765c9198f173dd59ce26ff9f95ef0aafd0a0fe01fb9d72841bc5066a4c06511d"
dependencies = [
"base64 0.21.1",
"byteorder",
"crossbeam-channel",
"flate2",
"nom",
"num-traits",
]
[[package]]
name = "heapless"
version = "0.8.0"
@@ -2634,6 +2648,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num-bigint"
version = "0.4.3"
@@ -2894,6 +2918,31 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "pagebench"
version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"futures",
"hdrhistogram",
"humantime",
"humantime-serde",
"pageserver",
"rand 0.8.5",
"serde",
"serde_json",
"tokio",
"tracing",
"utils",
]
[[package]]
name = "pagectl"
version = "0.1.0"
@@ -2979,6 +3028,7 @@ dependencies = [
"tokio",
"tokio-io-timeout",
"tokio-postgres",
"tokio-stream",
"tokio-tar",
"tokio-util",
"toml_edit",
@@ -3221,7 +3271,7 @@ dependencies = [
[[package]]
name = "postgres"
version = "0.19.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#988d0ddb4184c408fa7fc1bd0ecca7993c02978f"
dependencies = [
"bytes",
"fallible-iterator",
@@ -3234,7 +3284,7 @@ dependencies = [
[[package]]
name = "postgres-native-tls"
version = "0.5.0"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#988d0ddb4184c408fa7fc1bd0ecca7993c02978f"
dependencies = [
"native-tls",
"tokio",
@@ -3245,7 +3295,7 @@ dependencies = [
[[package]]
name = "postgres-protocol"
version = "0.6.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#988d0ddb4184c408fa7fc1bd0ecca7993c02978f"
dependencies = [
"base64 0.20.0",
"byteorder",
@@ -3263,7 +3313,7 @@ dependencies = [
[[package]]
name = "postgres-types"
version = "0.2.4"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#988d0ddb4184c408fa7fc1bd0ecca7993c02978f"
dependencies = [
"bytes",
"fallible-iterator",
@@ -4933,7 +4983,7 @@ dependencies = [
[[package]]
name = "tokio-postgres"
version = "0.7.7"
source = "git+https://github.com/neondatabase/rust-postgres.git?rev=ce7260db5998fe27167da42503905a12e7ad9048#ce7260db5998fe27167da42503905a12e7ad9048"
source = "git+https://github.com/neondatabase/rust-postgres.git?branch=neon#988d0ddb4184c408fa7fc1bd0ecca7993c02978f"
dependencies = [
"async-trait",
"byteorder",
@@ -5218,6 +5268,17 @@ dependencies = [
"syn 2.0.28",
]
[[package]]
name = "tracing-chrome"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "496b3cd5447f7ff527bbbf19b071ad542a000adf297d4127078b4dfdb931f41a"
dependencies = [
"serde_json",
"tracing-core",
"tracing-subscriber",
]
[[package]]
name = "tracing-core"
version = "0.1.31"
@@ -5238,6 +5299,17 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-flame"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bae117ee14789185e129aaee5d93750abe67fdc5a9a62650452bfe4e122a3a9"
dependencies = [
"lazy_static",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
@@ -5290,6 +5362,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
@@ -5504,7 +5577,9 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"tracing-chrome",
"tracing-error",
"tracing-flame",
"tracing-subscriber",
"url",
"uuid",

View File

@@ -5,6 +5,7 @@ members = [
"control_plane",
"pageserver",
"pageserver/ctl",
"pageserver/pagebench",
"proxy",
"safekeeper",
"storage_broker",
@@ -79,6 +80,7 @@ futures-util = "0.3"
git-version = "0.3"
hashbrown = "0.13"
hashlink = "0.8.1"
hdrhistogram = "7.5.2"
hex = "0.4"
hex-literal = "0.4"
hmac = "0.12.1"
@@ -165,11 +167,11 @@ env_logger = "0.10"
log = "0.4"
## Libraries from neondatabase/ git forks, ideally with changes to be upstreamed
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-native-tls = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-protocol = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
postgres-types = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
## Other git libraries
heapless = { default-features=false, features=[], git = "https://github.com/japaric/heapless.git", rev = "644653bf3b831c6bb4963be2de24804acf5e5001" } # upstream release pending
@@ -206,7 +208,7 @@ tonic-build = "0.9"
# This is only needed for proxy's tests.
# TODO: we should probably fork `tokio-postgres-rustls` instead.
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", rev="ce7260db5998fe27167da42503905a12e7ad9048" }
tokio-postgres = { git = "https://github.com/neondatabase/rust-postgres.git", branch="neon" }
################# Binary contents sections

View File

@@ -714,23 +714,6 @@ RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.3.tar.gz -
cargo pgrx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/ulid.control
#########################################################################################
#
# Layer "pg-wait-sampling-pg-build"
# compile pg_wait_sampling extension
#
#########################################################################################
FROM build-deps AS pg-wait-sampling-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/postgrespro/pg_wait_sampling/archive/refs/tags/v1.1.5.tar.gz -O pg_wait_sampling.tar.gz && \
echo 'a03da6a413f5652ce470a3635ed6ebba528c74cb26aa4cfced8aff8a8441f81ec6dd657ff62cd6ce96a4e6ce02cad9f2519ae9525367ece60497aa20faafde5c pg_wait_sampling.tar.gz' | sha512sum -c && \
mkdir pg_wait_sampling-src && cd pg_wait_sampling-src && tar xvzf ../pg_wait_sampling.tar.gz --strip-components=1 -C . && \
make USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN) && \
make USE_PGXS=1 -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_wait_sampling.control
#########################################################################################
#
# Layer "neon-pg-ext-build"
@@ -767,7 +750,6 @@ COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-embedding-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-wait-sampling-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -479,13 +479,6 @@ fn cli() -> clap::Command {
)
.value_name("FILECACHE_CONNSTR"),
)
.arg(
// DEPRECATED, NO LONGER DOES ANYTHING.
// See https://github.com/neondatabase/cloud/issues/7516
Arg::new("file-cache-on-disk")
.long("file-cache-on-disk")
.action(clap::ArgAction::SetTrue),
)
}
#[test]

View File

@@ -86,7 +86,10 @@ where
.stdout(process_log_file)
.stderr(same_file_for_stderr)
.args(args);
let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command));
let filled_cmd = fill_env_vars_prefixed_neon(fill_remote_storage_secrets_vars(
fill_rust_env_vars(background_command),
));
filled_cmd.envs(envs);
let pid_file_to_check = match initial_pid_file {
@@ -253,6 +256,15 @@ fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command {
cmd
}
fn fill_env_vars_prefixed_neon(mut cmd: &mut Command) -> &mut Command {
for (var, val) in std::env::vars() {
if var.starts_with("NEON_") {
cmd = cmd.env(var, val);
}
}
cmd
}
/// Add a `pre_exec` to the cmd that, inbetween fork() and exec(),
/// 1. Claims a pidfile with a fcntl lock on it and
/// 2. Sets up the pidfile's file descriptor so that it (and the lock)

View File

@@ -283,9 +283,10 @@ fn make_router(persistent_state: PersistentState) -> RouterBuilder<hyper::Body,
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::init(
let _guard = logging::init(
LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
let args = Cli::parse();

View File

@@ -140,3 +140,35 @@ impl Key {
})
}
}
impl std::str::FromStr for Key {
type Err = anyhow::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
Self::from_hex(s)
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use crate::key::Key;
#[test]
fn display_fromstr_bijection() {
let mut rng = rand::thread_rng();
use rand::Rng;
let key = Key {
field1: rng.gen(),
field2: rng.gen(),
field3: rng.gen(),
field4: rng.gen(),
field5: rng.gen(),
field6: rng.gen(),
};
assert_eq!(key, Key::from_str(&format!("{key}")).unwrap());
}
}

View File

@@ -18,7 +18,7 @@ use utils::{
use crate::{reltag::RelTag, shard::TenantShardId};
use anyhow::bail;
use bytes::{BufMut, Bytes, BytesMut};
use bytes::{Buf, BufMut, Bytes, BytesMut};
/// The state of a tenant in this pageserver.
///
@@ -767,6 +767,36 @@ impl PagestreamBeMessage {
bytes.into()
}
pub fn deserialize(buf: Bytes) -> anyhow::Result<Self> {
let mut buf = buf.reader();
let msg_tag = buf.read_u8()?;
match msg_tag {
100 => todo!(),
101 => todo!(),
102 => {
let buf = buf.get_ref();
/* TODO use constant */
if buf.len() == 8192 {
Ok(PagestreamBeMessage::GetPage(PagestreamGetPageResponse {
page: buf.clone(),
}))
} else {
anyhow::bail!("invalid page size: {}", buf.len());
}
}
103 => {
let buf = buf.get_ref();
let cstr = std::ffi::CStr::from_bytes_until_nul(buf)?;
let rust_str = cstr.to_str()?;
Ok(PagestreamBeMessage::Error(PagestreamErrorResponse {
message: rust_str.to_owned(),
}))
}
104 => todo!(),
_ => bail!("unknown tag: {:?}", msg_tag),
}
}
}
#[cfg(test)]

View File

@@ -278,9 +278,10 @@ async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Res
fn ensure_logging_ready() {
LOGGING_DONE.get_or_init(|| {
utils::logging::init(
let _ = utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,
)
.expect("logging init failed");
});

View File

@@ -207,9 +207,10 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
fn ensure_logging_ready() {
LOGGING_DONE.get_or_init(|| {
utils::logging::init(
let _ = utils::logging::init(
utils::logging::LogFormat::Test,
utils::logging::TracingErrorLayerEnablement::Disabled,
utils::logging::Output::Stdout,
)
.expect("logging init failed");
});

View File

@@ -49,6 +49,8 @@ const_format.workspace = true
# to use tokio channels as streams, this is faster to compile than async_stream
# why is it only here? no other crate should use it, streams are rarely needed.
tokio-stream = { version = "0.1.14" }
tracing-chrome = "0.7.1"
tracing-flame = "0.2.0"
[dev-dependencies]
byteorder.workspace = true

View File

@@ -1,4 +1,4 @@
use std::str::FromStr;
use std::{io::BufWriter, str::FromStr};
use anyhow::Context;
use once_cell::sync::Lazy;
@@ -66,10 +66,25 @@ pub enum TracingErrorLayerEnablement {
EnableWithRustLogFilter,
}
/// Where the logging should output to.
#[derive(Clone, Copy)]
pub enum Output {
Stdout,
Stderr,
}
/// Keep alive and drop it before the program terminates.
#[must_use]
pub struct FlushGuard {
_tracing_chrome_layer: Option<tracing_chrome::FlushGuard>,
_tracing_flame_layer: Option<tracing_flame::FlushGuard<BufWriter<std::fs::File>>>,
}
pub fn init(
log_format: LogFormat,
tracing_error_layer_enablement: TracingErrorLayerEnablement,
) -> anyhow::Result<()> {
output: Output,
) -> anyhow::Result<FlushGuard> {
// We fall back to printing all spans at info-level or above if
// the RUST_LOG environment variable is not set.
let rust_log_env_filter = || {
@@ -77,15 +92,60 @@ pub fn init(
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"))
};
// WIP: lift it up as an argument
let enable_tracing_chrome = match std::env::var("NEON_PAGESERVER_ENABLE_TRACING_CHROME") {
Ok(s) if s != "0" => true,
Ok(_s) => false,
Err(std::env::VarError::NotPresent) => false,
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var NEON_PAGESERVER_ENABLE_TRACING_CHROME not unicode")
}
};
// WIP: lift it up as an argument
let enable_tracing_flame = match std::env::var("NEON_PAGESERVER_ENABLE_TRACING_FLAME") {
Ok(s) if s != "0" => true,
Ok(_s) => false,
Err(std::env::VarError::NotPresent) => false,
Err(std::env::VarError::NotUnicode(_)) => {
panic!("env var NEON_PAGESERVER_ENABLE_TRACING_FLAME not unicode")
}
};
// NB: the order of the with() calls does not matter.
// See https://docs.rs/tracing-subscriber/0.3.16/tracing_subscriber/layer/index.html#per-layer-filtering
use tracing_subscriber::prelude::*;
let r = tracing_subscriber::registry();
let r = r.with({
// https://users.rust-lang.org/t/how-can-i-init-tracing-registry-dynamically-with-multiple-outputs/94307/6
#[derive(Default)]
struct LayerStack {
layers:
Option<Box<dyn tracing_subscriber::Layer<tracing_subscriber::Registry> + Sync + Send>>,
}
impl LayerStack {
fn add_layer<L>(&mut self, new_layer: L)
where
L: tracing_subscriber::Layer<tracing_subscriber::Registry> + Send + Sync,
{
let new = match self.layers.take() {
Some(layers) => Some(layers.and_then(new_layer).boxed()),
None => Some(new_layer.boxed()),
};
self.layers = new;
}
}
let mut layers = LayerStack::default();
layers.add_layer({
let log_layer = tracing_subscriber::fmt::layer()
.with_target(false)
.with_ansi(false)
.with_writer(std::io::stdout);
.with_writer(move || -> Box<dyn std::io::Write> {
match output {
Output::Stdout => Box::new(std::io::stdout()),
Output::Stderr => Box::new(std::io::stderr()),
}
});
let log_layer = match log_format {
LogFormat::Json => log_layer.json().boxed(),
LogFormat::Plain => log_layer.boxed(),
@@ -93,15 +153,47 @@ pub fn init(
};
log_layer.with_filter(rust_log_env_filter())
});
let r = r.with(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
layers
.add_layer(TracingEventCountLayer(&TRACING_EVENT_COUNT).with_filter(rust_log_env_filter()));
let tracing_chrome_layer_flush_guard = if enable_tracing_chrome {
let (layer, guard) = tracing_chrome::ChromeLayerBuilder::new()
.trace_style(tracing_chrome::TraceStyle::Async)
.build();
layers.add_layer(layer.with_filter(rust_log_env_filter()));
Some(guard)
} else {
None
};
let tracing_flame_flush_guard = if enable_tracing_flame {
let (layer, guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
let layer = layer
.with_empty_samples(false)
.with_module_path(false)
.with_file_and_line(false)
.with_threads_collapsed(true);
layers.add_layer(layer.with_filter(rust_log_env_filter()));
Some(guard)
} else {
None
};
match tracing_error_layer_enablement {
TracingErrorLayerEnablement::EnableWithRustLogFilter => r
.with(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter()))
.init(),
TracingErrorLayerEnablement::Disabled => r.init(),
TracingErrorLayerEnablement::EnableWithRustLogFilter => layers
.add_layer(tracing_error::ErrorLayer::default().with_filter(rust_log_env_filter())),
TracingErrorLayerEnablement::Disabled => (),
}
Ok(())
let r = tracing_subscriber::registry();
r.with(layers.layers.expect("we add at least one layer"))
.init();
Ok(FlushGuard {
_tracing_chrome_layer: tracing_chrome_layer_flush_guard,
_tracing_flame_layer: tracing_flame_flush_guard,
})
}
/// Disable the default rust panic hook by using `set_hook`.

View File

@@ -366,6 +366,47 @@ impl MonotonicCounter<Lsn> for RecordLsn {
}
}
/// Implements [`rand::distributions::uniform::UniformSampler`] so we can sample [`Lsn`]s.
pub struct LsnSampler(<u64 as rand::distributions::uniform::SampleUniform>::Sampler);
impl rand::distributions::uniform::SampleUniform for Lsn {
type Sampler = LsnSampler;
}
impl rand::distributions::uniform::UniformSampler for LsnSampler {
type X = Lsn;
fn new<B1, B2>(low: B1, high: B2) -> Self
where
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
{
Self(
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new(
low.borrow().0,
high.borrow().0,
),
)
}
fn new_inclusive<B1, B2>(low: B1, high: B2) -> Self
where
B1: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
B2: rand::distributions::uniform::SampleBorrow<Self::X> + Sized,
{
Self(
<u64 as rand::distributions::uniform::SampleUniform>::Sampler::new_inclusive(
low.borrow().0,
high.borrow().0,
),
)
}
fn sample<R: rand::prelude::Rng + ?Sized>(&self, rng: &mut R) -> Self::X {
Lsn(self.0.sample(rng))
}
}
#[cfg(test)]
mod tests {
use crate::bin_ser::BeSer;

View File

@@ -61,6 +61,7 @@ thiserror.workspace = true
tokio = { workspace = true, features = ["process", "sync", "fs", "rt", "io-util", "time"] }
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tracing.workspace = true

View File

@@ -0,0 +1,22 @@
[package]
name = "pagebench"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow.workspace = true
clap.workspace = true
futures.workspace = true
hdrhistogram.workspace = true
humantime.workspace = true
humantime-serde.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tokio.workspace = true
pageserver = { path = ".." }
utils = { path = "../../libs/utils/" }

View File

@@ -0,0 +1,403 @@
use anyhow::Context;
use pageserver::client::page_service::BasebackupRequest;
use utils::lsn::Lsn;
use rand::prelude::*;
use tokio::sync::Barrier;
use tokio::task::JoinSet;
use tracing::{debug, info, instrument};
use utils::id::TenantId;
use utils::logging;
use std::cell::RefCell;
use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crate::util::tenant_timeline_id::TenantTimelineId;
/// basebackup@LatestLSN
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "localhost:64000")]
page_service_host_port: String,
#[clap(long)]
pageserver_jwt: Option<String>,
#[clap(long, default_value = "1")]
num_clients: NonZeroUsize,
#[clap(long, default_value = "1.0")]
gzip_probability: f64,
#[clap(long)]
runtime: Option<humantime::Duration>,
targets: Option<Vec<TenantTimelineId>>,
}
#[derive(Debug, Default)]
struct LiveStats {
completed_requests: AtomicU64,
}
impl LiveStats {
fn inc(&self) {
self.completed_requests.fetch_add(1, Ordering::Relaxed);
}
}
#[derive(serde::Serialize)]
struct Output {
total: PerTaskOutput,
}
const LATENCY_PERCENTILES: [f64; 4] = [95.0, 99.00, 99.90, 99.99];
struct LatencyPercentiles {
latency_percentiles: [Duration; 4],
}
impl serde::Serialize for LatencyPercentiles {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut ser = serializer.serialize_map(Some(LATENCY_PERCENTILES.len()))?;
for p in LATENCY_PERCENTILES {
ser.serialize_entry(
&format!("p{p}"),
&format!(
"{}",
&humantime::format_duration(self.latency_percentiles[0])
),
)?;
}
ser.end()
}
}
#[derive(serde::Serialize)]
struct PerTaskOutput {
request_count: u64,
#[serde(with = "humantime_serde")]
latency_mean: Duration,
latency_percentiles: LatencyPercentiles,
}
struct ThreadLocalStats {
latency_histo: hdrhistogram::Histogram<u64>,
}
impl ThreadLocalStats {
fn new() -> Self {
Self {
// Initialize with fixed bounds so that we panic at runtime instead of resizing the histogram,
// which would skew the benchmark results.
latency_histo: hdrhistogram::Histogram::new_with_bounds(1, 1_000_000_000, 3).unwrap(),
}
}
fn observe(&mut self, latency: Duration) -> anyhow::Result<()> {
let micros: u64 = latency
.as_micros()
.try_into()
.context("latency greater than u64")?;
self.latency_histo
.record(micros)
.context("add to histogram")?;
Ok(())
}
fn output(&self) -> PerTaskOutput {
let latency_percentiles = std::array::from_fn(|idx| {
let micros = self
.latency_histo
.value_at_percentile(LATENCY_PERCENTILES[idx]);
Duration::from_micros(micros)
});
PerTaskOutput {
request_count: self.latency_histo.len(),
latency_mean: Duration::from_micros(self.latency_histo.mean() as u64),
latency_percentiles: LatencyPercentiles {
latency_percentiles,
},
}
}
fn add(&mut self, other: &Self) {
let Self {
ref mut latency_histo,
} = self;
latency_histo.add(&other.latency_histo).unwrap();
}
}
thread_local! {
pub static STATS: RefCell<Arc<Mutex<ThreadLocalStats>>> = std::cell::RefCell::new(
Arc::new(Mutex::new(ThreadLocalStats::new()))
);
}
pub(crate) fn main(args: Args) -> anyhow::Result<()> {
let _guard = logging::init(
logging::LogFormat::Plain,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stderr,
)
.unwrap();
let thread_local_stats = Arc::new(Mutex::new(Vec::new()));
let rt = tokio::runtime::Builder::new_multi_thread()
.on_thread_start({
let thread_local_stats = Arc::clone(&thread_local_stats);
move || {
// pre-initialize the histograms
STATS.with(|stats| {
let stats: Arc<_> = Arc::clone(&*stats.borrow());
thread_local_stats.lock().unwrap().push(stats);
});
}
})
.enable_all()
.build()
.unwrap();
let main_task = rt.spawn(main_impl(args, thread_local_stats));
rt.block_on(main_task).unwrap()
}
struct Target {
timeline: TenantTimelineId,
lsn_range: Option<Range<Lsn>>,
}
async fn main_impl(
args: Args,
thread_local_stats: Arc<Mutex<Vec<Arc<Mutex<ThreadLocalStats>>>>>,
) -> anyhow::Result<()> {
let args: &'static Args = Box::leak(Box::new(args));
let mgmt_api_client = Arc::new(pageserver::client::mgmt_api::Client::new(
args.mgmt_api_endpoint.clone(),
args.pageserver_jwt.as_deref(),
));
// discover targets
let mut timelines: Vec<TenantTimelineId> = Vec::new();
if args.targets.is_some() {
timelines = args.targets.clone().unwrap();
} else {
let tenants: Vec<TenantId> = mgmt_api_client
.list_tenants()
.await?
.into_iter()
.map(|ti| ti.id)
.collect();
let mut js = JoinSet::new();
for tenant_id in tenants {
js.spawn({
let mgmt_api_client = Arc::clone(&mgmt_api_client);
async move {
(
tenant_id,
mgmt_api_client.list_timelines(tenant_id).await.unwrap(),
)
}
});
}
while let Some(res) = js.join_next().await {
let (tenant_id, tl_infos) = res.unwrap();
for tl in tl_infos {
timelines.push(TenantTimelineId {
tenant_id,
timeline_id: tl.timeline_id,
});
}
}
}
info!("timelines:\n{:?}", timelines);
let mut js = JoinSet::new();
for timeline in &timelines {
js.spawn({
let timeline = *timeline;
let info = mgmt_api_client
.timeline_info(timeline.tenant_id, timeline.timeline_id)
.await
.unwrap();
async move {
anyhow::Ok(Target {
timeline,
// TODO: lsn_range != latest LSN
lsn_range: Some(info.last_record_lsn..(info.last_record_lsn + 1)),
})
}
});
}
let mut all_targets: Vec<Target> = Vec::new();
while let Some(res) = js.join_next().await {
all_targets.push(res.unwrap().unwrap());
}
let live_stats = Arc::new(LiveStats::default());
let num_client_tasks = timelines.len();
let num_live_stats_dump = 1;
let num_work_sender_tasks = 1;
let start_work_barrier = Arc::new(tokio::sync::Barrier::new(
num_client_tasks + num_live_stats_dump + num_work_sender_tasks,
));
let all_work_done_barrier = Arc::new(tokio::sync::Barrier::new(num_client_tasks));
tokio::spawn({
let stats = Arc::clone(&live_stats);
let start_work_barrier = Arc::clone(&start_work_barrier);
async move {
start_work_barrier.wait().await;
loop {
let start = std::time::Instant::now();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let completed_requests = stats.completed_requests.swap(0, Ordering::Relaxed);
let elapsed = start.elapsed();
info!(
"RPS: {:.0}",
completed_requests as f64 / elapsed.as_secs_f64()
);
}
}
});
let mut work_senders = HashMap::new();
let mut tasks = Vec::new();
for tl in &timelines {
let (sender, receiver) = tokio::sync::mpsc::channel(1); // TODO: not sure what the implications of this are
work_senders.insert(tl, sender);
tasks.push(tokio::spawn(client(
args,
*tl,
Arc::clone(&start_work_barrier),
receiver,
Arc::clone(&all_work_done_barrier),
Arc::clone(&live_stats),
)));
}
let work_sender = async move {
start_work_barrier.wait().await;
loop {
let (timeline, work) = {
let mut rng = rand::thread_rng();
let target = all_targets.choose(&mut rng).unwrap();
let lsn = target.lsn_range.clone().map(|r| rng.gen_range(r));
(
target.timeline,
Work {
lsn,
gzip: rng.gen_bool(args.gzip_probability),
},
)
};
let sender = work_senders.get(&timeline).unwrap();
// TODO: what if this blocks?
sender.send(work).await.ok().unwrap();
}
};
if let Some(runtime) = args.runtime {
match tokio::time::timeout(runtime.into(), work_sender).await {
Ok(()) => unreachable!("work sender never terminates"),
Err(_timeout) => {
// this implicitly drops the work_senders, making all the clients exit
}
}
} else {
work_sender.await;
unreachable!("work sender never terminates");
}
for t in tasks {
t.await.unwrap();
}
let output = Output {
total: {
let mut agg_stats = ThreadLocalStats::new();
for stats in thread_local_stats.lock().unwrap().iter() {
let stats = stats.lock().unwrap();
agg_stats.add(&*stats);
}
agg_stats.output()
},
};
let output = serde_json::to_string_pretty(&output).unwrap();
println!("{output}");
anyhow::Ok(())
}
#[derive(Copy, Clone)]
struct Work {
lsn: Option<Lsn>,
gzip: bool,
}
#[instrument(skip_all)]
async fn client(
args: &'static Args,
timeline: TenantTimelineId,
start_work_barrier: Arc<Barrier>,
mut work: tokio::sync::mpsc::Receiver<Work>,
all_work_done_barrier: Arc<Barrier>,
live_stats: Arc<LiveStats>,
) {
start_work_barrier.wait().await;
let client =
pageserver::client::page_service::Client::new(crate::util::connstring::connstring(
&args.page_service_host_port,
args.pageserver_jwt.as_deref(),
))
.await
.unwrap();
while let Some(Work { lsn, gzip }) = work.recv().await {
let start = Instant::now();
let copy_out_stream = client
.basebackup(&BasebackupRequest {
tenant_id: timeline.tenant_id,
timeline_id: timeline.timeline_id,
lsn,
gzip,
})
.await
.with_context(|| format!("start basebackup for {timeline}"))
.unwrap();
use futures::StreamExt;
let size = Arc::new(AtomicUsize::new(0));
copy_out_stream
.for_each({
|r| {
let size = Arc::clone(&size);
async move {
let size = Arc::clone(&size);
size.fetch_add(r.unwrap().len(), Ordering::Relaxed);
}
}
})
.await;
debug!("basebackup size is {} bytes", size.load(Ordering::Relaxed));
let elapsed = start.elapsed();
live_stats.inc();
STATS.with(|stats| {
stats.borrow().lock().unwrap().observe(elapsed).unwrap();
});
}
all_work_done_barrier.wait().await;
}

View File

@@ -0,0 +1,19 @@
use clap::Parser;
pub(crate) mod util;
mod basebackup;
/// Component-level performance test for pageserver.
#[derive(clap::Parser)]
enum Args {
Basebackup(basebackup::Args),
}
fn main() {
let args = Args::parse();
match args {
Args::Basebackup(args) => basebackup::main(args),
}
.unwrap()
}

View File

@@ -0,0 +1,2 @@
pub(crate) mod connstring;
pub(crate) mod tenant_timeline_id;

View File

@@ -0,0 +1,8 @@
pub(crate) fn connstring(host_port: &str, jwt: Option<&str>) -> String {
let colon_and_jwt = if let Some(jwt) = jwt {
format!(":{jwt}") // TODO: urlescape
} else {
format!("")
};
format!("postgres://postgres{colon_and_jwt}@{host_port}")
}

View File

@@ -0,0 +1,36 @@
use std::str::FromStr;
use anyhow::Context;
use utils::id::TimelineId;
use utils::id::TenantId;
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub(crate) struct TenantTimelineId {
pub(crate) tenant_id: TenantId,
pub(crate) timeline_id: TimelineId,
}
impl FromStr for TenantTimelineId {
type Err = anyhow::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let (tenant_id, timeline_id) = s
.split_once("/")
.context("tenant and timeline id must be separated by `/`")?;
let tenant_id = TenantId::from_str(&tenant_id)
.with_context(|| format!("invalid tenant id: {tenant_id:?}"))?;
let timeline_id = TimelineId::from_str(&timeline_id)
.with_context(|| format!("invalid timeline id: {timeline_id:?}"))?;
Ok(Self {
tenant_id,
timeline_id,
})
}
}
impl std::fmt::Display for TenantTimelineId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.tenant_id, self.timeline_id)
}
}

View File

@@ -166,71 +166,111 @@ where
}
}
// Gather non-relational files from object storage pages.
debug!("Gather non-relational files from object storage pages");
for kind in [
SlruKind::Clog,
SlruKind::MultiXactOffsets,
SlruKind::MultiXactMembers,
] {
for segno in self
.timeline
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
self.add_slru_segment(kind, segno).await?;
async {
debug!("list slru segments");
for segno in self
.timeline
.list_slru_segments(kind, self.lsn, self.ctx)
.await?
{
async {
debug!("add slru segment");
self.add_slru_segment(kind, segno).await?;
anyhow::Ok(())
}
.instrument(debug_span!("slru segment", ?segno))
.await?;
}
anyhow::Ok(())
}
.instrument(debug_span!("non-rel file", ?kind))
.await?;
}
let mut min_restart_lsn: Lsn = Lsn::MAX;
// Create tablespace directories
debug!("Create tablespace directories");
for ((spcnode, dbnode), has_relmap_file) in
self.timeline.list_dbdirs(self.lsn, self.ctx).await?
{
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
async {
debug!("iter");
self.add_dbdir(spcnode, dbnode, has_relmap_file).await?;
// If full backup is requested, include all relation files.
// Otherwise only include init forks of unlogged relations.
let rels = self
.timeline
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?;
for &rel in rels.iter() {
// Send init fork as main fork to provide well formed empty
// contents of UNLOGGED relations. Postgres copies it in
// `reinit.c` during recovery.
if rel.forknum == INIT_FORKNUM {
// I doubt we need _init fork itself, but having it at least
// serves as a marker relation is unlogged.
self.add_rel(rel, rel).await?;
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
continue;
}
// If full backup is requested, include all relation files.
// Otherwise only include init forks of unlogged relations.
debug!("list rels");
let rels = self
.timeline
.list_rels(spcnode, dbnode, self.lsn, self.ctx)
.await?;
for &rel in rels.iter() {
async {
debug!("iter");
// Send init fork as main fork to provide well formed empty
// contents of UNLOGGED relations. Postgres copies it in
// `reinit.c` during recovery.
if rel.forknum == INIT_FORKNUM {
// I doubt we need _init fork itself, but having it at least
// serves as a marker relation is unlogged.
self.add_rel(rel, rel).await?;
self.add_rel(rel, rel.with_forknum(MAIN_FORKNUM)).await?;
return Ok(());
}
if self.full_backup {
if rel.forknum == MAIN_FORKNUM && rels.contains(&rel.with_forknum(INIT_FORKNUM))
{
// skip this, will include it when we reach the init fork
continue;
if self.full_backup {
if rel.forknum == MAIN_FORKNUM
&& rels.contains(&rel.with_forknum(INIT_FORKNUM))
{
// skip this, will include it when we reach the init fork
return Ok(());
}
self.add_rel(rel, rel).await?;
}
anyhow::Ok(())
}
self.add_rel(rel, rel).await?;
.instrument(debug_span!("process rel", ?rel))
.await?;
}
}
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
content[offs..offs + 8].try_into().unwrap(),
));
info!("Replication slot {} restart LSN={}", path, restart_lsn);
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
debug!("list aux files");
for (path, content) in self.timeline.list_aux_files(self.lsn, self.ctx).await? {
async {
debug!("iter");
if path.starts_with("pg_replslot") {
let offs = pg_constants::REPL_SLOT_ON_DISK_OFFSETOF_RESTART_LSN;
let restart_lsn = Lsn(u64::from_le_bytes(
content[offs..offs + 8].try_into().unwrap(),
));
info!("Replication slot {} restart LSN={}", path, restart_lsn);
min_restart_lsn = Lsn::min(min_restart_lsn, restart_lsn);
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
.append(&header, &*content)
.await
.context("could not add aux file to basebackup tarball")?;
anyhow::Ok(())
}
.instrument(debug_span!("process aux file", ?path))
.await?;
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
.append(&header, &*content)
.await
.context("could not add aux file to basebackup tarball")?;
debug!("done");
anyhow::Ok(())
}
.instrument(debug_span!(
"process tablespace directory",
?spcnode,
?dbnode
))
.await?;
}
if min_restart_lsn != Lsn::MAX {
info!(
@@ -244,19 +284,25 @@ where
.await
.context("could not add restart.lsn file to basebackup tarball")?;
}
debug!("list twophase files");
for xid in self
.timeline
.list_twophase_files(self.lsn, self.ctx)
.await?
{
self.add_twophase_file(xid).await?;
async {
self.add_twophase_file(xid).await?;
anyhow::Ok(())
}
.instrument(debug_span!("process twophase file", ?xid))
.await?;
}
fail_point!("basebackup-before-control-file", |_| {
bail!("failpoint basebackup-before-control-file")
});
// Generate pg_control and bootstrap WAL segment.
debug!("Generate pg_control and bootstrap WAL segment.");
self.add_pgcontrol_file().await?;
self.ar.finish().await?;
debug!("all tarred up!");

View File

@@ -103,7 +103,11 @@ fn main() -> anyhow::Result<()> {
} else {
TracingErrorLayerEnablement::Disabled
};
logging::init(conf.log_format, tracing_error_layer_enablement)?;
let _guard = logging::init(
conf.log_format,
tracing_error_layer_enablement,
logging::Output::Stdout,
)?;
// mind the order required here: 1. logging, 2. panic_hook, 3. sentry.
// disarming this hook on pageserver, because we never tear down tracing.

2
pageserver/src/client.rs Normal file
View File

@@ -0,0 +1,2 @@
pub mod mgmt_api;
pub mod page_service;

View File

@@ -0,0 +1,91 @@
use anyhow::Context;
use hyper::{client::HttpConnector, Uri};
use utils::id::{TenantId, TimelineId};
pub struct Client {
mgmt_api_endpoint: String,
authorization_header: Option<String>,
client: hyper::Client<HttpConnector, hyper::Body>,
}
impl Client {
pub fn new(mgmt_api_endpoint: String, jwt: Option<&str>) -> Self {
Self {
mgmt_api_endpoint,
authorization_header: jwt.map(|jwt| format!("Bearer {jwt}")),
client: hyper::client::Client::new(),
}
}
pub async fn list_tenants(&self) -> anyhow::Result<Vec<pageserver_api::models::TenantInfo>> {
let uri = Uri::try_from(format!("{}/v1/tenant", self.mgmt_api_endpoint))?;
let resp = self.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body)?)
}
pub async fn list_timelines(
&self,
tenant_id: TenantId,
) -> anyhow::Result<Vec<pageserver_api::models::TimelineInfo>> {
let uri = Uri::try_from(format!(
"{}/v1/tenant/{tenant_id}/timeline",
self.mgmt_api_endpoint
))?;
let resp = self.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body)?)
}
pub async fn timeline_info(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<pageserver_api::models::TimelineInfo> {
let uri = Uri::try_from(format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}",
self.mgmt_api_endpoint
))?;
let resp = self.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body)?)
}
pub async fn keyspace(
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<crate::http::models::partitioning::Partitioning> {
let uri = Uri::try_from(format!(
"{}/v1/tenant/{tenant_id}/timeline/{timeline_id}/keyspace?check_serialization_roundtrip=true",
self.mgmt_api_endpoint
))?;
let resp = self.get(uri).await?;
if !resp.status().is_success() {
anyhow::bail!("status error");
}
let body = hyper::body::to_bytes(resp).await?;
Ok(serde_json::from_slice(&body).context("deserialize")?)
}
async fn get(&self, uri: Uri) -> hyper::Result<hyper::Response<hyper::Body>> {
let req = hyper::Request::builder().uri(uri).method("GET");
let req = if let Some(value) = &self.authorization_header {
req.header("Authorization", value)
} else {
req
};
let req = req.body(hyper::Body::default());
self.client.request(req.unwrap()).await
}
}

View File

@@ -0,0 +1,145 @@
use std::pin::Pin;
use futures::SinkExt;
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
PagestreamGetPageResponse,
},
reltag::RelTag,
};
use tokio::task::JoinHandle;
use tokio_postgres::CopyOutStream;
use tokio_stream::StreamExt;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
pub struct Client {
client: tokio_postgres::Client,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
pub struct BasebackupRequest {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub lsn: Option<Lsn>,
pub gzip: bool,
}
impl Client {
pub async fn new(connstring: String) -> anyhow::Result<Self> {
let (client, connection) = tokio_postgres::connect(&connstring, postgres::NoTls).await?;
let conn_task_cancel = CancellationToken::new();
let conn_task = tokio::spawn({
let conn_task_cancel = conn_task_cancel.clone();
async move {
tokio::select! {
_ = conn_task_cancel.cancelled() => { }
res = connection => {
res.unwrap();
}
}
}
});
Ok(Self {
cancel_on_client_drop: Some(conn_task_cancel.drop_guard()),
conn_task,
client,
})
}
pub async fn pagestream(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> anyhow::Result<PagestreamClient> {
let copy_both: tokio_postgres::CopyBothDuplex<bytes::Bytes> = self
.client
.copy_both_simple(&format!("pagestream {tenant_id} {timeline_id}"))
.await?;
let Client {
cancel_on_client_drop,
conn_task,
client: _,
} = self;
Ok(PagestreamClient {
copy_both: Box::pin(copy_both),
conn_task,
cancel_on_client_drop,
})
}
pub async fn basebackup(&self, req: &BasebackupRequest) -> anyhow::Result<CopyOutStream> {
let BasebackupRequest {
tenant_id,
timeline_id,
lsn,
gzip,
} = req;
let mut args = Vec::with_capacity(5);
args.push("basebackup".to_string());
args.push(format!("{tenant_id}"));
args.push(format!("{timeline_id}"));
if let Some(lsn) = lsn {
args.push(format!("{lsn}"));
}
if *gzip {
args.push(format!("--gzip"))
}
Ok(self.client.copy_out(&args.join(" ")).await?)
}
}
/// Create using [`Client::pagestream`].
pub struct PagestreamClient {
copy_both: Pin<Box<tokio_postgres::CopyBothDuplex<bytes::Bytes>>>,
cancel_on_client_drop: Option<tokio_util::sync::DropGuard>,
conn_task: JoinHandle<()>,
}
pub struct RelTagBlockNo {
pub rel_tag: RelTag,
pub block_no: u32,
}
impl PagestreamClient {
pub async fn shutdown(mut self) {
let _ = self.cancel_on_client_drop.take();
self.conn_task.await.unwrap();
}
pub async fn getpage(
&mut self,
key: RelTagBlockNo,
lsn: Lsn,
) -> anyhow::Result<PagestreamGetPageResponse> {
let req = PagestreamGetPageRequest {
latest: false,
rel: key.rel_tag,
blkno: key.block_no,
lsn,
};
let req = PagestreamFeMessage::GetPage(req);
let req: bytes::Bytes = req.serialize();
// let mut req = tokio_util::io::ReaderStream::new(&req);
let mut req = tokio_stream::once(Ok(req));
self.copy_both.send_all(&mut req).await?;
let next: Option<Result<bytes::Bytes, _>> = self.copy_both.next().await;
let next = next.unwrap().unwrap();
match PagestreamBeMessage::deserialize(next)? {
PagestreamBeMessage::Exists(_) => todo!(),
PagestreamBeMessage::Nblocks(_) => todo!(),
PagestreamBeMessage::GetPage(p) => Ok(p),
PagestreamBeMessage::Error(e) => anyhow::bail!("Error: {:?}", e),
PagestreamBeMessage::DbSize(_) => todo!(),
}
}
}

View File

@@ -1,4 +1,4 @@
pub mod routes;
pub use routes::make_router;
pub use pageserver_api::models;
pub mod models;

View File

@@ -0,0 +1,3 @@
//! If possible, use `::pageserver_api::models` instead.
pub mod partitioning;

View File

@@ -0,0 +1,112 @@
use utils::lsn::Lsn;
#[derive(Debug, PartialEq, Eq)]
pub struct Partitioning {
pub keys: crate::keyspace::KeySpace,
pub at_lsn: Lsn,
}
impl serde::Serialize for Partitioning {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
pub struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
impl<'a> serde::Serialize for KeySpace<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
for kr in &self.0.ranges {
seq.serialize_element(&KeyRange(kr))?;
}
seq.end()
}
}
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_key("keys")?;
map.serialize_value(&KeySpace(&self.keys))?;
map.serialize_key("at_lsn")?;
map.serialize_value(&WithDisplay(&self.at_lsn))?;
map.end()
}
}
pub struct WithDisplay<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(&self.0)
}
}
pub struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
impl<'a> serde::Serialize for KeyRange<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeTuple;
let mut t = serializer.serialize_tuple(2)?;
t.serialize_element(&WithDisplay(&self.0.start))?;
t.serialize_element(&WithDisplay(&self.0.end))?;
t.end()
}
}
impl<'a> serde::Deserialize<'a> for Partitioning {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'a>,
{
pub struct KeySpace(crate::keyspace::KeySpace);
impl<'de> serde::Deserialize<'de> for KeySpace {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[serde_with::serde_as]
#[derive(serde::Deserialize)]
#[serde(transparent)]
struct Key(#[serde_as(as = "serde_with::DisplayFromStr")] crate::repository::Key);
#[serde_with::serde_as]
#[derive(serde::Deserialize)]
struct Range(Key, Key);
let ranges: Vec<Range> = serde::Deserialize::deserialize(deserializer)?;
Ok(Self(crate::keyspace::KeySpace {
ranges: ranges
.into_iter()
.map(|Range(start, end)| (start.0..end.0))
.collect(),
}))
}
}
#[serde_with::serde_as]
#[derive(serde::Deserialize)]
struct De {
keys: KeySpace,
#[serde_as(as = "serde_with::DisplayFromStr")]
at_lsn: Lsn,
}
let de: De = serde::Deserialize::deserialize(deserializer)?;
Ok(Self {
at_lsn: de.at_lsn,
keys: de.keys.0,
})
}
}

View File

@@ -26,10 +26,6 @@ use utils::http::endpoint::request_span;
use utils::http::json::json_request_or_empty_body;
use utils::http::request::{get_request_param, must_get_query_param, parse_query_param};
use super::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use crate::context::{DownloadBehavior, RequestContext};
use crate::deletion_queue::DeletionQueueClient;
use crate::metrics::{StorageTimeOperation, STORAGE_TIME_GLOBAL};
@@ -46,6 +42,10 @@ use crate::tenant::timeline::Timeline;
use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError, TenantSharedResources};
use crate::{config::PageServerConf, tenant::mgr};
use crate::{disk_usage_eviction_task, tenant};
use pageserver_api::models::{
StatusResponse, TenantConfigRequest, TenantCreateRequest, TenantCreateResponse, TenantInfo,
TimelineCreateRequest, TimelineGcRequest, TimelineInfo,
};
use utils::{
auth::SwappableJwtAuth,
generation::Generation,
@@ -61,7 +61,7 @@ use utils::{
};
// Imports only used for testing APIs
use super::models::ConfigureFailpointsRequest;
use pageserver_api::models::ConfigureFailpointsRequest;
pub struct State {
conf: &'static PageServerConf,
@@ -1422,71 +1422,11 @@ async fn timeline_collect_keyspace(
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_id))?;
struct Partitioning {
keys: crate::keyspace::KeySpace,
at_lsn: Lsn,
}
impl serde::Serialize for Partitioning {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeMap;
let mut map = serializer.serialize_map(Some(2))?;
map.serialize_key("keys")?;
map.serialize_value(&KeySpace(&self.keys))?;
map.serialize_key("at_lsn")?;
map.serialize_value(&WithDisplay(&self.at_lsn))?;
map.end()
}
}
struct WithDisplay<'a, T>(&'a T);
impl<'a, T: std::fmt::Display> serde::Serialize for WithDisplay<'a, T> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.collect_str(&self.0)
}
}
struct KeySpace<'a>(&'a crate::keyspace::KeySpace);
impl<'a> serde::Serialize for KeySpace<'a> {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeSeq;
let mut seq = serializer.serialize_seq(Some(self.0.ranges.len()))?;
for kr in &self.0.ranges {
seq.serialize_element(&KeyRange(kr))?;
}
seq.end()
}
}
struct KeyRange<'a>(&'a std::ops::Range<crate::repository::Key>);
impl<'a> serde::Serialize for KeyRange<'a> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeTuple;
let mut t = serializer.serialize_tuple(2)?;
t.serialize_element(&WithDisplay(&self.0.start))?;
t.serialize_element(&WithDisplay(&self.0.end))?;
t.end()
}
}
let at_lsn: Option<Lsn> = parse_query_param(&request, "at_lsn")?;
let check_serialization_roundtrip: bool =
parse_query_param(&request, "check_serialization_roundtrip")?.unwrap_or(false);
async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(tenant_id, timeline_id).await?;
@@ -1496,7 +1436,20 @@ async fn timeline_collect_keyspace(
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
json_response(StatusCode::OK, Partitioning { keys, at_lsn })
let res = crate::http::models::partitioning::Partitioning { keys, at_lsn };
if check_serialization_roundtrip {
(|| {
let ser = serde_json::ser::to_vec(&res).context("serialize")?;
let de: crate::http::models::partitioning::Partitioning =
serde_json::from_slice(&ser).context("deserialize")?;
anyhow::ensure!(de == res, "not equal");
info!("passed serialization rountrip check");
Ok(())
})()
.context("serialization rountrip")
.map_err(ApiError::InternalServerError)?;
}
json_response(StatusCode::OK, res)
}
.instrument(info_span!("timeline_collect_keyspace", %tenant_id, %timeline_id))
.await

View File

@@ -5,7 +5,7 @@ use std::ops::Range;
///
/// Represents a set of Keys, in a compact form.
///
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct KeySpace {
/// Contiguous ranges of keys that belong to the key space. In key order,
/// and with no overlap.

View File

@@ -25,6 +25,7 @@ pub mod walingest;
pub mod walrecord;
pub mod walredo;
pub mod client;
pub mod failpoint_support;
use crate::task_mgr::TaskKind;

View File

@@ -638,7 +638,7 @@ const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
///
/// Operations:
/// - open ([`std::fs::OpenOptions::open`])
/// - close (dropping [`std::fs::File`])
/// - close (dropping [`crate::virtual_file::VirtualFile`])
/// - close-by-replace (close by replacement algorithm)
/// - read (`read_at`)
/// - write (`write_at`)

View File

@@ -291,6 +291,16 @@ impl From<harness::TestRedoManager> for WalRedoManager {
}
impl WalRedoManager {
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
match self {
Self::Prod(mgr) => mgr.maybe_quiesce(idle_timeout),
#[cfg(test)]
Self::Test(_) => {
// Not applicable to test redo manager
}
}
}
pub async fn request_redo(
&self,
key: crate::repository::Key,
@@ -1649,22 +1659,16 @@ impl Tenant {
/// This function is periodically called by compactor task.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
pub async fn compaction_iteration(
async fn compaction_iteration(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> anyhow::Result<()> {
// Don't start doing work during shutdown
if let TenantState::Stopping { .. } = self.current_state() {
) -> anyhow::Result<(), timeline::CompactionError> {
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
if !self.is_active() {
return Ok(());
}
// We should only be called once the tenant has activated.
anyhow::ensure!(
self.is_active(),
"Cannot run compaction iteration on inactive tenant"
);
{
let conf = self.tenant_conf.read().unwrap();
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
@@ -3500,6 +3504,7 @@ pub(crate) mod harness {
// enable it in case the tests exercise code paths that use
// debug_assert_current_span_has_tenant_and_timeline_id
logging::TracingErrorLayerEnablement::EnableWithRustLogFilter,
logging::Output::Stdout,
)
.expect("Failed to init test logging")
});

View File

@@ -20,12 +20,14 @@ use std::io::{Error, ErrorKind};
impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
#[tracing::instrument(skip_all, fields(%offset), level = tracing::Level::DEBUG)]
pub async fn read_blob(
&self,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
tracing::debug!("reading blob");
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
Ok(buf)
}

View File

@@ -141,6 +141,7 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
#[tracing::instrument(skip_all, level = tracing::Level::DEBUG)]
pub async fn read_blk(
&self,
blknum: u32,

View File

@@ -181,6 +181,7 @@ impl LayerMap {
/// NOTE: This only searches the 'historic' layers, *not* the
/// 'open' and 'frozen' layers!
///
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub fn search(&self, key: Key, end_lsn: Lsn) -> Option<SearchResult> {
let version = self.historic.get().unwrap().get_version(end_lsn.0 - 1)?;
let latest_delta = version.delta_coverage.query(key.to_i128());

View File

@@ -289,7 +289,9 @@ impl DeltaLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let loaded = DeltaLayerInner::load(&path, None, ctx).await?;
let loaded = DeltaLayerInner::load(&path, None, ctx)
.await
.and_then(|res| res)?;
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
@@ -610,18 +612,28 @@ impl Drop for DeltaLayerWriter {
}
impl DeltaLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure
/// - outer has the permanent failure
pub(super) async fn load(
path: &Utf8Path,
summary: Option<Summary>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.await
.with_context(|| format!("Failed to open file '{path}'"))?;
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let summary_blk = match file.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
// TODO: this should be an assertion instead; see ImageLayerInner::load
let actual_summary =
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
if let Some(mut expected_summary) = summary {
// production code path
@@ -636,11 +648,11 @@ impl DeltaLayerInner {
}
}
Ok(DeltaLayerInner {
Ok(Ok(DeltaLayerInner {
file,
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
})
}))
}
pub(super) async fn get_value_reconstruct_data(

View File

@@ -249,7 +249,9 @@ impl ImageLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
let path = self.path();
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx).await?;
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, ctx)
.await
.and_then(|res| res)?;
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
@@ -295,18 +297,31 @@ impl ImageLayer {
}
impl ImageLayerInner {
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure
/// - outer has the permanent failure
pub(super) async fn load(
path: &Utf8Path,
lsn: Lsn,
summary: Option<Summary>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let summary_blk = match file.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
// length is the only way how this could fail, so it's not actually likely at all unless
// read_blk returns wrong sized block.
//
// TODO: confirm and make this into assertion
let actual_summary =
Summary::des_prefix(summary_blk.as_ref()).context("deserialize first block")?;
if let Some(mut expected_summary) = summary {
// production code path
@@ -322,12 +337,12 @@ impl ImageLayerInner {
}
}
Ok(ImageLayerInner {
Ok(Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn,
file,
})
}))
}
pub(super) async fn get_value_reconstruct_data(

View File

@@ -226,6 +226,7 @@ impl Layer {
///
/// It is up to the caller to collect more data from the previous layer and
/// perform WAL redo, if necessary.
#[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
pub(crate) async fn get_value_reconstruct_data(
&self,
key: Key,
@@ -868,6 +869,9 @@ impl LayerInner {
}
Ok((Err(e), _permit)) => {
// FIXME: this should be with the spawned task and be cancellation sensitive
//
// while we should not need this, this backoff has turned out to be useful with
// a bug of unexpectedly deleted remote layer file (#5787).
let consecutive_failures =
self.consecutive_failures.fetch_add(1, Ordering::Relaxed);
tracing::error!(consecutive_failures, "layer file download failed: {e:#}");
@@ -1196,7 +1200,7 @@ impl DownloadedLayer {
));
delta_layer::DeltaLayerInner::load(&owner.path, summary, ctx)
.await
.map(LayerKind::Delta)
.map(|res| res.map(LayerKind::Delta))
} else {
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
@@ -1207,23 +1211,32 @@ impl DownloadedLayer {
));
image_layer::ImageLayerInner::load(&owner.path, lsn, summary, ctx)
.await
.map(LayerKind::Image)
}
// this will be a permanent failure
.context("load layer");
.map(|res| res.map(LayerKind::Image))
};
if let Err(e) = res.as_ref() {
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
// TODO(#5815): we are not logging all errors, so temporarily log them here as well
tracing::error!("layer loading failed permanently: {e:#}");
match res {
Ok(Ok(layer)) => Ok(Ok(layer)),
Ok(Err(transient)) => Err(transient),
Err(permanent) => {
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
// TODO(#5815): we are not logging all errors, so temporarily log them **once**
// here as well
let permanent = permanent.context("load layer");
tracing::error!("layer loading failed permanently: {permanent:#}");
Ok(Err(permanent))
}
}
res
};
self.kind.get_or_init(init).await.as_ref().map_err(|e| {
// errors are not clonabled, cannot but stringify
// test_broken_timeline matches this string
anyhow::anyhow!("layer loading failed: {e:#}")
})
self.kind
.get_or_try_init(init)
// return transient errors using `?`
.await?
.as_ref()
.map_err(|e| {
// errors are not clonabled, cannot but stringify
// test_broken_timeline matches this string
anyhow::anyhow!("layer loading failed: {e:#}")
})
}
async fn get_value_reconstruct_data(

View File

@@ -180,16 +180,16 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
// Run compaction
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count,
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
error!(
"Compaction failed {error_run_count} times, retrying in {:?}: {e:?}",
wait_duration
"Compaction failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
);
Duration::from_secs_f64(wait_duration)
wait_duration
} else {
error_run_count = 0;
period
@@ -198,6 +198,10 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
warn_when_period_overrun(started_at.elapsed(), period, BackgroundLoopKind::Compaction);
// Perhaps we did no work and the walredo process has been idle for some time:
// give it a chance to shut down to avoid leaving walredo process running indefinitely.
tenant.walredo_mgr.maybe_quiesce(period * 10);
// Sleep
if tokio::time::timeout(sleep_duration, cancel.cancelled())
.await
@@ -261,16 +265,16 @@ async fn gc_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
.await;
if let Err(e) = res {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count,
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
error!(
"Gc failed {error_run_count} times, retrying in {:?}: {e:?}",
wait_duration
"Gc failed {error_run_count} times, retrying in {wait_duration:?}: {e:?}",
);
Duration::from_secs_f64(wait_duration)
wait_duration
} else {
error_run_count = 0;
period

View File

@@ -468,6 +468,7 @@ impl Timeline {
/// an ancestor branch, for example, or waste a lot of cycles chasing the
/// non-existing key.
///
#[instrument(skip_all, fields(%key, %lsn), level = tracing::Level::DEBUG)]
pub async fn get(
&self,
key: Key,
@@ -2044,6 +2045,7 @@ impl Timeline {
///
/// This function takes the current timeline's locked LayerMap as an argument,
/// so callers can avoid potential race conditions.
#[instrument(level = tracing::Level::DEBUG, skip_all)]
async fn get_reconstruct_data(
&self,
key: Key,
@@ -3497,21 +3499,22 @@ impl Timeline {
}
// FIXME: the writer already fsyncs all data, only rename needs to be fsynced here
let mut layer_paths: Vec<Utf8PathBuf> = new_layers
let layer_paths: Vec<Utf8PathBuf> = new_layers
.iter()
.map(|l| l.local_path().to_owned())
.collect();
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
//
// FIXME: spawn_blocking above for this
par_fsync::par_fsync(&layer_paths).context("fsync all new layers")?;
par_fsync::par_fsync_async(&layer_paths)
.await
.context("fsync all new layers")?;
par_fsync::par_fsync(&[self.conf.timeline_path(&self.tenant_id, &self.timeline_id)])
let timeline_dir = self.conf.timeline_path(&self.tenant_id, &self.timeline_id);
par_fsync::par_fsync_async(&[timeline_dir])
.await
.context("fsync of timeline dir")?;
layer_paths.pop().unwrap();
}
stats.write_layer_files_micros = stats.read_lock_drop_micros.till_now();

View File

@@ -91,6 +91,7 @@ struct ProcessOutput {
pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
last_redo_at: std::sync::Mutex<Option<Instant>>,
redo_process: RwLock<Option<Arc<WalRedoProcess>>>,
}
@@ -187,10 +188,26 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
last_redo_at: std::sync::Mutex::default(),
redo_process: RwLock::new(None),
}
}
/// This type doesn't have its own background task to check for idleness: we
/// rely on our owner calling this function periodically in its own housekeeping
/// loops.
pub(crate) fn maybe_quiesce(&self, idle_timeout: Duration) {
if let Ok(g) = self.last_redo_at.try_lock() {
if let Some(last_redo_at) = *g {
if last_redo_at.elapsed() >= idle_timeout {
drop(g);
let mut guard = self.redo_process.write().unwrap();
*guard = None;
}
}
}
}
///
/// Process one request for WAL redo using wal-redo postgres
///
@@ -205,6 +222,8 @@ impl PostgresRedoManager {
wal_redo_timeout: Duration,
pg_version: u32,
) -> anyhow::Result<Bytes> {
*(self.last_redo_at.lock().unwrap()) = Some(Instant::now());
let (rel, blknum) = key_to_rel_block(key).context("invalid record")?;
const MAX_RETRY_ATTEMPTS: u32 = 1;
let mut n_attempts = 0u32;
@@ -348,12 +367,13 @@ impl PostgresRedoManager {
self.apply_record_neon(key, &mut page, *record_lsn, record)?;
}
// Success!
let end_time = Instant::now();
let duration = end_time.duration_since(start_time);
let duration = start_time.elapsed();
// FIXME: using the same metric here creates a bimodal distribution by default, and because
// there could be multiple batch sizes this would be N+1 modal.
WAL_REDO_TIME.observe(duration.as_secs_f64());
debug!(
"neon applied {} WAL records in {} ms to reconstruct page image at LSN {}",
"neon applied {} WAL records in {} us to reconstruct page image at LSN {}",
records.len(),
duration.as_micros(),
lsn

View File

@@ -475,6 +475,12 @@ NeonXactCallback(XactEvent event, void *arg)
Assert(CurrentDdlTable == &RootTable);
}
static bool
RoleIsNeonSuperuser(const char *role_name)
{
return strcmp(role_name, "neon_superuser") == 0;
}
static void
HandleCreateDb(CreatedbStmt *stmt)
{
@@ -501,9 +507,16 @@ HandleCreateDb(CreatedbStmt *stmt)
entry->type = Op_Set;
if (downer && downer->arg)
entry->owner = get_role_oid(defGetString(downer), false);
{
const char *owner_name = defGetString(downer);
if (RoleIsNeonSuperuser(owner_name))
elog(ERROR, "can't create a database with owner neon_superuser");
entry->owner = get_role_oid(owner_name, false);
}
else
{
entry->owner = GetUserId();
}
}
static void
@@ -522,8 +535,10 @@ HandleAlterOwner(AlterOwnerStmt *stmt)
if (!found)
memset(entry->old_name, 0, sizeof(entry->old_name));
entry->owner = get_role_oid(get_rolespec_name(stmt->newowner), false);
const char *new_owner = get_rolespec_name(stmt->newowner);
if (RoleIsNeonSuperuser(new_owner))
elog(ERROR, "can't alter owner to neon_superuser");
entry->owner = get_role_oid(new_owner, false);
entry->type = Op_Set;
}
@@ -617,6 +632,9 @@ HandleAlterRole(AlterRoleStmt *stmt)
InitRoleTableIfNeeded();
DefElem *dpass = NULL;
ListCell *option;
const char *role_name = stmt->role->rolename;
if (RoleIsNeonSuperuser(role_name))
elog(ERROR, "can't ALTER neon_superuser");
foreach(option, stmt->options)
{
@@ -631,7 +649,7 @@ HandleAlterRole(AlterRoleStmt *stmt)
bool found = false;
RoleEntry *entry = hash_search(
CurrentDdlTable->role_table,
stmt->role->rolename,
role_name,
HASH_ENTER,
&found);

View File

@@ -1687,9 +1687,9 @@ neon_extend(SMgrRelation reln, ForkNumber forkNum, BlockNumber blkno,
if (current_size >= ((uint64) max_cluster_size) * 1024 * 1024)
ereport(ERROR,
(errcode(ERRCODE_DISK_FULL),
errmsg("could not extend file because cluster size limit (%d MB) has been exceeded",
errmsg("could not extend file because project size limit (%d MB) has been exceeded",
max_cluster_size),
errhint("This limit is defined by neon.max_cluster_size GUC")));
errhint("This limit is defined externally by the project size limit, and internally by neon.max_cluster_size GUC")));
}
/*

View File

@@ -248,6 +248,7 @@ impl ConnCfg {
// connect_raw() will not use TLS if sslmode is "disable"
let (client, connection) = self.0.connect_raw(stream, tls).await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let stream = connection.stream.into_inner();
info!(

View File

@@ -514,7 +514,7 @@ pub fn invalidate_cache(node_info: console::CachedNodeInfo) -> compute::ConnCfg
}
/// Try to connect to the compute node once.
#[tracing::instrument(name = "connect_once", skip_all)]
#[tracing::instrument(name = "connect_once", fields(pid = tracing::field::Empty), skip_all)]
async fn connect_to_compute_once(
node_info: &console::CachedNodeInfo,
timeout: time::Duration,

View File

@@ -208,6 +208,10 @@ impl GlobalConnPool {
} else {
info!("pool: reusing connection '{conn_info}'");
client.session.send(session_id)?;
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
latency_timer.pool_hit();
latency_timer.success();
return Ok(Client::new(client, pool).await);
@@ -224,6 +228,12 @@ impl GlobalConnPool {
)
.await
};
if let Ok(client) = &new_client {
tracing::Span::current().record(
"pid",
&tracing::field::display(client.inner.get_process_id()),
);
}
match &new_client {
// clear the hash. it's no longer valid
@@ -257,12 +267,11 @@ impl GlobalConnPool {
}
_ => {}
}
// new_client.map(|inner| Client::new(inner, pool).await)
Ok(Client::new(new_client?, pool).await)
let new_client = new_client?;
Ok(Client::new(new_client, pool).await)
}
fn put(&self, conn_info: &ConnInfo, client: ClientInner, pid: i32) -> anyhow::Result<()> {
fn put(&self, conn_info: &ConnInfo, client: ClientInner) -> anyhow::Result<()> {
let conn_id = client.conn_id;
// We want to hold this open while we return. This ensures that the pool can't close
@@ -306,9 +315,9 @@ impl GlobalConnPool {
// do logging outside of the mutex
if returned {
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}, pid={pid}");
info!(%conn_id, "pool: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}, pid={pid}");
info!(%conn_id, "pool: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
Ok(())
@@ -385,7 +394,7 @@ impl ConnectMechanism for TokioMechanism<'_> {
// Wake up the destination if needed. Code here is a bit involved because
// we reuse the code from the usual proxy and we need to prepare few structures
// that this code expects.
#[tracing::instrument(skip_all)]
#[tracing::instrument(fields(pid = tracing::field::Empty), skip_all)]
async fn connect_to_compute(
config: &config::ProxyConfig,
conn_info: &ConnInfo,
@@ -452,6 +461,7 @@ async fn connect_to_compute_once(
.connect_timeout(timeout)
.connect(tokio_postgres::NoTls)
.await?;
tracing::Span::current().record("pid", &tracing::field::display(client.get_process_id()));
let (tx, mut rx) = tokio::sync::watch::channel(session);
@@ -519,22 +529,6 @@ struct ClientInner {
conn_id: uuid::Uuid,
}
impl ClientInner {
pub async fn get_pid(&mut self) -> anyhow::Result<i32> {
let rows = self.inner.query("select pg_backend_pid();", &[]).await?;
if rows.len() != 1 {
Err(anyhow::anyhow!(
"expected 1 row from pg_backend_pid(), got {}",
rows.len()
))
} else {
let pid = rows[0].get(0);
info!(%pid, "got pid");
Ok(pid)
}
}
}
impl Client {
pub fn metrics(&self) -> Arc<MetricCounter> {
USAGE_METRICS.register(self.inner.as_ref().unwrap().ids.clone())
@@ -546,7 +540,6 @@ pub struct Client {
span: Span,
inner: Option<ClientInner>,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
pid: i32,
}
pub struct Discard<'a> {
@@ -556,12 +549,11 @@ pub struct Discard<'a> {
impl Client {
pub(self) async fn new(
mut inner: ClientInner,
inner: ClientInner,
pool: Option<(ConnInfo, Arc<GlobalConnPool>)>,
) -> Self {
Self {
conn_id: inner.conn_id,
pid: inner.get_pid().await.unwrap_or(-1),
inner: Some(inner),
span: Span::current(),
pool,
@@ -573,7 +565,6 @@ impl Client {
pool,
conn_id,
span: _,
pid: _,
} = self;
(
&mut inner
@@ -630,11 +621,10 @@ impl Drop for Client {
.expect("client inner should not be removed");
if let Some((conn_info, conn_pool)) = self.pool.take() {
let current_span = self.span.clone();
let pid = self.pid;
// return connection to the pool
tokio::task::spawn_blocking(move || {
let _span = current_span.enter();
let _ = conn_pool.put(&conn_info, client, pid);
let _ = conn_pool.put(&conn_info, client);
});
}
}

View File

@@ -250,7 +250,7 @@ pub async fn handle(
Ok(response)
}
#[instrument(name = "sql-over-http", skip_all)]
#[instrument(name = "sql-over-http", fields(pid = tracing::field::Empty), skip_all)]
async fn handle_inner(
request: Request<Body>,
sni_hostname: Option<String>,

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.73.0"
channel = "1.74.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -199,9 +199,10 @@ async fn main() -> anyhow::Result<()> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
let _guard = logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
info!("version: {GIT_VERSION}");

View File

@@ -431,9 +431,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// 1. init logging
// 2. tracing panic hook
// 3. sentry
logging::init(
let _guard = logging::init(
LogFormat::from_config(&args.log_format)?,
logging::TracingErrorLayerEnablement::Disabled,
logging::Output::Stdout,
)?;
logging::replace_panic_hook_with_tracing_panic_hook().forget();
// initialize sentry if SENTRY_DSN is provided

View File

@@ -1663,7 +1663,7 @@ class NeonPageserver(PgProtocol):
# these can happen anytime we do compactions from background task and shutdown pageserver
r".*ERROR.*ancestor timeline \S+ is being stopped",
# this is expected given our collaborative shutdown approach for the UploadQueue
".*Compaction failed.*, retrying in .*: queue is in state Stopped.*",
".*Compaction failed.*, retrying in .*: Other\\(queue is in state Stopped.*",
# Pageserver timeline deletion should be polled until it gets 404, so ignore it globally
".*Error processing HTTP request: NotFound: Timeline .* was not found",
".*took more than expected to complete.*",

View File

@@ -46,7 +46,10 @@ from fixtures.utils import query_scalar
# Because the delta layer D covering lsn1 is corrupted, creating a branch
# starting from lsn1 should return an error as follows:
# could not find data for key ... at LSN ..., for request at LSN ...
def test_branch_and_gc(neon_simple_env: NeonEnv):
def test_branch_and_gc(neon_simple_env: NeonEnv, build_type: str):
if build_type == "debug":
pytest.skip("times out in debug builds")
env = neon_simple_env
pageserver_http_client = env.pageserver.http_client()

View File

@@ -245,6 +245,19 @@ def test_ddl_forwarding(ddl: DdlForwardingContext):
raise AssertionError("Could not count databases")
assert result[0] == 0, "Database 'failure' still exists after drop"
# We don't have compute_ctl, so here, so create neon_superuser here manually
cur.execute("CREATE ROLE neon_superuser NOLOGIN CREATEDB CREATEROLE")
with pytest.raises(psycopg2.InternalError):
cur.execute("ALTER ROLE neon_superuser LOGIN")
with pytest.raises(psycopg2.InternalError):
cur.execute("CREATE DATABASE trololobus WITH OWNER neon_superuser")
cur.execute("CREATE DATABASE trololobus")
with pytest.raises(psycopg2.InternalError):
cur.execute("ALTER DATABASE trololobus OWNER TO neon_superuser")
conn.close()

View File

@@ -1,5 +1,6 @@
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -15,7 +16,11 @@ from fixtures.utils import query_scalar
# and then download them back.
def test_basic_eviction(
neon_env_builder: NeonEnvBuilder,
build_type: str,
):
if build_type == "debug":
pytest.skip("times out in debug builds")
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
env = neon_env_builder.init_start(

View File

@@ -144,7 +144,10 @@ def test_pageserver_restart(neon_env_builder: NeonEnvBuilder, generations: bool)
# Test that repeatedly kills and restarts the page server, while the
# safekeeper and compute node keep running.
@pytest.mark.timeout(540)
def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder, build_type: str):
if build_type == "debug":
pytest.skip("times out in debug builds")
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.enable_scrub_on_exit()

View File

@@ -307,7 +307,7 @@ def test_tenant_detach_smoke(neon_env_builder: NeonEnvBuilder):
)
gc_thread = Thread(target=lambda: do_gc_target(pageserver_http, tenant_id, timeline_id))
gc_thread.start()
time.sleep(1)
time.sleep(5)
# By now the gc task is spawned but in sleep for another second due to the failpoint.
log.info("detaching tenant")

View File

@@ -602,7 +602,10 @@ async def run_wal_lagging(env: NeonEnv, endpoint: Endpoint, test_output_dir: Pat
# The test takes more than default 5 minutes on Postgres 16,
# see https://github.com/neondatabase/neon/issues/5305
@pytest.mark.timeout(600)
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path):
def test_wal_lagging(neon_env_builder: NeonEnvBuilder, test_output_dir: Path, build_type: str):
if build_type == "debug":
pytest.skip("times out in debug builds")
neon_env_builder.num_safekeepers = 3
env = neon_env_builder.init_start()

View File

@@ -4,15 +4,15 @@ commands:
- name: cgconfigparser
user: root
sysvInitAction: sysinit
shell: "cgconfigparser -l /etc/cgconfig.conf -s 1664"
shell: 'cgconfigparser -l /etc/cgconfig.conf -s 1664'
- name: pgbouncer
user: nobody
sysvInitAction: respawn
shell: "/usr/local/bin/pgbouncer /etc/pgbouncer.ini"
shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini'
- name: postgres-exporter
user: nobody
sysvInitAction: respawn
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter --extend.query-path /etc/postgres_exporter_queries.yml'
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter'
shutdownHook: |
su -p postgres --session-command '/usr/local/bin/pg_ctl stop -D /var/db/postgres/compute/pgdata -m fast --wait -t 10'
files:
@@ -46,21 +46,6 @@ files:
}
memory {}
}
- filename: postgres_exporter_queries.yml
content: |
postgres_exporter_pg_database_size:
query: "SELECT pg_database.datname, pg_database_size(pg_database.datname) as bytes, 42 as fourtytwo FROM pg_database"
cache_seconds: 30
metrics:
- datname:
usage: "LABEL"
description: "Name of the database"
- bytes:
usage: "GAUGE"
description: "Disk space used by the database"
- fourtytwo:
usage: "GAUGE"
description: "fourtytwo"
build: |
# Build cgroup-tools
#
@@ -129,12 +114,10 @@ merge: |
COPY cgconfig.conf /etc/cgconfig.conf
COPY pgbouncer.ini /etc/pgbouncer.ini
COPY postgres_exporter_queries.yml /etc/postgres_exporter_queries.yml
RUN set -e \
&& chown postgres:postgres /etc/pgbouncer.ini \
&& chmod 0644 /etc/pgbouncer.ini \
&& chmod 0644 /etc/cgconfig.conf \
&& chmod 0644 /etc/postgres_exporter_queries.yml
&& chmod 0644 /etc/cgconfig.conf
COPY --from=libcgroup-builder /libcgroup-install/bin/* /usr/bin/
COPY --from=libcgroup-builder /libcgroup-install/lib/* /usr/lib/