Compare commits

..

16 Commits

Author SHA1 Message Date
Christian Schwarz
9ac1efbccd starvation prevention while allowing concurrent find_victims 2023-10-04 17:33:47 +00:00
Christian Schwarz
6bfc0492ac Revert "serialize find_victim callers through std mutex"
This reverts commit 74601238ee.
2023-10-04 16:47:50 +00:00
Christian Schwarz
3adaec3ab2 Revert "async version of previous commit"
This reverts commit 79c577c2eb.
2023-10-04 16:47:46 +00:00
Christian Schwarz
79c577c2eb async version of previous commit
both perform badly in uncontended case, so, not an option
2023-10-04 16:47:24 +00:00
Christian Schwarz
74601238ee serialize find_victim callers through std mutex 2023-10-04 16:34:59 +00:00
Christian Schwarz
edf24e7afc Revert "yield to executor every time we move backwards"
This reverts commit 49bf66a467.
2023-10-04 15:54:57 +00:00
Christian Schwarz
c5f24bab55 Revert "the effect of yield_now() was just less competition in find_victim, prove by replacing it with busy loop"
This reverts commit 6124ad694a.
2023-10-04 15:54:44 +00:00
Christian Schwarz
6124ad694a the effect of yield_now() was just less competition in find_victim, prove by replacing it with busy loop 2023-10-04 15:41:05 +00:00
Christian Schwarz
49bf66a467 yield to executor every time we move backwards
Results are looking good, the bottleneck is now the file descriptor cache.
2023-10-04 14:42:51 +00:00
Christian Schwarz
d0497786d9 idea: use downgrade() to ensure forward progress 2023-10-04 14:33:01 +00:00
Christian Schwarz
926d53de2d disable the timeout, see whether that reduces find_victim CPU burn 2023-10-04 14:24:19 +00:00
Christian Schwarz
d598481894 Revert "revert recent VirtualFile asyncification changes (#5291)"
This reverts commit ab1f37e908.
2023-10-02 16:01:11 +00:00
Christian Schwarz
b1fd8db8b3 REPRO: rebase fallout & add some instructions 2023-10-02 16:00:25 +00:00
Christian Schwarz
219bc223f4 HACK: BACKGROUND_RUNTIME webserver to measure response time using wrk 2023-10-02 15:25:22 +00:00
Christian Schwarz
b22675c6ac REPRO the problem: , uses 430GB of space; 4 seconds load time; constant 20kIOPS after ~20s 2023-10-02 15:25:22 +00:00
Christian Schwarz
356a18fa4c disable concurrent compaction limit (it wasn't there when I first analyzed the issue) 2023-10-02 15:25:22 +00:00
97 changed files with 1445 additions and 1690 deletions

View File

@@ -76,8 +76,8 @@ runs:
rm -f ${ALLURE_ZIP}
fi
env:
ALLURE_VERSION: 2.24.0
ALLURE_ZIP_SHA256: 60b1d6ce65d9ef24b23cf9c2c19fd736a123487c38e54759f1ed1a7a77353c90
ALLURE_VERSION: 2.23.1
ALLURE_ZIP_SHA256: 11141bfe727504b3fd80c0f9801eb317407fd0ac983ebb57e671f14bac4bcd86
# Potentially we could have several builds running for the same key (for example, for the main branch), so we use an improvised lock for this
- name: Acquire lock

59
Cargo.lock generated
View File

@@ -158,6 +158,17 @@ dependencies = [
"syn 1.0.109",
]
[[package]]
name = "async-channel"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
]
[[package]]
name = "async-compression"
version = "0.4.0"
@@ -798,22 +809,6 @@ dependencies = [
"either",
]
[[package]]
name = "camino"
version = "1.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c59e92b5a388f549b863a7bea62612c09f24c8393560709a54558a9abdfb3b9c"
[[package]]
name = "camino-tempfile"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ab15a83d13f75dbd86f082bdefd160b628476ef58d3b900a0ef74e001bb097"
dependencies = [
"camino",
"tempfile",
]
[[package]]
name = "cast"
version = "0.3.0"
@@ -1031,6 +1026,15 @@ dependencies = [
"zstd",
]
[[package]]
name = "concurrent-queue"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "const_format"
version = "0.2.30"
@@ -1069,7 +1073,6 @@ name = "control_plane"
version = "0.1.0"
dependencies = [
"anyhow",
"camino",
"clap",
"comfy-table",
"compute_api",
@@ -1452,6 +1455,12 @@ dependencies = [
"libc",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fail"
version = "0.5.1"
@@ -2658,7 +2667,6 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bytes",
"camino",
"clap",
"git-version",
"pageserver",
@@ -2674,13 +2682,12 @@ name = "pageserver"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"async-compression",
"async-stream",
"async-trait",
"byteorder",
"bytes",
"camino",
"camino-tempfile",
"chrono",
"clap",
"close_fds",
@@ -2732,6 +2739,7 @@ dependencies = [
"strum_macros",
"svg_fmt",
"sync_wrapper",
"tempfile",
"tenant_size_model",
"thiserror",
"tokio",
@@ -3424,8 +3432,6 @@ dependencies = [
"aws-sdk-s3",
"aws-smithy-http",
"aws-types",
"camino",
"camino-tempfile",
"hyper",
"metrics",
"once_cell",
@@ -3434,6 +3440,7 @@ dependencies = [
"scopeguard",
"serde",
"serde_json",
"tempfile",
"test-context",
"tokio",
"tokio-util",
@@ -3785,8 +3792,6 @@ dependencies = [
"async-trait",
"byteorder",
"bytes",
"camino",
"camino-tempfile",
"chrono",
"clap",
"const_format",
@@ -3815,6 +3820,7 @@ dependencies = [
"serde_with",
"signal-hook",
"storage_broker",
"tempfile",
"thiserror",
"tokio",
"tokio-io-timeout",
@@ -5113,8 +5119,6 @@ dependencies = [
"bincode",
"byteorder",
"bytes",
"camino",
"camino-tempfile",
"chrono",
"const_format",
"criterion",
@@ -5140,6 +5144,7 @@ dependencies = [
"signal-hook",
"strum",
"strum_macros",
"tempfile",
"thiserror",
"tokio",
"tokio-stream",
@@ -5213,7 +5218,6 @@ name = "wal_craft"
version = "0.1.0"
dependencies = [
"anyhow",
"camino-tempfile",
"clap",
"env_logger",
"log",
@@ -5221,6 +5225,7 @@ dependencies = [
"postgres",
"postgres_ffi",
"regex",
"tempfile",
"utils",
"workspace_hack",
]

View File

@@ -51,7 +51,6 @@ bindgen = "0.65"
bstr = "1.0"
byteorder = "1.4"
bytes = "1.0"
camino = "1.1.6"
cfg-if = "1.0.0"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
clap = { version = "4.0", features = ["derive"] }
@@ -188,7 +187,7 @@ workspace_hack = { version = "0.1", path = "./workspace_hack/" }
criterion = "0.5.1"
rcgen = "0.11"
rstest = "0.18"
camino-tempfile = "1.0.2"
tempfile = "3.4"
tonic-build = "0.9"
[patch.crates-io]

View File

@@ -612,13 +612,51 @@ RUN wget https://gitlab.com/dalibo/postgresql_anonymizer/-/archive/1.1.0/postgre
sort -o /before.txt /before.txt && sort -o /after.txt /after.txt && \
comm -13 /before.txt /after.txt | tar --directory=/usr/local/pgsql --zstd -cf /extensions/anon.tar.zst -T -
#########################################################################################
#
# Layer "rust extensions" for older extensions which haven't been updated to `pgrx` yet
# This layer is used to build `pgx` deps
#
#########################################################################################
FROM build-deps AS rust-extensions-build-pgx
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt-get update && \
apt-get install -y curl libclang-dev cmake && \
useradd -ms /bin/bash nonroot -b /home
ENV HOME=/home/nonroot
ENV PATH="/home/nonroot/.cargo/bin:/usr/local/pgsql/bin/:$PATH"
USER nonroot
WORKDIR /home/nonroot
ARG PG_VERSION
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
;; \
"v16") \
echo "TODO: Not yet supported for PostgreSQL 16. Need to update pgrx dependencies" && exit 0 \
;; \
*) \
echo "unexpected PostgreSQL version ${PG_VERSION}" && exit 1 \
;; \
esac && \
curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && \
chmod +x rustup-init && \
./rustup-init -y --no-modify-path --profile minimal --default-toolchain stable && \
rm rustup-init && \
cargo install --locked --version 0.7.3 cargo-pgx && \
/bin/bash -c 'cargo pgx init --pg${PG_VERSION:1}=/usr/local/pgsql/bin/pg_config'
USER root
#########################################################################################
#
# Layer "rust extensions"
# This layer is used to build `pgrx` deps
#
#########################################################################################
FROM build-deps AS rust-extensions-build
FROM build-deps AS rust-extensions-build-pgrx
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
RUN apt-get update && \
@@ -647,14 +685,26 @@ USER root
#
#########################################################################################
FROM rust-extensions-build AS pg-jsonschema-pg-build
FROM rust-extensions-build-pgx AS pg-jsonschema-pg-build
ARG PG_VERSION
RUN wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.2.0.tar.gz -O pg_jsonschema.tar.gz && \
echo "9118fc508a6e231e7a39acaa6f066fcd79af17a5db757b47d2eefbe14f7794f0 pg_jsonschema.tar.gz" | sha256sum --check && \
# caeab60d70b2fd3ae421ec66466a3abbb37b7ee6 made on 06/03/2023
# there is no release tag yet, but we need it due to the superuser fix in the control file, switch to git tag after release >= 0.1.5
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
;; \
"v16") \
echo "TODO: Not yet supported for PostgreSQL 16. Need to update pgrx dependencies" && exit 0 \
;; \
*) \
echo "unexpected PostgreSQL version \"${PG_VERSION}\"" && exit 1 \
;; \
esac && \
wget https://github.com/supabase/pg_jsonschema/archive/caeab60d70b2fd3ae421ec66466a3abbb37b7ee6.tar.gz -O pg_jsonschema.tar.gz && \
echo "54129ce2e7ee7a585648dbb4cef6d73f795d94fe72f248ac01119992518469a4 pg_jsonschema.tar.gz" | sha256sum --check && \
mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xvzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "0.10.2"/pgrx = { version = "0.10.2", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
sed -i 's/pgx = "0.7.1"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgx install --release && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_jsonschema.control
#########################################################################################
@@ -664,14 +714,29 @@ RUN wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.2.0.tar.
#
#########################################################################################
FROM rust-extensions-build AS pg-graphql-pg-build
FROM rust-extensions-build-pgx AS pg-graphql-pg-build
ARG PG_VERSION
RUN wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.4.0.tar.gz -O pg_graphql.tar.gz && \
echo "bd8dc7230282b3efa9ae5baf053a54151ed0e66881c7c53750e2d0c765776edc pg_graphql.tar.gz" | sha256sum --check && \
# b4988843647450a153439be367168ed09971af85 made on 22/02/2023 (from remove-pgx-contrib-spiext branch)
# Currently pgx version bump to >= 0.7.2 causes "call to unsafe function" compilation errors in
# pgx-contrib-spiext. There is a branch that removes that dependency, so use it. It is on the
# same 1.1 version we've used before.
RUN case "${PG_VERSION}" in \
"v14" | "v15") \
;; \
"v16") \
echo "TODO: Not yet supported for PostgreSQL 16. Need to update pgrx dependencies" && exit 0 \
;; \
*) \
echo "unexpected PostgreSQL version" && exit 1 \
;; \
esac && \
wget https://github.com/yrashk/pg_graphql/archive/b4988843647450a153439be367168ed09971af85.tar.gz -O pg_graphql.tar.gz && \
echo "0c7b0e746441b2ec24187d0e03555faf935c2159e2839bddd14df6dafbc8c9bd pg_graphql.tar.gz" | sha256sum --check && \
mkdir pg_graphql-src && cd pg_graphql-src && tar xvzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
sed -i 's/pgrx = "=0.10.2"/pgrx = { version = "0.10.2", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
cargo pgrx install --release && \
sed -i 's/pgx = "~0.7.1"/pgx = { version = "0.7.3", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
sed -i 's/pgx-tests = "~0.7.1"/pgx-tests = "0.7.3"/g' Cargo.toml && \
cargo pgx install --release && \
# it's needed to enable extension because it uses untrusted C language
sed -i 's/superuser = false/superuser = true/g' /usr/local/pgsql/share/extension/pg_graphql.control && \
echo "trusted = true" >> /usr/local/pgsql/share/extension/pg_graphql.control
@@ -683,7 +748,7 @@ RUN wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.4.0.tar.gz
#
#########################################################################################
FROM rust-extensions-build AS pg-tiktoken-pg-build
FROM rust-extensions-build-pgrx AS pg-tiktoken-pg-build
ARG PG_VERSION
# 26806147b17b60763039c6a6878884c41a262318 made on 26/09/2023
@@ -700,7 +765,7 @@ RUN wget https://github.com/kelvich/pg_tiktoken/archive/26806147b17b60763039c6a6
#
#########################################################################################
FROM rust-extensions-build AS pg-pgx-ulid-build
FROM rust-extensions-build-pgrx AS pg-pgx-ulid-build
ARG PG_VERSION
RUN wget https://github.com/pksunkara/pgx_ulid/archive/refs/tags/v0.1.3.tar.gz -O pgx_ulid.tar.gz && \

View File

@@ -1039,7 +1039,7 @@ LIMIT 100",
let remote_extensions = spec
.remote_extensions
.as_ref()
.ok_or(anyhow::anyhow!("Remote extensions are not configured"))?;
.ok_or(anyhow::anyhow!("Remote extensions are not configured",))?;
info!("parse shared_preload_libraries from spec.cluster.settings");
let mut libs_vec = Vec::new();

View File

@@ -1,5 +1,5 @@
use std::sync::Arc;
use std::{thread, time::Duration};
use std::{thread, time};
use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
@@ -7,7 +7,7 @@ use tracing::{debug, info};
use crate::compute::ComputeNode;
const MONITOR_CHECK_INTERVAL: Duration = Duration::from_millis(500);
const MONITOR_CHECK_INTERVAL: u64 = 500; // milliseconds
// Spin in a loop and figure out the last activity time in the Postgres.
// Then update it in the shared state. This function never errors out.
@@ -17,12 +17,13 @@ fn watch_compute_activity(compute: &ComputeNode) {
let connstr = compute.connstr.as_str();
// Define `client` outside of the loop to reuse existing connection if it's active.
let mut client = Client::connect(connstr, NoTls);
let timeout = time::Duration::from_millis(MONITOR_CHECK_INTERVAL);
info!("watching Postgres activity at {}", connstr);
loop {
// Should be outside of the write lock to allow others to read while we sleep.
thread::sleep(MONITOR_CHECK_INTERVAL);
thread::sleep(timeout);
match &mut client {
Ok(cli) => {

View File

@@ -6,7 +6,6 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
camino.workspace = true
clap.workspace = true
comfy-table.workspace = true
git-version.workspace = true

View File

@@ -1,6 +1,5 @@
use crate::{background_process, local_env::LocalEnv};
use anyhow::anyhow;
use camino::Utf8PathBuf;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use std::{path::PathBuf, process::Child};
@@ -48,9 +47,8 @@ impl AttachmentService {
}
}
fn pid_file(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("attachment_service.pid"))
.expect("non-Unicode path")
fn pid_file(&self) -> PathBuf {
self.env.base_data_dir.join("attachment_service.pid")
}
pub fn start(&self) -> anyhow::Result<Child> {

View File

@@ -16,13 +16,12 @@ use std::ffi::OsStr;
use std::io::Write;
use std::os::unix::prelude::AsRawFd;
use std::os::unix::process::CommandExt;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::process::{Child, Command};
use std::time::Duration;
use std::{fs, io, thread};
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use nix::errno::Errno;
use nix::fcntl::{FcntlArg, FdFlag};
use nix::sys::signal::{kill, Signal};
@@ -46,9 +45,9 @@ const NOTICE_AFTER_RETRIES: u64 = 50;
/// it itself.
pub enum InitialPidFile<'t> {
/// Create a pidfile, to allow future CLI invocations to manipulate the process.
Create(&'t Utf8Path),
Create(&'t Path),
/// The process will create the pidfile itself, need to wait for that event.
Expect(&'t Utf8Path),
Expect(&'t Path),
}
/// Start a background child process using the parameters given.
@@ -138,11 +137,7 @@ where
}
/// Stops the process, using the pid file given. Returns Ok also if the process is already not running.
pub fn stop_process(
immediate: bool,
process_name: &str,
pid_file: &Utf8Path,
) -> anyhow::Result<()> {
pub fn stop_process(immediate: bool, process_name: &str, pid_file: &Path) -> anyhow::Result<()> {
let pid = match pid_file::read(pid_file)
.with_context(|| format!("read pid_file {pid_file:?}"))?
{
@@ -257,9 +252,9 @@ fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command {
/// will remain held until the cmd exits.
fn pre_exec_create_pidfile<P>(cmd: &mut Command, path: P) -> &mut Command
where
P: Into<Utf8PathBuf>,
P: Into<PathBuf>,
{
let path: Utf8PathBuf = path.into();
let path: PathBuf = path.into();
// SAFETY
// pre_exec is marked unsafe because it runs between fork and exec.
// Why is that dangerous in various ways?
@@ -316,7 +311,7 @@ where
fn process_started<F>(
pid: Pid,
pid_file_to_check: Option<&Utf8Path>,
pid_file_to_check: Option<&Path>,
status_check: &F,
) -> anyhow::Result<bool>
where

View File

@@ -7,7 +7,7 @@
//! ```
use anyhow::Context;
use camino::Utf8PathBuf;
use std::path::PathBuf;
use crate::{background_process, local_env};
@@ -30,7 +30,7 @@ pub fn start_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
|| {
let url = broker.client_url();
let status_url = url.join("status").with_context(|| {
format!("Failed to append /status path to broker endpoint {url}")
format!("Failed to append /status path to broker endpoint {url}",)
})?;
let request = client
.get(status_url)
@@ -50,7 +50,6 @@ pub fn stop_broker_process(env: &local_env::LocalEnv) -> anyhow::Result<()> {
background_process::stop_process(true, "storage_broker", &storage_broker_pid_file_path(env))
}
fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(env.base_data_dir.join("storage_broker.pid"))
.expect("non-Unicode path")
fn storage_broker_pid_file_path(env: &local_env::LocalEnv) -> PathBuf {
env.base_data_dir.join("storage_broker.pid")
}

View File

@@ -14,7 +14,6 @@ use std::process::{Child, Command};
use std::{io, result};
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
@@ -145,7 +144,7 @@ impl PageServerNode {
pub fn initialize(&self, config_overrides: &[&str]) -> anyhow::Result<()> {
// First, run `pageserver --init` and wait for it to write a config into FS and exit.
self.pageserver_init(config_overrides)
.with_context(|| format!("Failed to run init for pageserver node {}", self.conf.id))
.with_context(|| format!("Failed to run init for pageserver node {}", self.conf.id,))
}
pub fn repo_path(&self) -> PathBuf {
@@ -155,9 +154,8 @@ impl PageServerNode {
/// The pid file is created by the pageserver process, with its pid stored inside.
/// Other pageservers cannot lock the same file and overwrite it for as long as the current
/// pageserver runs. (Unless someone removes the file manually; never do that!)
fn pid_file(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(self.repo_path().join("pageserver.pid"))
.expect("non-Unicode path")
fn pid_file(&self) -> PathBuf {
self.repo_path().join("pageserver.pid")
}
pub fn start(&self, config_overrides: &[&str]) -> anyhow::Result<Child> {

View File

@@ -11,7 +11,6 @@ use std::process::Child;
use std::{io, result};
use anyhow::Context;
use camino::Utf8PathBuf;
use postgres_connection::PgConnectionConfig;
use reqwest::blocking::{Client, RequestBuilder, Response};
use reqwest::{IntoUrl, Method};
@@ -98,9 +97,8 @@ impl SafekeeperNode {
SafekeeperNode::datadir_path_by_id(&self.env, self.id)
}
pub fn pid_file(&self) -> Utf8PathBuf {
Utf8PathBuf::from_path_buf(self.datadir_path().join("safekeeper.pid"))
.expect("non-Unicode path")
pub fn pid_file(&self) -> PathBuf {
self.datadir_path().join("safekeeper.pid")
}
pub fn start(&self, extra_opts: Vec<String>) -> anyhow::Result<Child> {

View File

@@ -12,7 +12,7 @@ log.workspace = true
once_cell.workspace = true
postgres.workspace = true
postgres_ffi.workspace = true
camino-tempfile.workspace = true
tempfile.workspace = true
workspace_hack.workspace = true

View File

@@ -1,5 +1,4 @@
use anyhow::{bail, ensure};
use camino_tempfile::{tempdir, Utf8TempDir};
use log::*;
use postgres::types::PgLsn;
use postgres::Client;
@@ -9,6 +8,7 @@ use std::cmp::Ordering;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
use tempfile::{tempdir, TempDir};
macro_rules! xlog_utils_test {
($version:ident) => {
@@ -33,7 +33,7 @@ pub struct Conf {
pub struct PostgresServer {
process: std::process::Child,
_unix_socket_dir: Utf8TempDir,
_unix_socket_dir: TempDir,
client_config: postgres::Config,
}

View File

@@ -13,7 +13,6 @@ aws-types.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
aws-credential-types.workspace = true
camino.workspace = true
hyper = { workspace = true, features = ["stream"] }
serde.workspace = true
serde_json.workspace = true
@@ -28,6 +27,6 @@ pin-project-lite.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
camino-tempfile.workspace = true
tempfile.workspace = true
test-context.workspace = true
rand.workspace = true

View File

@@ -13,12 +13,12 @@ use std::{
collections::HashMap,
fmt::Debug,
num::{NonZeroU32, NonZeroUsize},
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
};
use anyhow::{bail, Context};
use camino::{Utf8Path, Utf8PathBuf};
use serde::{Deserialize, Serialize};
use tokio::io;
@@ -52,7 +52,7 @@ const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
/// The prefix is an implementation detail, that allows representing local paths
/// as the remote ones, stripping the local storage prefix away.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct RemotePath(Utf8PathBuf);
pub struct RemotePath(PathBuf);
impl Serialize for RemotePath {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
@@ -69,18 +69,18 @@ impl<'de> Deserialize<'de> for RemotePath {
D: serde::Deserializer<'de>,
{
let str = String::deserialize(deserializer)?;
Ok(Self(Utf8PathBuf::from(&str)))
Ok(Self(PathBuf::from(&str)))
}
}
impl std::fmt::Display for RemotePath {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(&self.0, f)
write!(f, "{}", self.0.display())
}
}
impl RemotePath {
pub fn new(relative_path: &Utf8Path) -> anyhow::Result<Self> {
pub fn new(relative_path: &Path) -> anyhow::Result<Self> {
anyhow::ensure!(
relative_path.is_relative(),
"Path {relative_path:?} is not relative"
@@ -89,30 +89,30 @@ impl RemotePath {
}
pub fn from_string(relative_path: &str) -> anyhow::Result<Self> {
Self::new(Utf8Path::new(relative_path))
Self::new(Path::new(relative_path))
}
pub fn with_base(&self, base_path: &Utf8Path) -> Utf8PathBuf {
pub fn with_base(&self, base_path: &Path) -> PathBuf {
base_path.join(&self.0)
}
pub fn object_name(&self) -> Option<&str> {
self.0.file_name()
self.0.file_name().and_then(|os_str| os_str.to_str())
}
pub fn join(&self, segment: &Utf8Path) -> Self {
pub fn join(&self, segment: &Path) -> Self {
Self(self.0.join(segment))
}
pub fn get_path(&self) -> &Utf8PathBuf {
pub fn get_path(&self) -> &PathBuf {
&self.0
}
pub fn extension(&self) -> Option<&str> {
self.0.extension()
self.0.extension()?.to_str()
}
pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Utf8Path, std::path::StripPrefixError> {
pub fn strip_prefix(&self, p: &RemotePath) -> Result<&Path, std::path::StripPrefixError> {
self.0.strip_prefix(&p.0)
}
}
@@ -311,7 +311,7 @@ impl GenericRemoteStorage {
pub fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs(root) => {
info!("Using fs root '{root}' as a remote storage");
info!("Using fs root '{}' as a remote storage", root.display());
Self::LocalFs(LocalFs::new(root.clone())?)
}
RemoteStorageKind::AwsS3(s3_config) => {
@@ -379,7 +379,7 @@ pub struct RemoteStorageConfig {
pub enum RemoteStorageKind {
/// Storage based on local file system.
/// Specify a root folder to place all stored files into.
LocalFs(Utf8PathBuf),
LocalFs(PathBuf),
/// AWS S3 based storage, storing all files in the S3 bucket
/// specified by the config
AwsS3(S3Config),
@@ -474,7 +474,7 @@ impl RemoteStorageConfig {
concurrency_limit,
max_keys_per_list_response,
}),
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(Utf8PathBuf::from(
(Some(local_path), None, None) => RemoteStorageKind::LocalFs(PathBuf::from(
parse_toml_string("local_path", local_path)?,
)),
(Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
@@ -519,23 +519,23 @@ mod tests {
#[test]
fn test_object_name() {
let k = RemotePath::new(Utf8Path::new("a/b/c")).unwrap();
let k = RemotePath::new(Path::new("a/b/c")).unwrap();
assert_eq!(k.object_name(), Some("c"));
let k = RemotePath::new(Utf8Path::new("a/b/c/")).unwrap();
let k = RemotePath::new(Path::new("a/b/c/")).unwrap();
assert_eq!(k.object_name(), Some("c"));
let k = RemotePath::new(Utf8Path::new("a/")).unwrap();
let k = RemotePath::new(Path::new("a/")).unwrap();
assert_eq!(k.object_name(), Some("a"));
// XXX is it impossible to have an empty key?
let k = RemotePath::new(Utf8Path::new("")).unwrap();
let k = RemotePath::new(Path::new("")).unwrap();
assert_eq!(k.object_name(), None);
}
#[test]
fn rempte_path_cannot_be_created_from_absolute_ones() {
let err = RemotePath::new(Utf8Path::new("/")).expect_err("Should fail on absolute paths");
let err = RemotePath::new(Path::new("/")).expect_err("Should fail on absolute paths");
assert_eq!(err.to_string(), "Path \"/\" is not relative");
}
}

View File

@@ -4,10 +4,15 @@
//! This storage used in tests, but can also be used in cases when a certain persistent
//! volume is mounted to the local FS.
use std::{borrow::Cow, future::Future, io::ErrorKind, pin::Pin};
use std::{
borrow::Cow,
future::Future,
io::ErrorKind,
path::{Path, PathBuf},
pin::Pin,
};
use anyhow::{bail, ensure, Context};
use camino::{Utf8Path, Utf8PathBuf};
use tokio::{
fs,
io::{self, AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
@@ -23,20 +28,20 @@ const LOCAL_FS_TEMP_FILE_SUFFIX: &str = "___temp";
#[derive(Debug, Clone)]
pub struct LocalFs {
storage_root: Utf8PathBuf,
storage_root: PathBuf,
}
impl LocalFs {
/// Attempts to create local FS storage, along with its root directory.
/// Storage root will be created (if it does not exist) and transformed into an absolute path (if passed as relative).
pub fn new(mut storage_root: Utf8PathBuf) -> anyhow::Result<Self> {
pub fn new(mut storage_root: PathBuf) -> anyhow::Result<Self> {
if !storage_root.exists() {
std::fs::create_dir_all(&storage_root).with_context(|| {
format!("Failed to create all directories in the given root path {storage_root:?}")
})?;
}
if !storage_root.is_absolute() {
storage_root = storage_root.canonicalize_utf8().with_context(|| {
storage_root = storage_root.canonicalize().with_context(|| {
format!("Failed to represent path {storage_root:?} as an absolute path")
})?;
}
@@ -45,7 +50,7 @@ impl LocalFs {
}
// mirrors S3Bucket::s3_object_to_relative_path
fn local_file_to_relative_path(&self, key: Utf8PathBuf) -> RemotePath {
fn local_file_to_relative_path(&self, key: PathBuf) -> RemotePath {
let relative_path = key
.strip_prefix(&self.storage_root)
.expect("relative path must contain storage_root as prefix");
@@ -54,18 +59,22 @@ impl LocalFs {
async fn read_storage_metadata(
&self,
file_path: &Utf8Path,
file_path: &Path,
) -> anyhow::Result<Option<StorageMetadata>> {
let metadata_path = storage_metadata_path(file_path);
if metadata_path.exists() && metadata_path.is_file() {
let metadata_string = fs::read_to_string(&metadata_path).await.with_context(|| {
format!("Failed to read metadata from the local storage at '{metadata_path}'")
format!(
"Failed to read metadata from the local storage at '{}'",
metadata_path.display()
)
})?;
serde_json::from_str(&metadata_string)
.with_context(|| {
format!(
"Failed to deserialize metadata from the local storage at '{metadata_path}'",
"Failed to deserialize metadata from the local storage at '{}'",
metadata_path.display()
)
})
.map(|metadata| Some(StorageMetadata(metadata)))
@@ -162,21 +171,25 @@ impl RemoteStorage for LocalFs {
}
}
// Note that Utf8PathBuf starts_with only considers full path segments, but
// Note that PathBuf starts_with only considers full path segments, but
// object prefixes are arbitrary strings, so we need the strings for doing
// starts_with later.
let prefix = full_path.as_str();
let prefix = full_path.to_string_lossy();
let mut files = vec![];
let mut directory_queue = vec![initial_dir];
let mut directory_queue = vec![initial_dir.clone()];
while let Some(cur_folder) = directory_queue.pop() {
let mut entries = cur_folder.read_dir_utf8()?;
while let Some(Ok(entry)) = entries.next() {
let file_name = entry.file_name();
let full_file_name = cur_folder.join(file_name);
if full_file_name.as_str().starts_with(prefix) {
let mut entries = fs::read_dir(cur_folder.clone()).await?;
while let Some(entry) = entries.next_entry().await? {
let file_name: PathBuf = entry.file_name().into();
let full_file_name = cur_folder.clone().join(&file_name);
if full_file_name
.to_str()
.map(|s| s.starts_with(prefix.as_ref()))
.unwrap_or(false)
{
let file_remote_path = self.local_file_to_relative_path(full_file_name.clone());
files.push(file_remote_path);
files.push(file_remote_path.clone());
if full_file_name.is_dir() {
directory_queue.push(full_file_name);
}
@@ -217,7 +230,10 @@ impl RemoteStorage for LocalFs {
.open(&temp_file_path)
.await
.with_context(|| {
format!("Failed to open target fs destination at '{target_file_path}'")
format!(
"Failed to open target fs destination at '{}'",
target_file_path.display()
)
})?,
);
@@ -228,7 +244,8 @@ impl RemoteStorage for LocalFs {
.await
.with_context(|| {
format!(
"Failed to upload file (write temp) to the local storage at '{temp_file_path}'",
"Failed to upload file (write temp) to the local storage at '{}'",
temp_file_path.display()
)
})?;
@@ -245,7 +262,8 @@ impl RemoteStorage for LocalFs {
destination.flush().await.with_context(|| {
format!(
"Failed to upload (flush temp) file to the local storage at '{temp_file_path}'",
"Failed to upload (flush temp) file to the local storage at '{}'",
temp_file_path.display()
)
})?;
@@ -253,7 +271,8 @@ impl RemoteStorage for LocalFs {
.await
.with_context(|| {
format!(
"Failed to upload (rename) file to the local storage at '{target_file_path}'",
"Failed to upload (rename) file to the local storage at '{}'",
target_file_path.display()
)
})?;
@@ -267,7 +286,8 @@ impl RemoteStorage for LocalFs {
.await
.with_context(|| {
format!(
"Failed to write metadata to the local storage at '{storage_metadata_path}'",
"Failed to write metadata to the local storage at '{}'",
storage_metadata_path.display()
)
})?;
}
@@ -373,16 +393,16 @@ impl RemoteStorage for LocalFs {
}
}
fn storage_metadata_path(original_path: &Utf8Path) -> Utf8PathBuf {
fn storage_metadata_path(original_path: &Path) -> PathBuf {
path_with_suffix_extension(original_path, "metadata")
}
fn get_all_files<'a, P>(
directory_path: P,
recursive: bool,
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<Utf8PathBuf>>> + Send + Sync + 'a>>
) -> Pin<Box<dyn Future<Output = anyhow::Result<Vec<PathBuf>>> + Send + Sync + 'a>>
where
P: AsRef<Utf8Path> + Send + Sync + 'a,
P: AsRef<Path> + Send + Sync + 'a,
{
Box::pin(async move {
let directory_path = directory_path.as_ref();
@@ -392,13 +412,7 @@ where
let mut dir_contents = fs::read_dir(directory_path).await?;
while let Some(dir_entry) = dir_contents.next_entry().await? {
let file_type = dir_entry.file_type().await?;
let entry_path =
Utf8PathBuf::from_path_buf(dir_entry.path()).map_err(|pb| {
anyhow::Error::msg(format!(
"non-Unicode path: {}",
pb.to_string_lossy()
))
})?;
let entry_path = dir_entry.path();
if file_type.is_symlink() {
debug!("{entry_path:?} is a symlink, skipping")
} else if file_type.is_dir() {
@@ -421,10 +435,13 @@ where
})
}
async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<()> {
async fn create_target_directory(target_file_path: &Path) -> anyhow::Result<()> {
let target_dir = match target_file_path.parent() {
Some(parent_dir) => parent_dir,
None => bail!("File path '{target_file_path}' has no parent directory"),
None => bail!(
"File path '{}' has no parent directory",
target_file_path.display()
),
};
if !target_dir.exists() {
fs::create_dir_all(target_dir).await?;
@@ -432,9 +449,13 @@ async fn create_target_directory(target_file_path: &Utf8Path) -> anyhow::Result<
Ok(())
}
fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
fn file_exists(file_path: &Path) -> anyhow::Result<bool> {
if file_path.exists() {
ensure!(file_path.is_file(), "file path '{file_path}' is not a file");
ensure!(
file_path.is_file(),
"file path '{}' is not a file",
file_path.display()
);
Ok(true)
} else {
Ok(false)
@@ -445,13 +466,13 @@ fn file_exists(file_path: &Utf8Path) -> anyhow::Result<bool> {
mod fs_tests {
use super::*;
use camino_tempfile::tempdir;
use std::{collections::HashMap, io::Write};
use tempfile::tempdir;
async fn read_and_assert_remote_file_contents(
storage: &LocalFs,
#[allow(clippy::ptr_arg)]
// have to use &Utf8PathBuf due to `storage.local_path` parameter requirements
// have to use &PathBuf due to `storage.local_path` parameter requirements
remote_storage_path: &RemotePath,
expected_metadata: Option<&StorageMetadata>,
) -> anyhow::Result<String> {
@@ -498,7 +519,7 @@ mod fs_tests {
async fn upload_file_negatives() -> anyhow::Result<()> {
let storage = create_storage()?;
let id = RemotePath::new(Utf8Path::new("dummy"))?;
let id = RemotePath::new(Path::new("dummy"))?;
let content = std::io::Cursor::new(b"12345");
// Check that you get an error if the size parameter doesn't match the actual
@@ -523,8 +544,7 @@ mod fs_tests {
}
fn create_storage() -> anyhow::Result<LocalFs> {
let storage_root = tempdir()?.path().to_path_buf();
LocalFs::new(storage_root)
LocalFs::new(tempdir()?.path().to_owned())
}
#[tokio::test]
@@ -541,7 +561,7 @@ mod fs_tests {
);
let non_existing_path = "somewhere/else";
match storage.download(&RemotePath::new(Utf8Path::new(non_existing_path))?).await {
match storage.download(&RemotePath::new(Path::new(non_existing_path))?).await {
Err(DownloadError::NotFound) => {} // Should get NotFound for non existing keys
other => panic!("Should get a NotFound error when downloading non-existing storage files, but got: {other:?}"),
}
@@ -755,7 +775,7 @@ mod fs_tests {
}
async fn create_file_for_upload(
path: &Utf8Path,
path: &Path,
contents: &str,
) -> anyhow::Result<(io::BufReader<fs::File>, usize)> {
std::fs::create_dir_all(path.parent().unwrap())?;

View File

@@ -47,47 +47,10 @@ pub struct S3Bucket {
bucket_name: String,
prefix_in_bucket: Option<String>,
max_keys_per_list_response: Option<i32>,
concurrency_limiter: ConcurrencyLimiter,
}
/// Pair of semaphores used to limit the concurrency of S3 requests:
/// one for write-type requests and one for read-type requests.
struct ConcurrencyLimiter {
// Every request to S3 can be throttled or cancelled if a certain number of requests per second is exceeded.
// The same goes for IAM, which is queried before every S3 request, if enabled. IAM has an even lower RPS threshold.
// This helps to ensure we don't exceed the thresholds.
// Semaphore that `for_kind` hands out for `Put` and `Delete` requests.
write: Arc<Semaphore>,
// Semaphore that `for_kind` hands out for `Get` and `List` requests.
read: Arc<Semaphore>,
}
impl ConcurrencyLimiter {
/// Selects the semaphore that governs `kind`: `Get` and `List` share the
/// `read` semaphore, while `Put` and `Delete` share the `write` semaphore.
fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
match kind {
RequestKind::Get => &self.read,
RequestKind::Put => &self.write,
RequestKind::List => &self.read,
RequestKind::Delete => &self.write,
}
}
/// Waits for a permit from the semaphore that `for_kind` selects for `kind`.
///
/// The returned permit borrows from `self`. An `Err` is presumably only
/// produced once the semaphore has been closed (callers in this file
/// `expect` that it never is) -- TODO confirm against the tokio docs.
async fn acquire(
&self,
kind: RequestKind,
) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
self.for_kind(kind).acquire().await
}
/// Like `acquire`, but clones the `Arc` of the selected semaphore first and
/// returns an owned permit, whose type carries no borrow of `self`.
async fn acquire_owned(
&self,
kind: RequestKind,
) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
// `acquire_owned` consumes an `Arc<Semaphore>`, hence the clone of the shared handle.
Arc::clone(self.for_kind(kind)).acquire_owned().await
}
/// Builds a limiter whose `read` and `write` semaphores are each created
/// with `limit` permits. The two semaphores are independent, so `limit`
/// applies to each of them separately rather than to both combined.
fn new(limit: usize) -> ConcurrencyLimiter {
Self {
read: Arc::new(Semaphore::new(limit)),
write: Arc::new(Semaphore::new(limit)),
}
}
concurrency_limiter: Arc<Semaphore>,
}
#[derive(Default)]
@@ -154,7 +117,7 @@ impl S3Bucket {
bucket_name: aws_config.bucket_name.clone(),
max_keys_per_list_response: aws_config.max_keys_per_list_response,
prefix_in_bucket,
concurrency_limiter: ConcurrencyLimiter::new(aws_config.concurrency_limit.get()),
concurrency_limiter: Arc::new(Semaphore::new(aws_config.concurrency_limit.get())),
})
}
@@ -180,11 +143,12 @@ impl S3Bucket {
assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
let path_string = path
.get_path()
.as_str()
.trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
.to_string_lossy()
.trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR)
.to_string();
match &self.prefix_in_bucket {
Some(prefix) => prefix.clone() + "/" + path_string,
None => path_string.to_string(),
Some(prefix) => prefix.clone() + "/" + &path_string,
None => path_string,
}
}
@@ -192,7 +156,7 @@ impl S3Bucket {
let started_at = start_counting_cancelled_wait(kind);
let permit = self
.concurrency_limiter
.acquire(kind)
.acquire()
.await
.expect("semaphore is never closed");
@@ -208,7 +172,8 @@ impl S3Bucket {
let started_at = start_counting_cancelled_wait(kind);
let permit = self
.concurrency_limiter
.acquire_owned(kind)
.clone()
.acquire_owned()
.await
.expect("semaphore is never closed");
@@ -600,8 +565,8 @@ fn start_measuring_requests(
#[cfg(test)]
mod tests {
use camino::Utf8Path;
use std::num::NonZeroUsize;
use std::path::Path;
use crate::{RemotePath, S3Bucket, S3Config};
@@ -610,7 +575,7 @@ mod tests {
let all_paths = ["", "some/path", "some/path/"];
let all_paths: Vec<RemotePath> = all_paths
.iter()
.map(|x| RemotePath::new(Utf8Path::new(x)).expect("bad path"))
.map(|x| RemotePath::new(Path::new(x)).expect("bad path"))
.collect();
let prefixes = [
None,

View File

@@ -2,12 +2,11 @@ use std::collections::HashSet;
use std::env;
use std::num::{NonZeroU32, NonZeroUsize};
use std::ops::ControlFlow;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::UNIX_EPOCH;
use anyhow::Context;
use camino::Utf8Path;
use once_cell::sync::OnceCell;
use remote_storage::{
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
@@ -56,7 +55,7 @@ async fn s3_pagination_should_work(ctx: &mut MaybeEnabledS3WithTestBlobs) -> any
let test_client = Arc::clone(&ctx.enabled.client);
let expected_remote_prefixes = ctx.remote_prefixes.clone();
let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix))
let base_prefix = RemotePath::new(Path::new(ctx.enabled.base_prefix))
.context("common_prefix construction")?;
let root_remote_prefixes = test_client
.list_prefixes(None)
@@ -109,7 +108,7 @@ async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> any
};
let test_client = Arc::clone(&ctx.enabled.client);
let base_prefix =
RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?;
RemotePath::new(Path::new("folder1")).context("common_prefix construction")?;
let root_files = test_client
.list_files(None)
.await
@@ -130,9 +129,9 @@ async fn s3_list_files_works(ctx: &mut MaybeEnabledS3WithSimpleTestBlobs) -> any
let trim_remote_blobs: HashSet<_> = ctx
.remote_blobs
.iter()
.map(|x| x.get_path())
.map(|x| x.get_path().to_str().expect("must be valid name"))
.filter(|x| x.starts_with("folder1"))
.map(|x| RemotePath::new(x).expect("must be valid path"))
.map(|x| RemotePath::new(Path::new(x)).expect("must be valid name"))
.collect();
assert_eq!(
nested_remote_files, trim_remote_blobs,
@@ -149,9 +148,10 @@ async fn s3_delete_non_exising_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result
MaybeEnabledS3::Disabled => return Ok(()),
};
let path = RemotePath::new(Utf8Path::new(
format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(),
))
let path = RemotePath::new(&PathBuf::from(format!(
"{}/for_sure_there_is_nothing_there_really",
ctx.base_prefix,
)))
.with_context(|| "RemotePath conversion")?;
ctx.client.delete(&path).await.expect("should succeed");
@@ -167,13 +167,13 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
MaybeEnabledS3::Disabled => return Ok(()),
};
let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
let path1 = RemotePath::new(&PathBuf::from(format!("{}/path1", ctx.base_prefix,)))
.with_context(|| "RemotePath conversion")?;
let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
let path2 = RemotePath::new(&PathBuf::from(format!("{}/path2", ctx.base_prefix,)))
.with_context(|| "RemotePath conversion")?;
let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
let path3 = RemotePath::new(&PathBuf::from(format!("{}/path3", ctx.base_prefix,)))
.with_context(|| "RemotePath conversion")?;
let data1 = "remote blob data1".as_bytes();
@@ -427,10 +427,10 @@ async fn upload_s3_data(
for i in 1..upload_tasks_count + 1 {
let task_client = Arc::clone(client);
upload_tasks.spawn(async move {
let prefix = format!("{base_prefix_str}/sub_prefix_{i}/");
let blob_prefix = RemotePath::new(Utf8Path::new(&prefix))
let prefix = PathBuf::from(format!("{base_prefix_str}/sub_prefix_{i}/"));
let blob_prefix = RemotePath::new(&prefix)
.with_context(|| format!("{prefix:?} to RemotePath conversion"))?;
let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}")));
let blob_path = blob_prefix.join(Path::new(&format!("blob_{i}")));
debug!("Creating remote item {i} at path {blob_path:?}");
let data = format!("remote blob data {i}").into_bytes();
@@ -512,10 +512,8 @@ async fn upload_simple_s3_data(
let task_client = Arc::clone(client);
upload_tasks.spawn(async move {
let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i));
let blob_path = RemotePath::new(
Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"),
)
.with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
let blob_path = RemotePath::new(&blob_path)
.with_context(|| format!("{blob_path:?} to RemotePath conversion"))?;
debug!("Creating remote item {i} at path {blob_path:?}");
let data = format!("remote blob data {i}").into_bytes();

View File

@@ -10,7 +10,6 @@ async-trait.workspace = true
anyhow.workspace = true
bincode.workspace = true
bytes.workspace = true
camino.workspace = true
chrono.workspace = true
heapless.workspace = true
hex = { workspace = true, features = ["serde"] }
@@ -54,7 +53,7 @@ byteorder.workspace = true
bytes.workspace = true
criterion.workspace = true
hex-literal.workspace = true
camino-tempfile.workspace = true
tempfile.workspace = true
[[bench]]
name = "benchmarks"

View File

@@ -2,9 +2,9 @@
use serde;
use std::fs;
use std::path::Path;
use anyhow::Result;
use camino::Utf8Path;
use jsonwebtoken::{
decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation,
};
@@ -65,7 +65,7 @@ impl JwtAuth {
}
}
pub fn from_key_path(key_path: &Utf8Path) -> Result<Self> {
pub fn from_key_path(key_path: &Path) -> Result<Self> {
let public_key = fs::read(key_path)?;
Ok(Self::new(DecodingKey::from_ed_pem(&public_key)?))
}

View File

@@ -1,14 +1,14 @@
use std::{
borrow::Cow,
ffi::OsStr,
fs::{self, File},
io,
path::{Path, PathBuf},
};
use camino::{Utf8Path, Utf8PathBuf};
/// Similar to [`std::fs::create_dir`], except we fsync the
/// created directory and its parent.
pub fn create_dir(path: impl AsRef<Utf8Path>) -> io::Result<()> {
pub fn create_dir(path: impl AsRef<Path>) -> io::Result<()> {
let path = path.as_ref();
fs::create_dir(path)?;
@@ -18,7 +18,7 @@ pub fn create_dir(path: impl AsRef<Utf8Path>) -> io::Result<()> {
/// Similar to [`std::fs::create_dir_all`], except we fsync all
/// newly created directories and the pre-existing parent.
pub fn create_dir_all(path: impl AsRef<Utf8Path>) -> io::Result<()> {
pub fn create_dir_all(path: impl AsRef<Path>) -> io::Result<()> {
let mut path = path.as_ref();
let mut dirs_to_create = Vec::new();
@@ -30,7 +30,7 @@ pub fn create_dir_all(path: impl AsRef<Utf8Path>) -> io::Result<()> {
Ok(_) => {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
format!("non-directory found in path: {path}"),
format!("non-directory found in path: {}", path.display()),
));
}
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {}
@@ -44,7 +44,7 @@ pub fn create_dir_all(path: impl AsRef<Utf8Path>) -> io::Result<()> {
None => {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!("can't find parent of path '{path}'"),
format!("can't find parent of path '{}'", path.display()).as_str(),
));
}
}
@@ -70,18 +70,21 @@ pub fn create_dir_all(path: impl AsRef<Utf8Path>) -> io::Result<()> {
/// Adds a suffix to the file(directory) name, either appending the suffix to the end of its extension,
/// or if there's no extension, creates one and puts a suffix there.
pub fn path_with_suffix_extension(
original_path: impl AsRef<Utf8Path>,
suffix: &str,
) -> Utf8PathBuf {
let new_extension = match original_path.as_ref().extension() {
pub fn path_with_suffix_extension(original_path: impl AsRef<Path>, suffix: &str) -> PathBuf {
let new_extension = match original_path
.as_ref()
.extension()
.map(OsStr::to_string_lossy)
{
Some(extension) => Cow::Owned(format!("{extension}.{suffix}")),
None => Cow::Borrowed(suffix),
};
original_path.as_ref().with_extension(new_extension)
original_path
.as_ref()
.with_extension(new_extension.as_ref())
}
pub fn fsync_file_and_parent(file_path: &Utf8Path) -> io::Result<()> {
pub fn fsync_file_and_parent(file_path: &Path) -> io::Result<()> {
let parent = file_path.parent().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
@@ -94,7 +97,7 @@ pub fn fsync_file_and_parent(file_path: &Utf8Path) -> io::Result<()> {
Ok(())
}
pub fn fsync(path: &Utf8Path) -> io::Result<()> {
pub fn fsync(path: &Path) -> io::Result<()> {
File::open(path)
.map_err(|e| io::Error::new(e.kind(), format!("Failed to open the file {path:?}: {e}")))
.and_then(|file| {
@@ -108,18 +111,19 @@ pub fn fsync(path: &Utf8Path) -> io::Result<()> {
.map_err(|e| io::Error::new(e.kind(), format!("Failed to fsync file {path:?}: {e}")))
}
pub async fn fsync_async(path: impl AsRef<Utf8Path>) -> Result<(), std::io::Error> {
tokio::fs::File::open(path.as_ref()).await?.sync_all().await
pub async fn fsync_async(path: impl AsRef<std::path::Path>) -> Result<(), std::io::Error> {
tokio::fs::File::open(path).await?.sync_all().await
}
#[cfg(test)]
mod tests {
use tempfile::tempdir;
use super::*;
#[test]
fn test_create_dir_fsyncd() {
let dir = camino_tempfile::tempdir().unwrap();
let dir = tempdir().unwrap();
let existing_dir_path = dir.path();
let err = create_dir(existing_dir_path).unwrap_err();
@@ -135,7 +139,7 @@ mod tests {
#[test]
fn test_create_dir_all_fsyncd() {
let dir = camino_tempfile::tempdir().unwrap();
let dir = tempdir().unwrap();
let existing_dir_path = dir.path();
create_dir_all(existing_dir_path).unwrap();
@@ -162,29 +166,29 @@ mod tests {
#[test]
fn test_path_with_suffix_extension() {
let p = Utf8PathBuf::from("/foo/bar");
let p = PathBuf::from("/foo/bar");
assert_eq!(
&path_with_suffix_extension(p, "temp").to_string(),
&path_with_suffix_extension(p, "temp").to_string_lossy(),
"/foo/bar.temp"
);
let p = Utf8PathBuf::from("/foo/bar");
let p = PathBuf::from("/foo/bar");
assert_eq!(
&path_with_suffix_extension(p, "temp.temp").to_string(),
&path_with_suffix_extension(p, "temp.temp").to_string_lossy(),
"/foo/bar.temp.temp"
);
let p = Utf8PathBuf::from("/foo/bar.baz");
let p = PathBuf::from("/foo/bar.baz");
assert_eq!(
&path_with_suffix_extension(p, "temp.temp").to_string(),
&path_with_suffix_extension(p, "temp.temp").to_string_lossy(),
"/foo/bar.baz.temp.temp"
);
let p = Utf8PathBuf::from("/foo/bar.baz");
let p = PathBuf::from("/foo/bar.baz");
assert_eq!(
&path_with_suffix_extension(p, ".temp").to_string(),
&path_with_suffix_extension(p, ".temp").to_string_lossy(),
"/foo/bar.baz..temp"
);
let p = Utf8PathBuf::from("/foo/bar/dir/");
let p = PathBuf::from("/foo/bar/dir/");
assert_eq!(
&path_with_suffix_extension(p, ".temp").to_string(),
&path_with_suffix_extension(p, ".temp").to_string_lossy(),
"/foo/bar/dir..temp"
);
}

View File

@@ -55,6 +55,8 @@ where
#[cfg(test)]
mod test {
use std::path::PathBuf;
use crate::fs_ext::{is_directory_empty, list_dir};
use super::ignore_absent_files;
@@ -63,7 +65,7 @@ mod test {
fn is_empty_dir() {
use super::PathExt;
let dir = camino_tempfile::tempdir().unwrap();
let dir = tempfile::tempdir().unwrap();
let dir_path = dir.path();
// test positive case
@@ -73,7 +75,7 @@ mod test {
);
// invoke on a file to ensure it returns an error
let file_path = dir_path.join("testfile");
let file_path: PathBuf = dir_path.join("testfile");
let f = std::fs::File::create(&file_path).unwrap();
drop(f);
assert!(file_path.is_empty_dir().is_err());
@@ -85,7 +87,7 @@ mod test {
#[tokio::test]
async fn is_empty_dir_async() {
let dir = camino_tempfile::tempdir().unwrap();
let dir = tempfile::tempdir().unwrap();
let dir_path = dir.path();
// test positive case
@@ -95,7 +97,7 @@ mod test {
);
// invoke on a file to ensure it returns an error
let file_path = dir_path.join("testfile");
let file_path: PathBuf = dir_path.join("testfile");
let f = std::fs::File::create(&file_path).unwrap();
drop(f);
assert!(is_directory_empty(&file_path).await.is_err());
@@ -107,9 +109,10 @@ mod test {
#[test]
fn ignore_absent_files_works() {
let dir = camino_tempfile::tempdir().unwrap();
let dir = tempfile::tempdir().unwrap();
let dir_path = dir.path();
let file_path = dir.path().join("testfile");
let file_path: PathBuf = dir_path.join("testfile");
ignore_absent_files(|| std::fs::remove_file(&file_path)).expect("should execute normally");
@@ -123,17 +126,17 @@ mod test {
#[tokio::test]
async fn list_dir_works() {
let dir = camino_tempfile::tempdir().unwrap();
let dir = tempfile::tempdir().unwrap();
let dir_path = dir.path();
assert!(list_dir(dir_path).await.unwrap().is_empty());
let file_path = dir_path.join("testfile");
let file_path: PathBuf = dir_path.join("testfile");
let _ = std::fs::File::create(&file_path).unwrap();
assert_eq!(&list_dir(dir_path).await.unwrap(), &["testfile"]);
let another_dir_path = dir_path.join("testdir");
let another_dir_path: PathBuf = dir_path.join("testdir");
std::fs::create_dir(another_dir_path).unwrap();
let expected = &["testdir", "testfile"];

View File

@@ -24,9 +24,6 @@ pub enum ApiError {
#[error("Precondition failed: {0}")]
PreconditionFailed(Box<str>),
#[error("Resource temporarily unavailable: {0}")]
ResourceUnavailable(String),
#[error("Shutting down")]
ShuttingDown,
@@ -62,10 +59,6 @@ impl ApiError {
"Shutting down".to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::ResourceUnavailable(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
),
ApiError::InternalServerError(err) => HttpErrorBody::response_from_msg_and_status(
err.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,

View File

@@ -1,3 +1,4 @@
use std::ffi::OsStr;
use std::{fmt, str::FromStr};
use anyhow::Context;
@@ -214,11 +215,12 @@ pub struct TimelineId(Id);
id_newtype!(TimelineId);
impl TryFrom<Option<&str>> for TimelineId {
impl TryFrom<Option<&OsStr>> for TimelineId {
type Error = anyhow::Error;
fn try_from(value: Option<&str>) -> Result<Self, Self::Error> {
fn try_from(value: Option<&OsStr>) -> Result<Self, Self::Error> {
value
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TimelineId>()
.with_context(|| format!("Could not parse timeline id from {:?}", value))

View File

@@ -11,10 +11,10 @@ use std::{
io::{Read, Write},
ops::Deref,
os::unix::prelude::AsRawFd,
path::{Path, PathBuf},
};
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use nix::{errno::Errno::EAGAIN, fcntl};
use crate::crashsafe;
@@ -23,7 +23,7 @@ use crate::crashsafe;
/// Returned by [`create_exclusive`].
#[must_use]
pub struct UnwrittenLockFile {
path: Utf8PathBuf,
path: PathBuf,
file: fs::File,
}
@@ -60,7 +60,7 @@ impl UnwrittenLockFile {
///
/// It is not an error if the file already exists.
/// It is an error if the file is already locked.
pub fn create_exclusive(lock_file_path: &Utf8Path) -> anyhow::Result<UnwrittenLockFile> {
pub fn create_exclusive(lock_file_path: &Path) -> anyhow::Result<UnwrittenLockFile> {
let lock_file = fs::OpenOptions::new()
.create(true) // O_CREAT
.write(true)
@@ -101,7 +101,7 @@ pub enum LockFileRead {
/// Open & try to lock the lock file at the given `path`, returning a [handle][`LockFileRead`] to
/// inspect its content. It is not an `Err(...)` if the file does not exist or is already locked.
/// Check the [`LockFileRead`] variants for details.
pub fn read_and_hold_lock_file(path: &Utf8Path) -> anyhow::Result<LockFileRead> {
pub fn read_and_hold_lock_file(path: &Path) -> anyhow::Result<LockFileRead> {
let res = fs::OpenOptions::new().read(true).open(path);
let mut lock_file = match res {
Ok(f) => f,

View File

@@ -228,12 +228,6 @@ impl SecretString {
}
}
/// Conversion from an owned `String`: the string is moved into the
/// `SecretString` wrapper unchanged.
impl From<String> for SecretString {
fn from(s: String) -> Self {
Self(s)
}
}
impl std::fmt::Debug for SecretString {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[SECRET]")

View File

@@ -1,9 +1,9 @@
#![warn(missing_docs)]
use camino::Utf8Path;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::ops::{Add, AddAssign};
use std::path::Path;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
@@ -44,9 +44,11 @@ impl Lsn {
/// Parse an LSN from a filename in the form `0000000000000000`
pub fn from_filename<F>(filename: F) -> Result<Self, LsnParseError>
where
F: AsRef<Utf8Path>,
F: AsRef<Path>,
{
Lsn::from_hex(filename.as_ref().as_str())
let filename: &Path = filename.as_ref();
let filename = filename.to_str().ok_or(LsnParseError)?;
Lsn::from_hex(filename)
}
/// Parse an LSN from a string in the form `0000000000000000`

View File

@@ -49,10 +49,9 @@
//! At this point, `B` and `C` are running, which is hazardous.
//! Morale of the story: don't unlink pidfiles, ever.
use std::ops::Deref;
use std::{ops::Deref, path::Path};
use anyhow::Context;
use camino::Utf8Path;
use nix::unistd::Pid;
use crate::lock_file::{self, LockFileRead};
@@ -85,7 +84,7 @@ impl Deref for PidFileGuard {
/// The claim ends as soon as the returned guard object is dropped.
/// To maintain the claim for the remaining lifetime of the current process,
/// use [`std::mem::forget`] or similar.
pub fn claim_for_current_process(path: &Utf8Path) -> anyhow::Result<PidFileGuard> {
pub fn claim_for_current_process(path: &Path) -> anyhow::Result<PidFileGuard> {
let unwritten_lock_file = lock_file::create_exclusive(path).context("lock file")?;
// if any of the next steps fail, we drop the file descriptor and thereby release the lock
let guard = unwritten_lock_file
@@ -133,7 +132,7 @@ pub enum PidFileRead {
///
/// On success, this function returns a [`PidFileRead`].
/// Check its docs for a description of the meaning of its different variants.
pub fn read(pidfile: &Utf8Path) -> anyhow::Result<PidFileRead> {
pub fn read(pidfile: &Path) -> anyhow::Result<PidFileRead> {
let res = lock_file::read_and_hold_lock_file(pidfile).context("read and hold pid file")?;
let ret = match res {
LockFileRead::NotExist => PidFileRead::NotExist,

View File

@@ -4,9 +4,9 @@
//! This is the "Monitor" part of the monitor binary and is the main entrypoint for
//! all functionality.
use std::fmt::Debug;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{fmt::Debug, mem};
use anyhow::{bail, Context};
use axum::extract::ws::{Message, WebSocket};
@@ -141,6 +141,14 @@ impl Runner {
);
state.cgroup = Some(cgroup);
} else {
// *NOTE*: We need to forget the sender so that its drop impl does not get run.
// This allows us to poll it in `Monitor::run` regardless of whether we
// are managing a cgroup or not. If we don't forget it, all receives will
// immediately return an error because the sender is dropped, and that
// branch will win every select! statement, effectively turning
// `Monitor::run` into `loop { fail to receive }`.
mem::forget(requesting_send);
}
let mut file_cache_reserved_bytes = 0;
@@ -409,7 +417,7 @@ impl Runner {
}
}
// we need to propagate an upscale request
request = self.dispatcher.request_upscale_events.recv(), if self.cgroup.is_some() => {
request = self.dispatcher.request_upscale_events.recv() => {
if request.is_none() {
bail!("failed to listen for upscale event from cgroup")
}

View File

@@ -17,8 +17,6 @@ async-stream.workspace = true
async-trait.workspace = true
byteorder.workspace = true
bytes.workspace = true
camino.workspace = true
camino-tempfile.workspace = true
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["string"] }
close_fds.workspace = true
@@ -82,6 +80,8 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
tempfile.workspace = true
async-channel = "1.9.0"
[dev-dependencies]
criterion.workspace = true

View File

@@ -25,7 +25,7 @@ fn redo_scenarios(c: &mut Criterion) {
// input to the stderr.
// utils::logging::init(utils::logging::LogFormat::Plain).unwrap();
let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap();
let repo_dir = tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap();
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));

View File

@@ -9,7 +9,6 @@ license.workspace = true
[dependencies]
anyhow.workspace = true
bytes.workspace = true
camino.workspace = true
clap = { workspace = true, features = ["string"] }
git-version.workspace = true
pageserver = { path = ".." }

View File

@@ -3,14 +3,13 @@
//! Currently it only analyzes holes, which are regions within the layer range that the layer contains no updates for. In the future it might do more analysis (maybe key quantiles?) but it should never return sensitive data.
use anyhow::Result;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::Range;
use std::{fs, str};
use std::{fs, path::Path, str};
use pageserver::page_cache::PAGE_SZ;
use pageserver::repository::{Key, KEY_SIZE};
@@ -99,7 +98,7 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
}
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH
async fn get_holes(path: &Utf8Path, max_holes: usize, ctx: &RequestContext) -> Result<Vec<Hole>> {
async fn get_holes(path: &Path, max_holes: usize, ctx: &RequestContext) -> Result<Vec<Hole>> {
let file = FileBlockReader::new(VirtualFile::open(path).await?);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
@@ -168,9 +167,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
parse_filename(&layer.file_name().into_string().unwrap())
{
if layer_file.is_delta {
let layer_path =
Utf8PathBuf::from_path_buf(layer.path()).expect("non-Unicode path");
layer_file.holes = get_holes(&layer_path, max_holes, &ctx).await?;
layer_file.holes = get_holes(&layer.path(), max_holes, &ctx).await?;
n_deltas += 1;
}
layers.push(layer_file);

View File

@@ -1,7 +1,6 @@
use std::path::{Path, PathBuf};
use anyhow::Result;
use camino::Utf8Path;
use clap::Subcommand;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
@@ -48,7 +47,7 @@ pub(crate) enum LayerCmd {
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
let path = Utf8Path::from_path(path.as_ref()).expect("non-Unicode path");
let path = path.as_ref();
virtual_file::init(10);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path).await?);

View File

@@ -8,7 +8,6 @@ mod draw_timeline_dir;
mod layer_map_analyzer;
mod layers;
use camino::{Utf8Path, Utf8PathBuf};
use clap::{Parser, Subcommand};
use layers::LayerCmd;
use pageserver::{
@@ -19,6 +18,7 @@ use pageserver::{
virtual_file,
};
use postgres_ffi::ControlFileData;
use std::path::{Path, PathBuf};
use utils::{lsn::Lsn, project_git_version};
project_git_version!(GIT_VERSION);
@@ -49,7 +49,7 @@ enum Commands {
#[derive(Parser)]
struct MetadataCmd {
/// Input metadata file path
metadata_path: Utf8PathBuf,
metadata_path: PathBuf,
/// Replace disk consistent Lsn
disk_consistent_lsn: Option<Lsn>,
/// Replace previous record Lsn
@@ -61,13 +61,13 @@ struct MetadataCmd {
#[derive(Parser)]
struct PrintLayerFileCmd {
/// Pageserver data path
path: Utf8PathBuf,
path: PathBuf,
}
#[derive(Parser)]
struct AnalyzeLayerMapCmd {
/// Pageserver data path
path: Utf8PathBuf,
path: PathBuf,
/// Max holes
max_holes: Option<usize>,
}
@@ -102,7 +102,7 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}
fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
fn read_pg_control_file(control_file_path: &Path) -> anyhow::Result<()> {
let control_file = ControlFileData::decode(&std::fs::read(control_file_path)?)?;
println!("{control_file:?}");
let control_file_initdb = Lsn(control_file.checkPoint);
@@ -114,7 +114,7 @@ fn read_pg_control_file(control_file_path: &Utf8Path) -> anyhow::Result<()> {
Ok(())
}
async fn print_layerfile(path: &Utf8Path) -> anyhow::Result<()> {
async fn print_layerfile(path: &Path) -> anyhow::Result<()> {
// Basic initialization of things that don't change after startup
virtual_file::init(10);
page_cache::init(100);

View File

@@ -2,10 +2,9 @@
use std::env::{var, VarError};
use std::sync::Arc;
use std::{env, ops::ControlFlow, str::FromStr};
use std::{env, ops::ControlFlow, path::Path, str::FromStr};
use anyhow::{anyhow, Context};
use camino::Utf8Path;
use clap::{Arg, ArgAction, Command};
use metrics::launch_timestamp::{set_launch_timestamp_metric, LaunchTimestamp};
@@ -66,17 +65,21 @@ fn main() -> anyhow::Result<()> {
let workdir = arg_matches
.get_one::<String>("workdir")
.map(Utf8Path::new)
.unwrap_or_else(|| Utf8Path::new(".neon"));
.map(Path::new)
.unwrap_or_else(|| Path::new(".neon"));
let workdir = workdir
.canonicalize_utf8()
.with_context(|| format!("Error opening workdir '{workdir}'"))?;
.canonicalize()
.with_context(|| format!("Error opening workdir '{}'", workdir.display()))?;
let cfg_file_path = workdir.join("pageserver.toml");
// Set CWD to workdir for non-daemon modes
env::set_current_dir(&workdir)
.with_context(|| format!("Failed to set application's current dir to '{workdir}'"))?;
env::set_current_dir(&workdir).with_context(|| {
format!(
"Failed to set application's current dir to '{}'",
workdir.display()
)
})?;
let conf = match initialize_config(&cfg_file_path, arg_matches, &workdir)? {
ControlFlow::Continue(conf) => conf,
@@ -112,8 +115,12 @@ fn main() -> anyhow::Result<()> {
let tenants_path = conf.tenants_path();
if !tenants_path.exists() {
utils::crashsafe::create_dir_all(conf.tenants_path())
.with_context(|| format!("Failed to create tenants root dir at '{tenants_path}'"))?;
utils::crashsafe::create_dir_all(conf.tenants_path()).with_context(|| {
format!(
"Failed to create tenants root dir at '{}'",
tenants_path.display()
)
})?;
}
// Initialize failpoints support
@@ -130,9 +137,9 @@ fn main() -> anyhow::Result<()> {
}
fn initialize_config(
cfg_file_path: &Utf8Path,
cfg_file_path: &Path,
arg_matches: clap::ArgMatches,
workdir: &Utf8Path,
workdir: &Path,
) -> anyhow::Result<ControlFlow<(), &'static PageServerConf>> {
let init = arg_matches.get_flag("init");
let update_config = init || arg_matches.get_flag("update-config");
@@ -140,22 +147,33 @@ fn initialize_config(
let (mut toml, config_file_exists) = if cfg_file_path.is_file() {
if init {
anyhow::bail!(
"Config file '{cfg_file_path}' already exists, cannot init it, use --update-config to update it",
"Config file '{}' already exists, cannot init it, use --update-config to update it",
cfg_file_path.display()
);
}
// Supplement the CLI arguments with the config file
let cfg_file_contents = std::fs::read_to_string(cfg_file_path)
.with_context(|| format!("Failed to read pageserver config at '{cfg_file_path}'"))?;
let cfg_file_contents = std::fs::read_to_string(cfg_file_path).with_context(|| {
format!(
"Failed to read pageserver config at '{}'",
cfg_file_path.display()
)
})?;
(
cfg_file_contents
.parse::<toml_edit::Document>()
.with_context(|| {
format!("Failed to parse '{cfg_file_path}' as pageserver config")
format!(
"Failed to parse '{}' as pageserver config",
cfg_file_path.display()
)
})?,
true,
)
} else if cfg_file_path.exists() {
anyhow::bail!("Config file '{cfg_file_path}' exists but is not a regular file");
anyhow::bail!(
"Config file '{}' exists but is not a regular file",
cfg_file_path.display()
);
} else {
// We're initializing the tenant, so there's no config file yet
(
@@ -174,7 +192,7 @@ fn initialize_config(
for (key, item) in doc.iter() {
if config_file_exists && update_config && key == "id" && toml.contains_key(key) {
anyhow::bail!("Pageserver config file exists at '{cfg_file_path}' and has node id already, it cannot be overridden");
anyhow::bail!("Pageserver config file exists at '{}' and has node id already, it cannot be overridden", cfg_file_path.display());
}
toml.insert(key, item.clone());
}
@@ -186,11 +204,18 @@ fn initialize_config(
.context("Failed to parse pageserver configuration")?;
if update_config {
info!("Writing pageserver config to '{cfg_file_path}'");
info!("Writing pageserver config to '{}'", cfg_file_path.display());
std::fs::write(cfg_file_path, toml.to_string())
.with_context(|| format!("Failed to write pageserver config to '{cfg_file_path}'"))?;
info!("Config successfully written to '{cfg_file_path}'")
std::fs::write(cfg_file_path, toml.to_string()).with_context(|| {
format!(
"Failed to write pageserver config to '{}'",
cfg_file_path.display()
)
})?;
info!(
"Config successfully written to '{}'",
cfg_file_path.display()
)
}
Ok(if init {
@@ -580,6 +605,31 @@ fn start_pageserver(
);
}
task_mgr::spawn(
BACKGROUND_RUNTIME.handle(),
TaskKind::BackgroundRuntimeTurnaroundMeasure,
None,
None,
"background runtime turnaround measure",
true,
async move {
let server = hyper::Server::try_bind(&"0.0.0.0:2342".parse().unwrap()).expect("bind");
let server = server
.serve(hyper::service::make_service_fn(|_| async move {
Ok::<_, std::convert::Infallible>(hyper::service::service_fn(
move |_: hyper::Request<hyper::Body>| async move {
Ok::<_, std::convert::Infallible>(hyper::Response::new(
hyper::Body::from(format!("alive")),
))
},
))
}))
.with_graceful_shutdown(task_mgr::shutdown_watcher());
server.await?;
Ok(())
},
);
let mut shutdown_pageserver = Some(shutdown_pageserver.drop_guard());
// All started up! Now just sit and wait for shutdown signal.

View File

@@ -16,13 +16,13 @@ use utils::logging::SecretString;
use once_cell::sync::OnceCell;
use reqwest::Url;
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use toml_edit;
use toml_edit::{Document, Item};
use camino::{Utf8Path, Utf8PathBuf};
use postgres_backend::AuthType;
use utils::{
id::{NodeId, TenantId, TimelineId},
@@ -153,9 +153,9 @@ pub struct PageServerConf {
// that during unit testing, because the current directory is global
// to the process but different unit tests work on different
// repositories.
pub workdir: Utf8PathBuf,
pub workdir: PathBuf,
pub pg_distrib_dir: Utf8PathBuf,
pub pg_distrib_dir: PathBuf,
// Authentication
/// authentication method for the HTTP mgmt API
@@ -164,7 +164,7 @@ pub struct PageServerConf {
pub pg_auth_type: AuthType,
/// Path to a file containing public key for verifying JWT tokens.
/// Used for both mgmt and compute auth, if enabled.
pub auth_validation_public_key_path: Option<Utf8PathBuf>,
pub auth_validation_public_key_path: Option<PathBuf>,
pub remote_storage_config: Option<RemoteStorageConfig>,
@@ -253,15 +253,15 @@ struct PageServerConfigBuilder {
page_cache_size: BuilderValue<usize>,
max_file_descriptors: BuilderValue<usize>,
workdir: BuilderValue<Utf8PathBuf>,
workdir: BuilderValue<PathBuf>,
pg_distrib_dir: BuilderValue<Utf8PathBuf>,
pg_distrib_dir: BuilderValue<PathBuf>,
http_auth_type: BuilderValue<AuthType>,
pg_auth_type: BuilderValue<AuthType>,
//
auth_validation_public_key_path: BuilderValue<Option<Utf8PathBuf>>,
auth_validation_public_key_path: BuilderValue<Option<PathBuf>>,
remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,
id: BuilderValue<NodeId>,
@@ -305,12 +305,10 @@ impl Default for PageServerConfigBuilder {
superuser: Set(DEFAULT_SUPERUSER.to_string()),
page_cache_size: Set(DEFAULT_PAGE_CACHE_SIZE),
max_file_descriptors: Set(DEFAULT_MAX_FILE_DESCRIPTORS),
workdir: Set(Utf8PathBuf::new()),
pg_distrib_dir: Set(Utf8PathBuf::from_path_buf(
env::current_dir().expect("cannot access current directory"),
)
.expect("non-Unicode path")
.join("pg_install")),
workdir: Set(PathBuf::new()),
pg_distrib_dir: Set(env::current_dir()
.expect("cannot access current directory")
.join("pg_install")),
http_auth_type: Set(AuthType::Trust),
pg_auth_type: Set(AuthType::Trust),
auth_validation_public_key_path: Set(None),
@@ -392,11 +390,11 @@ impl PageServerConfigBuilder {
self.max_file_descriptors = BuilderValue::Set(max_file_descriptors)
}
pub fn workdir(&mut self, workdir: Utf8PathBuf) {
pub fn workdir(&mut self, workdir: PathBuf) {
self.workdir = BuilderValue::Set(workdir)
}
pub fn pg_distrib_dir(&mut self, pg_distrib_dir: Utf8PathBuf) {
pub fn pg_distrib_dir(&mut self, pg_distrib_dir: PathBuf) {
self.pg_distrib_dir = BuilderValue::Set(pg_distrib_dir)
}
@@ -410,7 +408,7 @@ impl PageServerConfigBuilder {
pub fn auth_validation_public_key_path(
&mut self,
auth_validation_public_key_path: Option<Utf8PathBuf>,
auth_validation_public_key_path: Option<PathBuf>,
) {
self.auth_validation_public_key_path = BuilderValue::Set(auth_validation_public_key_path)
}
@@ -487,10 +485,6 @@ impl PageServerConfigBuilder {
self.control_plane_api = BuilderValue::Set(api)
}
pub fn control_plane_api_token(&mut self, token: Option<SecretString>) {
self.control_plane_api_token = BuilderValue::Set(token)
}
pub fn build(self) -> anyhow::Result<PageServerConf> {
let concurrent_tenant_size_logical_size_queries = self
.concurrent_tenant_size_logical_size_queries
@@ -591,15 +585,15 @@ impl PageServerConf {
// Repository paths, relative to workdir.
//
pub fn tenants_path(&self) -> Utf8PathBuf {
pub fn tenants_path(&self) -> PathBuf {
self.workdir.join(TENANTS_SEGMENT_NAME)
}
pub fn deletion_prefix(&self) -> Utf8PathBuf {
pub fn deletion_prefix(&self) -> PathBuf {
self.workdir.join("deletion")
}
pub fn deletion_list_path(&self, sequence: u64) -> Utf8PathBuf {
pub fn deletion_list_path(&self, sequence: u64) -> PathBuf {
// Encode a version in the filename, so that if we ever switch away from JSON we can
// increment this.
const VERSION: u8 = 1;
@@ -608,7 +602,7 @@ impl PageServerConf {
.join(format!("{sequence:016x}-{VERSION:02x}.list"))
}
pub fn deletion_header_path(&self) -> Utf8PathBuf {
pub fn deletion_header_path(&self) -> PathBuf {
// Encode a version in the filename, so that if we ever switch away from JSON we can
// increment this.
const VERSION: u8 = 1;
@@ -616,30 +610,30 @@ impl PageServerConf {
self.deletion_prefix().join(format!("header-{VERSION:02x}"))
}
pub fn tenant_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
pub fn tenant_path(&self, tenant_id: &TenantId) -> PathBuf {
self.tenants_path().join(tenant_id.to_string())
}
pub fn tenant_attaching_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
pub fn tenant_attaching_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
self.tenant_path(tenant_id)
.join(TENANT_ATTACHING_MARKER_FILENAME)
}
pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
pub fn tenant_ignore_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
self.tenant_path(tenant_id).join(IGNORED_TENANT_FILE_NAME)
}
/// Points to a place in pageserver's local directory,
/// where certain tenant's tenantconf file should be located.
pub fn tenant_config_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
pub fn tenant_config_path(&self, tenant_id: &TenantId) -> PathBuf {
self.tenant_path(tenant_id).join(TENANT_CONFIG_NAME)
}
pub fn timelines_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
pub fn timelines_path(&self, tenant_id: &TenantId) -> PathBuf {
self.tenant_path(tenant_id).join(TIMELINES_SEGMENT_NAME)
}
pub fn timeline_path(&self, tenant_id: &TenantId, timeline_id: &TimelineId) -> Utf8PathBuf {
pub fn timeline_path(&self, tenant_id: &TenantId, timeline_id: &TimelineId) -> PathBuf {
self.timelines_path(tenant_id).join(timeline_id.to_string())
}
@@ -647,7 +641,7 @@ impl PageServerConf {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Utf8PathBuf {
) -> PathBuf {
path_with_suffix_extension(
self.timeline_path(&tenant_id, &timeline_id),
TIMELINE_UNINIT_MARK_SUFFIX,
@@ -658,19 +652,19 @@ impl PageServerConf {
&self,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Utf8PathBuf {
) -> PathBuf {
path_with_suffix_extension(
self.timeline_path(&tenant_id, &timeline_id),
TIMELINE_DELETE_MARK_SUFFIX,
)
}
pub fn tenant_deleted_mark_file_path(&self, tenant_id: &TenantId) -> Utf8PathBuf {
pub fn tenant_deleted_mark_file_path(&self, tenant_id: &TenantId) -> PathBuf {
self.tenant_path(tenant_id)
.join(TENANT_DELETED_MARKER_FILE_NAME)
}
pub fn traces_path(&self) -> Utf8PathBuf {
pub fn traces_path(&self) -> PathBuf {
self.workdir.join("traces")
}
@@ -679,7 +673,7 @@ impl PageServerConf {
tenant_id: &TenantId,
timeline_id: &TimelineId,
connection_id: &ConnectionId,
) -> Utf8PathBuf {
) -> PathBuf {
self.traces_path()
.join(tenant_id.to_string())
.join(timeline_id.to_string())
@@ -688,20 +682,20 @@ impl PageServerConf {
/// Points to a place in pageserver's local directory,
/// where certain timeline's metadata file should be located.
pub fn metadata_path(&self, tenant_id: &TenantId, timeline_id: &TimelineId) -> Utf8PathBuf {
pub fn metadata_path(&self, tenant_id: &TenantId, timeline_id: &TimelineId) -> PathBuf {
self.timeline_path(tenant_id, timeline_id)
.join(METADATA_FILE_NAME)
}
/// Turns storage remote path of a file into its local path.
pub fn local_path(&self, remote_path: &RemotePath) -> Utf8PathBuf {
pub fn local_path(&self, remote_path: &RemotePath) -> PathBuf {
remote_path.with_base(&self.workdir)
}
//
// Postgres distribution paths
//
pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
pub fn pg_distrib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
let path = self.pg_distrib_dir.clone();
#[allow(clippy::manual_range_patterns)]
@@ -711,10 +705,10 @@ impl PageServerConf {
}
}
pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
pub fn pg_bin_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join("bin"))
}
pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<Utf8PathBuf> {
pub fn pg_lib_dir(&self, pg_version: u32) -> anyhow::Result<PathBuf> {
Ok(self.pg_distrib_dir(pg_version)?.join("lib"))
}
@@ -722,7 +716,7 @@ impl PageServerConf {
/// validating the input and failing on errors.
///
/// This leaves any options not present in the file in the built-in defaults.
pub fn parse_and_validate(toml: &Document, workdir: &Utf8Path) -> anyhow::Result<Self> {
pub fn parse_and_validate(toml: &Document, workdir: &Path) -> anyhow::Result<Self> {
let mut builder = PageServerConfigBuilder::default();
builder.workdir(workdir.to_owned());
@@ -741,10 +735,10 @@ impl PageServerConf {
builder.max_file_descriptors(parse_toml_u64(key, item)? as usize)
}
"pg_distrib_dir" => {
builder.pg_distrib_dir(Utf8PathBuf::from(parse_toml_string(key, item)?))
builder.pg_distrib_dir(PathBuf::from(parse_toml_string(key, item)?))
}
"auth_validation_public_key_path" => builder.auth_validation_public_key_path(Some(
Utf8PathBuf::from(parse_toml_string(key, item)?),
PathBuf::from(parse_toml_string(key, item)?),
)),
"http_auth_type" => builder.http_auth_type(parse_toml_from_str(key, item)?),
"pg_auth_type" => builder.pg_auth_type(parse_toml_from_str(key, item)?),
@@ -791,14 +785,6 @@ impl PageServerConf {
builder.control_plane_api(Some(parsed.parse().context("failed to parse control plane URL")?))
}
},
"control_plane_api_token" => {
let parsed = parse_toml_string(key, item)?;
if parsed.is_empty() {
builder.control_plane_api_token(None)
} else {
builder.control_plane_api_token(Some(parsed.into()))
}
},
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
@@ -812,7 +798,8 @@ impl PageServerConf {
ensure!(
auth_validation_public_key_path.exists(),
format!(
"Can't find auth_validation_public_key at '{auth_validation_public_key_path}'",
"Can't find auth_validation_public_key at '{}'",
auth_validation_public_key_path.display()
)
);
}
@@ -928,12 +915,12 @@ impl PageServerConf {
}
#[cfg(test)]
pub fn test_repo_dir(test_name: &str) -> Utf8PathBuf {
Utf8PathBuf::from(format!("../tmp_check/test_{test_name}"))
pub fn test_repo_dir(test_name: &str) -> PathBuf {
PathBuf::from(format!("../tmp_check/test_{test_name}"))
}
pub fn dummy_conf(repo_dir: Utf8PathBuf) -> Self {
let pg_distrib_dir = Utf8PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
pub fn dummy_conf(repo_dir: PathBuf) -> Self {
let pg_distrib_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../pg_install");
PageServerConf {
id: NodeId(0),
@@ -1100,8 +1087,8 @@ mod tests {
num::{NonZeroU32, NonZeroUsize},
};
use camino_tempfile::{tempdir, Utf8TempDir};
use remote_storage::{RemoteStorageKind, S3Config};
use tempfile::{tempdir, TempDir};
use utils::serde_percent::Percent;
use super::*;
@@ -1140,7 +1127,8 @@ background_task_maximum_delay = '334 s'
let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
// we have to create dummy values to overcome the validation errors
let config_string = format!(
"pg_distrib_dir='{pg_distrib_dir}'\nid=10\nbroker_endpoint = '{broker_endpoint}'",
"pg_distrib_dir='{}'\nid=10\nbroker_endpoint = '{broker_endpoint}'",
pg_distrib_dir.display()
);
let toml = config_string.parse()?;
@@ -1206,7 +1194,8 @@ background_task_maximum_delay = '334 s'
let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
let config_string = format!(
"{ALL_BASE_VALUES_TOML}pg_distrib_dir='{pg_distrib_dir}'\nbroker_endpoint = '{broker_endpoint}'",
"{ALL_BASE_VALUES_TOML}pg_distrib_dir='{}'\nbroker_endpoint = '{broker_endpoint}'",
pg_distrib_dir.display()
);
let toml = config_string.parse()?;
@@ -1266,18 +1255,23 @@ background_task_maximum_delay = '334 s'
let identical_toml_declarations = &[
format!(
r#"[remote_storage]
local_path = '{local_storage_path}'"#,
local_path = '{}'"#,
local_storage_path.display()
),
format!(
"remote_storage={{local_path='{}'}}",
local_storage_path.display()
),
format!("remote_storage={{local_path='{local_storage_path}'}}"),
];
for remote_storage_config_str in identical_toml_declarations {
let config_string = format!(
r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{pg_distrib_dir}'
pg_distrib_dir='{}'
broker_endpoint = '{broker_endpoint}'
{remote_storage_config_str}"#,
pg_distrib_dir.display(),
);
let toml = config_string.parse()?;
@@ -1340,10 +1334,11 @@ concurrency_limit = {s3_concurrency_limit}"#
for remote_storage_config_str in identical_toml_declarations {
let config_string = format!(
r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{pg_distrib_dir}'
pg_distrib_dir='{}'
broker_endpoint = '{broker_endpoint}'
{remote_storage_config_str}"#,
pg_distrib_dir.display(),
);
let toml = config_string.parse()?;
@@ -1385,11 +1380,12 @@ broker_endpoint = '{broker_endpoint}'
let config_string = format!(
r#"{ALL_BASE_VALUES_TOML}
pg_distrib_dir='{pg_distrib_dir}'
pg_distrib_dir='{}'
broker_endpoint = '{broker_endpoint}'
[tenant_config]
trace_read_requests = {trace_read_requests}"#,
pg_distrib_dir.display(),
);
let toml = config_string.parse()?;
@@ -1409,7 +1405,7 @@ trace_read_requests = {trace_read_requests}"#,
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let pageserver_conf_toml = format!(
r#"pg_distrib_dir = "{pg_distrib_dir}"
r#"pg_distrib_dir = "{}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222
@@ -1427,6 +1423,7 @@ kind = "LayerAccessThreshold"
period = "20m"
threshold = "20m"
"#,
pg_distrib_dir.display(),
);
let toml: Document = pageserver_conf_toml.parse()?;
let conf = PageServerConf::parse_and_validate(&toml, &workdir)?;
@@ -1467,7 +1464,7 @@ threshold = "20m"
Ok(())
}
fn prepare_fs(tempdir: &Utf8TempDir) -> anyhow::Result<(Utf8PathBuf, Utf8PathBuf)> {
fn prepare_fs(tempdir: &TempDir) -> anyhow::Result<(PathBuf, PathBuf)> {
let tempdir_path = tempdir.path();
let workdir = tempdir_path.join("workdir");

View File

@@ -3,11 +3,11 @@
use crate::context::{DownloadBehavior, RequestContext};
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
use crate::tenant::{mgr, LogicalSizeCalculationCause};
use camino::Utf8PathBuf;
use consumption_metrics::EventType;
use pageserver_api::models::TenantState;
use reqwest::Url;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tracing::*;
@@ -41,7 +41,7 @@ pub async fn collect_metrics(
_cached_metric_collection_interval: Duration,
synthetic_size_calculation_interval: Duration,
node_id: NodeId,
local_disk_storage: Utf8PathBuf,
local_disk_storage: PathBuf,
ctx: RequestContext,
) -> anyhow::Result<()> {
if _cached_metric_collection_interval != Duration::ZERO {
@@ -68,7 +68,7 @@ pub async fn collect_metrics(
},
);
let path: Arc<Utf8PathBuf> = Arc::new(local_disk_storage);
let path: Arc<PathBuf> = Arc::new(local_disk_storage);
let cancel = task_mgr::shutdown_token();
@@ -153,7 +153,7 @@ pub async fn collect_metrics(
///
/// Cancellation safe.
async fn restore_and_reschedule(
path: &Arc<Utf8PathBuf>,
path: &Arc<PathBuf>,
metric_collection_interval: Duration,
) -> Cache {
let (cached, earlier_metric_at) = match disk_cache::read_metrics_from_disk(path.clone()).await {

View File

@@ -1,12 +1,10 @@
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use std::path::PathBuf;
use std::sync::Arc;
use super::RawMetric;
pub(super) async fn read_metrics_from_disk(
path: Arc<Utf8PathBuf>,
) -> anyhow::Result<Vec<RawMetric>> {
pub(super) async fn read_metrics_from_disk(path: Arc<PathBuf>) -> anyhow::Result<Vec<RawMetric>> {
// do not add context to each error, callsite will log with full path
let span = tracing::Span::current();
tokio::task::spawn_blocking(move || {
@@ -27,10 +25,10 @@ pub(super) async fn read_metrics_from_disk(
.and_then(|x| x)
}
fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
fn scan_and_delete_with_same_prefix(path: &std::path::Path) -> std::io::Result<()> {
let it = std::fs::read_dir(path.parent().expect("caller checked"))?;
let prefix = path.file_name().expect("caller checked").to_string();
let prefix = path.file_name().expect("caller checked").to_string_lossy();
for entry in it {
let entry = entry?;
@@ -64,7 +62,7 @@ fn scan_and_delete_with_same_prefix(path: &Utf8Path) -> std::io::Result<()> {
pub(super) async fn flush_metrics_to_disk(
current_metrics: &Arc<Vec<RawMetric>>,
path: &Arc<Utf8PathBuf>,
path: &Arc<PathBuf>,
) -> anyhow::Result<()> {
use std::io::Write;
@@ -83,7 +81,7 @@ pub(super) async fn flush_metrics_to_disk(
let parent = path.parent().expect("existence checked");
let file_name = path.file_name().expect("existence checked");
let mut tempfile = camino_tempfile::Builder::new()
let mut tempfile = tempfile::Builder::new()
.prefix(file_name)
.suffix(".tmp")
.tempfile_in(parent)?;

View File

@@ -3,6 +3,7 @@ mod list_writer;
mod validator;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -12,7 +13,6 @@ use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::remote_timeline_client::remote_timeline_path;
use crate::virtual_file::VirtualFile;
use anyhow::Context;
use camino::Utf8PathBuf;
use hex::FromHex;
use remote_storage::{GenericRemoteStorage, RemotePath};
use serde::Deserialize;
@@ -336,6 +336,7 @@ impl DeletionList {
timeline_entry.extend(objects.drain(..).map(|p| {
p.strip_prefix(&timeline_remote_path)
.expect("Timeline paths always start with the timeline prefix")
.to_string_lossy()
.to_string()
}));
true
@@ -349,7 +350,7 @@ impl DeletionList {
result.extend(
timeline_layers
.into_iter()
.map(|l| timeline_remote_path.join(&Utf8PathBuf::from(l))),
.map(|l| timeline_remote_path.join(&PathBuf::from(l))),
);
}
}
@@ -726,9 +727,12 @@ impl DeletionQueue {
#[cfg(test)]
mod test {
use camino::Utf8Path;
use hex_literal::hex;
use std::{io::ErrorKind, time::Duration};
use std::{
io::ErrorKind,
path::{Path, PathBuf},
time::Duration,
};
use tracing::info;
use remote_storage::{RemoteStorageConfig, RemoteStorageKind};
@@ -760,7 +764,7 @@ mod test {
struct TestSetup {
harness: TenantHarness,
remote_fs_dir: Utf8PathBuf,
remote_fs_dir: PathBuf,
storage: GenericRemoteStorage,
mock_control_plane: MockControlPlane,
deletion_queue: DeletionQueue,
@@ -869,7 +873,7 @@ mod test {
// Set up a GenericRemoteStorage targeting a directory
let remote_fs_dir = harness.conf.workdir.join("remote_fs");
std::fs::create_dir_all(remote_fs_dir)?;
let remote_fs_dir = harness.conf.workdir.join("remote_fs").canonicalize_utf8()?;
let remote_fs_dir = std::fs::canonicalize(harness.conf.workdir.join("remote_fs"))?;
let storage_config = RemoteStorageConfig {
max_concurrent_syncs: std::num::NonZeroUsize::new(
remote_storage::DEFAULT_REMOTE_STORAGE_MAX_CONCURRENT_SYNCS,
@@ -905,7 +909,7 @@ mod test {
}
// TODO: put this in a common location so that we can share with remote_timeline_client's tests
fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path) {
fn assert_remote_files(expected: &[&str], remote_path: &Path) {
let mut expected: Vec<String> = expected.iter().map(|x| String::from(*x)).collect();
expected.sort();
@@ -922,7 +926,10 @@ mod test {
unreachable!();
}
} else {
panic!("Unexpected error listing {remote_path}: {e}");
panic!(
"Unexpected error listing {}: {e}",
remote_path.to_string_lossy()
);
}
}
};
@@ -937,7 +944,7 @@ mod test {
assert_eq!(expected, found);
}
fn assert_local_files(expected: &[&str], directory: &Utf8Path) {
fn assert_local_files(expected: &[&str], directory: &Path) {
let dir = match std::fs::read_dir(directory) {
Ok(d) => d,
Err(_) => {

View File

@@ -34,8 +34,6 @@ use crate::deletion_queue::TEMP_SUFFIX;
use crate::metrics;
use crate::tenant::remote_timeline_client::remote_layer_path;
use crate::tenant::storage_layer::LayerFileName;
use crate::virtual_file;
use crate::virtual_file::on_fatal_io_error;
// The number of keys in a DeletionList before we will proactively persist it
// (without reaching a flush deadline). This aims to deliver objects of the order
@@ -182,7 +180,8 @@ impl ListWriter {
Ok(h) => Ok(Some(h.validated_sequence)),
Err(e) => {
warn!(
"Failed to deserialize deletion header, ignoring {header_path}: {e:#}",
"Failed to deserialize deletion header, ignoring {}: {e:#}",
header_path.display()
);
// This should never happen unless we make a mistake with our serialization.
// Ignoring a deletion header is not consequential for correctness because all deletions
@@ -194,10 +193,13 @@ impl ListWriter {
}
Err(e) => {
if e.kind() == std::io::ErrorKind::NotFound {
debug!("Deletion header {header_path} not found, first start?");
debug!(
"Deletion header {} not found, first start?",
header_path.display()
);
Ok(None)
} else {
on_fatal_io_error(&e);
Err(anyhow::anyhow!(e))
}
}
}
@@ -221,11 +223,14 @@ impl ListWriter {
let mut dir = match tokio::fs::read_dir(&deletion_directory).await {
Ok(d) => d,
Err(e) => {
warn!("Failed to open deletion list directory {deletion_directory}: {e:#}");
warn!(
"Failed to open deletion list directory {}: {e:#}",
deletion_directory.display(),
);
// This is fatal: any failure to read this local directory indicates a
// storage problem or configuration problem of the node.
virtual_file::on_fatal_io_error(&e);
// Give up: if we can't read the deletion list directory, we probably can't
// write lists into it later, so the queue won't work.
return Err(e.into());
}
};
@@ -238,21 +243,21 @@ impl ListWriter {
let file_name = dentry.file_name();
let dentry_str = file_name.to_string_lossy();
if file_name == header_path.file_name().unwrap_or("") {
if Some(file_name.as_os_str()) == header_path.file_name() {
// Don't try and parse the header's name like a list
continue;
}
if dentry_str.ends_with(TEMP_SUFFIX) {
info!("Cleaning up temporary file {dentry_str}");
let absolute_path =
deletion_directory.join(dentry.file_name().to_str().expect("non-Unicode path"));
let absolute_path = deletion_directory.join(dentry.file_name());
if let Err(e) = tokio::fs::remove_file(&absolute_path).await {
// Non-fatal error: we will just leave the file behind but not
// try and load it.
warn!("Failed to clean up temporary file {absolute_path}: {e:#}");
virtual_file::on_fatal_io_error(&e);
warn!(
"Failed to clean up temporary file {}: {e:#}",
absolute_path.display()
);
}
continue;
@@ -265,7 +270,7 @@ impl ListWriter {
.expect("Non optional group should be present")
.as_str()
} else {
warn!("Unexpected filename in deletion queue: {basename}");
warn!("Unexpected key in deletion queue: {basename}");
metrics::DELETION_QUEUE.unexpected_errors.inc();
continue;
};
@@ -293,12 +298,7 @@ impl ListWriter {
for s in seqs {
let list_path = self.conf.deletion_list_path(s);
let list_bytes = match tokio::fs::read(&list_path).await {
Ok(b) => b,
Err(e) => {
virtual_file::on_fatal_io_error(&e);
}
};
let list_bytes = tokio::fs::read(&list_path).await?;
let mut deletion_list = match serde_json::from_slice::<DeletionList>(&list_bytes) {
Ok(l) => l,
@@ -360,7 +360,7 @@ impl ListWriter {
if let Err(e) = create_dir_all(&self.conf.deletion_prefix()) {
tracing::error!(
"Failed to create deletion list directory {}, deletions will not be executed ({e})",
self.conf.deletion_prefix(),
self.conf.deletion_prefix().display()
);
metrics::DELETION_QUEUE.unexpected_errors.inc();
return;

View File

@@ -15,10 +15,10 @@
//! Deletions are passed onward to the Deleter.
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use camino::Utf8PathBuf;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::info;
@@ -28,7 +28,6 @@ use crate::config::PageServerConf;
use crate::control_plane_client::ControlPlaneGenerationsApi;
use crate::control_plane_client::RetryForeverError;
use crate::metrics;
use crate::virtual_file::on_fatal_io_error;
use super::deleter::DeleterMessage;
use super::DeletionHeader;
@@ -117,11 +116,6 @@ where
/// Valid LSN updates propagate back to Timelines immediately, valid DeletionLists
/// go into the queue of ready-to-execute lists.
async fn validate(&mut self) -> Result<(), DeletionQueueError> {
// Figure out for each tenant which generation number to validate.
//
// It is sufficient to validate the max generation number of each tenant because only the
// highest generation number can possibly be valid. Hence this map will collect the
// highest generation pending validation for each tenant.
let mut tenant_generations = HashMap::new();
for list in &self.pending_lists {
for (tenant_id, tenant_list) in &list.tenants {
@@ -252,11 +246,6 @@ where
}
}
// Assert monotonicity of the list sequence numbers we are processing
if let Some(validated) = validated_sequence {
assert!(list.sequence >= validated)
}
validated_sequence = Some(list.sequence);
}
@@ -293,19 +282,18 @@ where
Ok(())
}
async fn cleanup_lists(&mut self, list_paths: Vec<Utf8PathBuf>) {
async fn cleanup_lists(&mut self, list_paths: Vec<PathBuf>) {
for list_path in list_paths {
debug!("Removing deletion list {list_path}");
debug!("Removing deletion list {}", list_path.display());
if let Err(e) = tokio::fs::remove_file(&list_path).await {
// Unexpected: we should have permissions and nothing else should
// be touching these files. We will leave the file behind. Subsequent
// pageservers will try and load it again: hopefully whatever storage
// issue (probably permissions) has been fixed by then.
tracing::error!("Failed to delete {list_path}: {e:#}");
tracing::error!("Failed to delete {}: {e:#}", list_path.display());
metrics::DELETION_QUEUE.unexpected_errors.inc();
on_fatal_io_error(&e);
break;
}
}
}

View File

@@ -43,12 +43,12 @@
use std::{
collections::HashMap,
path::Path,
sync::Arc,
time::{Duration, SystemTime},
};
use anyhow::Context;
use camino::Utf8Path;
use remote_storage::GenericRemoteStorage;
use serde::{Deserialize, Serialize};
use tokio::time::Instant;
@@ -122,7 +122,7 @@ async fn disk_usage_eviction_task(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: GenericRemoteStorage,
tenants_dir: &Utf8Path,
tenants_dir: &Path,
cancel: CancellationToken,
) {
scopeguard::defer! {
@@ -184,7 +184,7 @@ async fn disk_usage_eviction_task_iteration(
state: &State,
task_config: &DiskUsageEvictionTaskConfig,
storage: &GenericRemoteStorage,
tenants_dir: &Utf8Path,
tenants_dir: &Path,
cancel: &CancellationToken,
) -> anyhow::Result<()> {
let usage_pre = filesystem_level_usage::get(tenants_dir, task_config)
@@ -620,8 +620,9 @@ impl std::ops::Deref for TimelineKey {
}
mod filesystem_level_usage {
use std::path::Path;
use anyhow::Context;
use camino::Utf8Path;
use crate::statvfs::Statvfs;
@@ -663,7 +664,7 @@ mod filesystem_level_usage {
}
pub fn get<'a>(
tenants_dir: &Utf8Path,
tenants_dir: &Path,
config: &'a DiskUsageEvictionTaskConfig,
) -> anyhow::Result<Usage<'a>> {
let mock_config = {

View File

@@ -132,7 +132,7 @@ impl From<PageReconstructError> for ApiError {
ApiError::InternalServerError(anyhow::anyhow!("request was cancelled"))
}
PageReconstructError::AncestorStopping(_) => {
ApiError::ResourceUnavailable(format!("{pre}"))
ApiError::InternalServerError(anyhow::Error::new(pre))
}
PageReconstructError::WalRedo(pre) => {
ApiError::InternalServerError(anyhow::Error::new(pre))
@@ -145,7 +145,7 @@ impl From<TenantMapInsertError> for ApiError {
fn from(tmie: TenantMapInsertError) -> ApiError {
match tmie {
TenantMapInsertError::StillInitializing | TenantMapInsertError::ShuttingDown => {
ApiError::ResourceUnavailable(format!("{tmie}"))
ApiError::InternalServerError(anyhow::Error::new(tmie))
}
TenantMapInsertError::TenantAlreadyExists(id, state) => {
ApiError::Conflict(format!("tenant {id} already exists, state: {state:?}"))
@@ -159,12 +159,6 @@ impl From<TenantStateError> for ApiError {
fn from(tse: TenantStateError) -> ApiError {
match tse {
TenantStateError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
TenantStateError::NotActive(_) => {
ApiError::ResourceUnavailable("Tenant not yet active".into())
}
TenantStateError::IsStopping(_) => {
ApiError::ResourceUnavailable("Tenant is stopping".into())
}
_ => ApiError::InternalServerError(anyhow::Error::new(tse)),
}
}
@@ -174,17 +168,14 @@ impl From<GetTenantError> for ApiError {
fn from(tse: GetTenantError) -> ApiError {
match tse {
GetTenantError::NotFound(tid) => ApiError::NotFound(anyhow!("tenant {}", tid).into()),
GetTenantError::Broken(reason) => {
ApiError::InternalServerError(anyhow!("tenant is broken: {}", reason))
}
GetTenantError::NotActive(_) => {
e @ GetTenantError::NotActive(_) => {
// Why is this not `ApiError::NotFound`?
// Because we must be careful to never return 404 for a tenant if it does
// in fact exist locally. If we did, the caller could draw the conclusion
// that it can attach the tenant to another PS and we'd be in split-brain.
//
// (We can produce this variant only in `mgr::get_tenant(..., active=true)` calls).
ApiError::ResourceUnavailable("Tenant not yet active".into())
ApiError::InternalServerError(anyhow::Error::new(e))
}
}
}
@@ -631,9 +622,8 @@ async fn tenant_list_handler(
let response_data = mgr::list_tenants()
.instrument(info_span!("tenant_list"))
.await
.map_err(|_| {
ApiError::ResourceUnavailable("Tenant map is initializing or shutting down".to_string())
})?
.map_err(anyhow::Error::new)
.map_err(ApiError::InternalServerError)?
.iter()
.map(|(id, state)| TenantInfo {
id: *id,

View File

@@ -6,7 +6,6 @@ use std::path::{Path, PathBuf};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use camino::Utf8Path;
use futures::StreamExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio_tar::Archive;
@@ -30,7 +29,7 @@ use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;
// Returns checkpoint LSN from controlfile
pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
pub fn get_lsn_from_controlfile(path: &Path) -> Result<Lsn> {
// Read control file to extract the LSN
let controlfile_path = path.join("global").join("pg_control");
let controlfile = ControlFileData::decode(&std::fs::read(controlfile_path)?)?;
@@ -47,7 +46,7 @@ pub fn get_lsn_from_controlfile(path: &Utf8Path) -> Result<Lsn> {
/// cluster was not shut down cleanly.
pub async fn import_timeline_from_postgres_datadir(
tline: &Timeline,
pgdata_path: &Utf8Path,
pgdata_path: &Path,
pgdata_lsn: Lsn,
ctx: &RequestContext,
) -> Result<()> {
@@ -257,7 +256,7 @@ async fn import_slru(
/// Scan PostgreSQL WAL files in given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
async fn import_wal(
walpath: &Utf8Path,
walpath: &Path,
tline: &Timeline,
startpoint: Lsn,
endpoint: Lsn,

View File

@@ -25,8 +25,9 @@ pub mod walredo;
pub mod failpoint_support;
use std::path::Path;
use crate::task_mgr::TaskKind;
use camino::Utf8Path;
use deletion_queue::DeletionQueue;
use tracing::info;
@@ -131,25 +132,25 @@ pub const TIMELINE_DELETE_MARK_SUFFIX: &str = "___delete";
/// Full path: `tenants/<tenant_id>/___ignored_tenant`.
pub const IGNORED_TENANT_FILE_NAME: &str = "___ignored_tenant";
pub fn is_temporary(path: &Utf8Path) -> bool {
pub fn is_temporary(path: &Path) -> bool {
match path.file_name() {
Some(name) => name.ends_with(TEMP_FILE_SUFFIX),
Some(name) => name.to_string_lossy().ends_with(TEMP_FILE_SUFFIX),
None => false,
}
}
fn ends_with_suffix(path: &Utf8Path, suffix: &str) -> bool {
fn ends_with_suffix(path: &Path, suffix: &str) -> bool {
match path.file_name() {
Some(name) => name.ends_with(suffix),
Some(name) => name.to_string_lossy().ends_with(suffix),
None => false,
}
}
pub fn is_uninit_mark(path: &Utf8Path) -> bool {
pub fn is_uninit_mark(path: &Path) -> bool {
ends_with_suffix(path, TIMELINE_UNINIT_MARK_SUFFIX)
}
pub fn is_delete_mark(path: &Utf8Path) -> bool {
pub fn is_delete_mark(path: &Path) -> bool {
ends_with_suffix(path, TIMELINE_DELETE_MARK_SUFFIX)
}

View File

@@ -94,35 +94,15 @@ pub(crate) static READ_NUM_FS_LAYERS: Lazy<Histogram> = Lazy::new(|| {
});
// Metrics collected on operations on the storage repository.
pub(crate) struct ReconstructTimeMetrics {
ok: Histogram,
err: Histogram,
}
pub(crate) static RECONSTRUCT_TIME: Lazy<ReconstructTimeMetrics> = Lazy::new(|| {
let inner = register_histogram_vec!(
pub(crate) static RECONSTRUCT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_getpage_reconstruct_seconds",
"Time spent in reconstruct_value (reconstruct a page from deltas)",
&["result"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric");
ReconstructTimeMetrics {
ok: inner.get_metric_with_label_values(&["ok"]).unwrap(),
err: inner.get_metric_with_label_values(&["err"]).unwrap(),
}
.expect("failed to define a metric")
});
impl ReconstructTimeMetrics {
pub(crate) fn for_result<T, E>(&self, result: &Result<T, E>) -> &Histogram {
match result {
Ok(_) => &self.ok,
Err(_) => &self.err,
}
}
}
pub(crate) static MATERIALIZED_PAGE_CACHE_HIT_DIRECT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_materialized_cache_hits_direct_total",
@@ -1876,6 +1856,7 @@ pub fn preinitialize_metrics() {
// histograms
[
&READ_NUM_FS_LAYERS,
&RECONSTRUCT_TIME,
&WAIT_LSN_TIME,
&WAL_REDO_TIME,
&WAL_REDO_WAIT_TIME,
@@ -1886,7 +1867,4 @@ pub fn preinitialize_metrics() {
.for_each(|h| {
Lazy::force(h);
});
// Custom
Lazy::force(&RECONSTRUCT_TIME);
}

View File

@@ -79,6 +79,7 @@ use std::{
atomic::{AtomicU64, AtomicU8, AtomicUsize, Ordering},
Arc, Weak,
},
task::Poll,
time::Duration,
};
@@ -252,6 +253,11 @@ pub struct PageCache {
/// This is interpreted modulo the page cache size.
next_evict_slot: AtomicUsize,
find_victim_sender:
async_channel::Sender<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
find_victim_waiters:
async_channel::Receiver<(usize, tokio::sync::RwLockWriteGuard<'static, SlotInner>)>,
size_metrics: &'static PageCacheSizeMetrics,
}
@@ -291,18 +297,23 @@ impl AsRef<[u8; PAGE_SZ]> for PageReadGuard<'_> {
/// to initialize.
///
pub struct PageWriteGuard<'i> {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
state: PageWriteGuardState<'i>,
}
_permit: PinnedSlotsPermit,
// Are the page contents currently valid?
// Used to mark pages as invalid that are assigned but not yet filled with data.
valid: bool,
enum PageWriteGuardState<'i> {
Invalid {
inner: tokio::sync::RwLockWriteGuard<'i, SlotInner>,
_permit: PinnedSlotsPermit,
},
Downgraded,
}
impl std::ops::DerefMut for PageWriteGuard<'_> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
@@ -310,25 +321,37 @@ impl std::ops::Deref for PageWriteGuard<'_> {
type Target = [u8; PAGE_SZ];
fn deref(&self) -> &Self::Target {
self.inner.buf
match &self.state {
PageWriteGuardState::Invalid { inner, _permit } => &inner.buf,
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
impl AsMut<[u8; PAGE_SZ]> for PageWriteGuard<'_> {
fn as_mut(&mut self) -> &mut [u8; PAGE_SZ] {
self.inner.buf
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => &mut inner.buf,
PageWriteGuardState::Downgraded => todo!(),
}
}
}
impl PageWriteGuard<'_> {
impl<'a> PageWriteGuard<'a> {
/// Mark that the buffer contents are now valid.
pub fn mark_valid(&mut self) {
assert!(self.inner.key.is_some());
assert!(
!self.valid,
"mark_valid called on a buffer that was already valid"
);
self.valid = true;
#[must_use]
pub fn mark_valid(mut self) -> PageReadGuard<'a> {
let prev = std::mem::replace(&mut self.state, PageWriteGuardState::Downgraded);
match prev {
PageWriteGuardState::Invalid { inner, _permit } => {
assert!(inner.key.is_some());
PageReadGuard {
_permit: Arc::new(_permit),
slot_guard: inner.downgrade(),
}
}
PageWriteGuardState::Downgraded => unreachable!(),
}
}
}
@@ -339,11 +362,13 @@ impl Drop for PageWriteGuard<'_> {
/// initializing it, remove the mapping from the page cache.
///
fn drop(&mut self) {
assert!(self.inner.key.is_some());
if !self.valid {
let self_key = self.inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
self.inner.key = None;
match &mut self.state {
PageWriteGuardState::Invalid { inner, _permit } => {
let self_key = inner.key.as_ref().unwrap();
PAGE_CACHE.get().unwrap().remove_mapping(self_key);
inner.key = None;
}
PageWriteGuardState::Downgraded => {}
}
}
}
@@ -356,7 +381,7 @@ pub enum ReadBufResult<'a> {
/// lock_for_write() return value
pub enum WriteBufResult<'a> {
Found(PageWriteGuard<'a>),
Found(PageReadGuard<'a>),
NotFound(PageWriteGuard<'a>),
}
@@ -430,7 +455,7 @@ impl PageCache {
/// Store an image of the given page in the cache.
///
pub async fn memorize_materialized_page(
&self,
&'static self,
tenant_id: TenantId,
timeline_id: TimelineId,
key: Key,
@@ -447,15 +472,15 @@ impl PageCache {
};
match self.lock_for_write(&cache_key).await? {
WriteBufResult::Found(write_guard) => {
WriteBufResult::Found(read_guard) => {
// We already had it in cache. Another thread must've put it there
// concurrently. Check that it had the same contents that we
// replayed.
assert!(*write_guard == img);
assert!(*read_guard == img);
}
WriteBufResult::NotFound(mut write_guard) => {
write_guard.copy_from_slice(img);
write_guard.mark_valid();
let _ = write_guard.mark_valid();
}
}
@@ -465,7 +490,7 @@ impl PageCache {
// Section 1.2: Public interface functions for working with immutable file pages.
pub async fn read_immutable_buf(
&self,
&'static self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
@@ -484,26 +509,13 @@ impl PageCache {
// not require changes.
async fn try_get_pinned_slot_permit(&self) -> anyhow::Result<PinnedSlotsPermit> {
let timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
match tokio::time::timeout(
// Choose small timeout, neon_smgr does its own retries.
// https://neondb.slack.com/archives/C04DGM6SMTM/p1694786876476869
Duration::from_secs(10),
Arc::clone(&self.pinned_slots).acquire_owned(),
)
.await
{
Ok(res) => Ok(PinnedSlotsPermit(
res.expect("this semaphore is never closed"),
)),
Err(_timeout) => {
timer.stop_and_discard();
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::AcquirePinnedSlotTimeout,
);
anyhow::bail!("timeout: there were page guards alive for all page cache slots")
}
}
let _timer = crate::metrics::PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME.start_timer();
Ok(PinnedSlotsPermit(
Arc::clone(&self.pinned_slots)
.acquire_owned()
.await
.unwrap(),
))
}
/// Look up a page in the cache.
@@ -571,7 +583,7 @@ impl PageCache {
/// ```
///
async fn lock_for_read(
&self,
&'static self,
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
@@ -638,41 +650,31 @@ impl PageCache {
);
return Ok(ReadBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
}));
}
}
/// Look up a page in the cache and lock it in write mode. If it's not
/// found, returns None.
///
/// When locking a page for writing, the search criteria is always "exact".
// FIXME: the name is wrong.
async fn try_lock_for_write(
&self,
cache_key: &CacheKey,
permit: &mut Option<PinnedSlotsPermit>,
) -> Option<PageWriteGuard> {
) -> Option<PageReadGuard> {
if let Some(slot_idx) = self.search_mapping_for_write(cache_key) {
// The page was found in the mapping. Lock the slot, and re-check
// that it's still what we expected (because we released the mapping
// lock already, another thread could have evicted the page)
let slot = &self.slots[slot_idx];
let inner = slot.inner.write().await;
let inner = slot.inner.read().await;
if inner.key.as_ref() == Some(cache_key) {
slot.inc_usage_count();
debug_assert!(
{
let guard = inner.permit.lock().unwrap();
guard.upgrade().is_none()
},
"we hold a write lock, so, no one else should have a permit"
);
return Some(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: true,
return Some(PageReadGuard {
_permit: inner.coalesce_readers_permit(permit.take().unwrap()),
slot_guard: inner,
});
}
}
@@ -683,7 +685,7 @@ impl PageCache {
///
/// Similar to lock_for_read(), but the returned buffer is write-locked and
/// may be modified by the caller even if it's already found in the cache.
async fn lock_for_write(&self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
async fn lock_for_write(&'static self, cache_key: &CacheKey) -> anyhow::Result<WriteBufResult> {
let mut permit = Some(self.try_get_pinned_slot_permit().await?);
loop {
// First check if the key already exists in the cache.
@@ -728,9 +730,10 @@ impl PageCache {
);
return Ok(WriteBufResult::NotFound(PageWriteGuard {
_permit: permit.take().unwrap(),
inner,
valid: false,
state: PageWriteGuardState::Invalid {
_permit: permit.take().unwrap(),
inner,
},
}));
}
}
@@ -882,10 +885,20 @@ impl PageCache {
///
/// On return, the slot is empty and write-locked.
async fn find_victim(
&self,
&'static self,
_permit_witness: &PinnedSlotsPermit,
) -> anyhow::Result<(usize, tokio::sync::RwLockWriteGuard<SlotInner>)> {
let iter_limit = self.slots.len() * 10;
// Get in line.
let mut receiver = self.find_victim_waiters.recv();
// If we get cancelled at the receiver.await below, the victim slot
// remains in the channel. Consume these first before going into
// the loop below.
match futures::poll!(&mut receiver) {
Poll::Ready(Ok(res)) => return Ok(res),
Poll::Ready(Err(_closed)) => unreachable!("we never close the channel"),
Poll::Pending => {} // the regular case where we aren't cancelled below
};
let mut iters = 0;
loop {
iters += 1;
@@ -897,41 +910,8 @@ impl PageCache {
let mut inner = match slot.inner.try_write() {
Ok(inner) => inner,
Err(_err) => {
if iters > iter_limit {
// NB: Even with the permits, there's no hard guarantee that we will find a slot with
// any particular number of iterations: other threads might race ahead and acquire and
// release pins just as we're scanning the array.
//
// Imagine that nslots is 2, and as starting point, usage_count==1 on all
// slots. There are two threads running concurrently, A and B. A has just
// acquired the permit from the semaphore.
//
// A: Look at slot 1. Its usage_count == 1, so decrement it to zero, and continue the search
// B: Acquire permit.
// B: Look at slot 2, decrement its usage_count to zero and continue the search
// B: Look at slot 1. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
// B: Acquire permit.
// B: Look at slot 2. Its usage_count is zero, so pin it and bump up its usage_count to 1.
// B: Release pin and permit again
//
// Now we're back in the starting situation that both slots have
// usage_count 1, but A has now been through one iteration of the
// find_victim() loop. This can repeat indefinitely and on each
// iteration, A's iteration count increases by one.
//
// So, even though the semaphore for the permits is fair, the victim search
// itself happens in parallel and is not fair.
// Hence even with a permit, a task can theoretically be starved.
// To avoid this, we'd need tokio to give priority to tasks that are holding
// permits for longer.
// Note that just yielding to tokio during iteration without such
// priority boosting is likely counter-productive. We'd just give more opportunities
// for B to bump usage count, further starving A.
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::EvictIterLimit,
);
anyhow::bail!("exceeded evict iter limit");
if iters > self.slots.len() * (MAX_USAGE_COUNT as usize) {
unreachable!("find_victim_waiters prevents starvation");
}
continue;
}
@@ -942,7 +922,10 @@ impl PageCache {
inner.key = None;
}
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
return Ok((slot_idx, inner));
self.find_victim_sender
.try_send((slot_idx, inner))
.expect("we always get in line first");
return Ok(receiver.await.unwrap());
}
}
}
@@ -979,6 +962,7 @@ impl PageCache {
})
.collect();
let (find_victim_sender, find_victim_waiters) = async_channel::bounded(num_pages);
Self {
materialized_page_map: Default::default(),
immutable_page_map: Default::default(),
@@ -986,6 +970,8 @@ impl PageCache {
next_evict_slot: AtomicUsize::new(0),
size_metrics,
pinned_slots: Arc::new(tokio::sync::Semaphore::new(num_pages)),
find_victim_sender,
find_victim_waiters,
}
}
}

View File

@@ -412,53 +412,31 @@ impl PageServerHandler {
// TODO: We could create a new per-request context here, with unique ID.
// Currently we use the same per-timeline context for all requests
let (response, span) = match neon_fe_msg {
let response = match neon_fe_msg {
PagestreamFeMessage::Exists(req) => {
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelExists);
let span = tracing::info_span!("handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.lsn);
(
self.handle_get_rel_exists_request(&timeline, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
self.handle_get_rel_exists_request(&timeline, &req, &ctx)
.await
}
PagestreamFeMessage::Nblocks(req) => {
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetRelSize);
let span = tracing::info_span!("handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.lsn);
(
self.handle_get_nblocks_request(&timeline, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
self.handle_get_nblocks_request(&timeline, &req, &ctx).await
}
PagestreamFeMessage::GetPage(req) => {
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetPageAtLsn);
let span = tracing::info_span!("handle_get_page_at_lsn_request", rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn);
(
self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
self.handle_get_page_at_lsn_request(&timeline, &req, &ctx)
.await
}
PagestreamFeMessage::DbSize(req) => {
let _timer = metrics.start_timer(metrics::SmgrQueryType::GetDbSize);
let span = tracing::info_span!("handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.lsn);
(
self.handle_db_size_request(&timeline, &req, &ctx)
.instrument(span.clone())
.await,
span,
)
self.handle_db_size_request(&timeline, &req, &ctx).await
}
};
let response = response.unwrap_or_else(|e| {
// print all the details to the log with {:#}, but for the client the
// error message is enough
span.in_scope(|| error!("error reading relation or page version: {:#}", e));
error!("error reading relation or page version: {:?}", e);
PagestreamBeMessage::Error(PagestreamErrorResponse {
message: e.to_string(),
})
@@ -649,6 +627,7 @@ impl PageServerHandler {
Ok(lsn)
}
#[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_rel_exists_request(
&self,
timeline: &Timeline,
@@ -669,6 +648,7 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, req_lsn = %req.lsn))]
async fn handle_get_nblocks_request(
&self,
timeline: &Timeline,
@@ -687,6 +667,7 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req, ctx), fields(dbnode = %req.dbnode, req_lsn = %req.lsn))]
async fn handle_db_size_request(
&self,
timeline: &Timeline,
@@ -708,6 +689,7 @@ impl PageServerHandler {
}))
}
#[instrument(skip(self, timeline, req, ctx), fields(rel = %req.rel, blkno = %req.blkno, req_lsn = %req.lsn))]
async fn handle_get_page_at_lsn_request(
&self,
timeline: &Timeline,
@@ -1283,10 +1265,7 @@ async fn get_active_tenant_with_timeout(
Ok(tenant) => tenant,
Err(e @ GetTenantError::NotFound(_)) => return Err(GetActiveTenantError::NotFound(e)),
Err(GetTenantError::NotActive(_)) => {
unreachable!("we're calling get_tenant with active_only=false")
}
Err(GetTenantError::Broken(_)) => {
unreachable!("we're calling get_tenant with active_only=false")
unreachable!("we're calling get_tenant with active=false")
}
};
let wait_time = Duration::from_secs(30);

View File

@@ -1,6 +1,6 @@
//! Wrapper around nix::sys::statvfs::Statvfs that allows for mocking.
use camino::Utf8Path;
use std::path::Path;
pub enum Statvfs {
Real(nix::sys::statvfs::Statvfs),
@@ -12,13 +12,11 @@ pub enum Statvfs {
// Since it should only be a problem on > 2TiB disks, let's ignore
// the problem for now and upcast to u64.
impl Statvfs {
pub fn get(tenants_dir: &Utf8Path, mocked: Option<&mock::Behavior>) -> nix::Result<Self> {
pub fn get(tenants_dir: &Path, mocked: Option<&mock::Behavior>) -> nix::Result<Self> {
if let Some(mocked) = mocked {
Ok(Statvfs::Mock(mock::get(tenants_dir, mocked)?))
} else {
Ok(Statvfs::Real(nix::sys::statvfs::statvfs(
tenants_dir.as_std_path(),
)?))
Ok(Statvfs::Real(nix::sys::statvfs::statvfs(tenants_dir)?))
}
}
@@ -57,8 +55,8 @@ impl Statvfs {
pub mod mock {
use anyhow::Context;
use camino::Utf8Path;
use regex::Regex;
use std::path::Path;
use tracing::log::info;
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
@@ -88,7 +86,7 @@ pub mod mock {
}
}
pub fn get(tenants_dir: &Utf8Path, behavior: &Behavior) -> nix::Result<Statvfs> {
pub fn get(tenants_dir: &Path, behavior: &Behavior) -> nix::Result<Statvfs> {
info!("running mocked statvfs");
match behavior {
@@ -121,7 +119,7 @@ pub mod mock {
}
}
fn walk_dir_disk_usage(path: &Utf8Path, name_filter: Option<&Regex>) -> anyhow::Result<u64> {
fn walk_dir_disk_usage(path: &Path, name_filter: Option<&Regex>) -> anyhow::Result<u64> {
let mut total = 0;
for entry in walkdir::WalkDir::new(path) {
let entry = entry?;

View File

@@ -293,6 +293,8 @@ pub enum TaskKind {
DebugTool,
BackgroundRuntimeTurnaroundMeasure,
#[cfg(test)]
UnitTest,
}

View File

@@ -12,7 +12,6 @@
//!
use anyhow::{bail, Context};
use camino::{Utf8Path, Utf8PathBuf};
use futures::FutureExt;
use pageserver_api::models::TimelineState;
use remote_storage::DownloadError;
@@ -35,6 +34,8 @@ use std::fs;
use std::fs::File;
use std::io;
use std::ops::Bound::Included;
use std::path::Path;
use std::path::PathBuf;
use std::process::Command;
use std::process::Stdio;
use std::sync::atomic::AtomicU64;
@@ -771,7 +772,7 @@ impl Tenant {
}
std::fs::remove_file(&marker_file)
.with_context(|| format!("unlink attach marker file {marker_file}"))?;
.with_context(|| format!("unlink attach marker file {}", marker_file.display()))?;
crashsafe::fsync(marker_file.parent().expect("marker file has parent dir"))
.context("fsync tenant directory after unlinking attach marker file")?;
@@ -1023,47 +1024,58 @@ impl Tenant {
let timelines_dir = self.conf.timelines_path(&self.tenant_id);
for entry in timelines_dir
.read_dir_utf8()
.context("list timelines directory for tenant")?
for entry in
std::fs::read_dir(&timelines_dir).context("list timelines directory for tenant")?
{
let entry = entry.context("read timeline dir entry")?;
let timeline_dir = entry.path();
if crate::is_temporary(timeline_dir) {
info!("Found temporary timeline directory, removing: {timeline_dir}");
if let Err(e) = std::fs::remove_dir_all(timeline_dir) {
error!("Failed to remove temporary directory '{timeline_dir}': {e:?}");
if crate::is_temporary(&timeline_dir) {
info!(
"Found temporary timeline directory, removing: {}",
timeline_dir.display()
);
if let Err(e) = std::fs::remove_dir_all(&timeline_dir) {
error!(
"Failed to remove temporary directory '{}': {:?}",
timeline_dir.display(),
e
);
}
} else if is_uninit_mark(timeline_dir) {
} else if is_uninit_mark(&timeline_dir) {
if !timeline_dir.exists() {
warn!("Timeline dir entry become invalid: {timeline_dir}");
warn!(
"Timeline dir entry become invalid: {}",
timeline_dir.display()
);
continue;
}
let timeline_uninit_mark_file = &timeline_dir;
info!(
"Found an uninit mark file {timeline_uninit_mark_file}, removing the timeline and its uninit mark",
"Found an uninit mark file {}, removing the timeline and its uninit mark",
timeline_uninit_mark_file.display()
);
let timeline_id =
TimelineId::try_from(timeline_uninit_mark_file.file_stem())
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline uninit mark name {timeline_uninit_mark_file}",
let timeline_id = TimelineId::try_from(timeline_uninit_mark_file.file_stem())
.with_context(|| {
format!(
"Could not parse timeline id out of the timeline uninit mark name {}",
timeline_uninit_mark_file.display()
)
})?;
})?;
let timeline_dir = self.conf.timeline_path(&self.tenant_id, &timeline_id);
if let Err(e) =
remove_timeline_and_uninit_mark(&timeline_dir, timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
} else if crate::is_delete_mark(timeline_dir) {
} else if crate::is_delete_mark(&timeline_dir) {
// If metadata exists, load as usual, continue deletion
let timeline_id = TimelineId::try_from(timeline_dir.file_stem())
.with_context(|| {
let timeline_id =
TimelineId::try_from(timeline_dir.file_stem()).with_context(|| {
format!(
"Could not parse timeline id out of the timeline uninit mark name {timeline_dir}",
"Could not parse timeline id out of the timeline uninit mark name {}",
timeline_dir.display()
)
})?;
@@ -1102,13 +1114,17 @@ impl Tenant {
}
} else {
if !timeline_dir.exists() {
warn!("Timeline dir entry become invalid: {timeline_dir}");
warn!(
"Timeline dir entry become invalid: {}",
timeline_dir.display()
);
continue;
}
let timeline_id = TimelineId::try_from(timeline_dir.file_name())
.with_context(|| {
let timeline_id =
TimelineId::try_from(timeline_dir.file_name()).with_context(|| {
format!(
"Could not parse timeline id out of the timeline dir name {timeline_dir}",
"Could not parse timeline id out of the timeline dir name {}",
timeline_dir.display()
)
})?;
let timeline_uninit_mark_file = self
@@ -1120,7 +1136,7 @@ impl Tenant {
"Found an uninit mark file, removing the timeline and its uninit mark",
);
if let Err(e) =
remove_timeline_and_uninit_mark(timeline_dir, &timeline_uninit_mark_file)
remove_timeline_and_uninit_mark(&timeline_dir, &timeline_uninit_mark_file)
{
error!("Failed to clean up uninit marked timeline: {e:?}");
}
@@ -1136,13 +1152,18 @@ impl Tenant {
}
let file_name = entry.file_name();
if let Ok(timeline_id) = file_name.parse::<TimelineId>() {
if let Ok(timeline_id) =
file_name.to_str().unwrap_or_default().parse::<TimelineId>()
{
let metadata = load_metadata(self.conf, &self.tenant_id, &timeline_id)
.context("failed to load metadata")?;
timelines_to_load.insert(timeline_id, metadata);
} else {
// A file or directory that doesn't look like a timeline ID
warn!("unexpected file or directory in timelines directory: {file_name}");
warn!(
"unexpected file or directory in timelines directory: {}",
file_name.to_string_lossy()
);
}
}
}
@@ -2333,24 +2354,26 @@ impl Tenant {
tenant_id: &TenantId,
) -> anyhow::Result<TenantConfOpt> {
let target_config_path = conf.tenant_config_path(tenant_id);
let target_config_display = target_config_path.display();
info!("loading tenantconf from {target_config_path}");
info!("loading tenantconf from {target_config_display}");
// FIXME If the config file is not found, assume that we're attaching
// a detached tenant and config is passed via attach command.
// https://github.com/neondatabase/neon/issues/1555
// OR: we're loading after incomplete deletion that managed to remove config.
if !target_config_path.exists() {
info!("tenant config not found in {target_config_path}");
info!("tenant config not found in {target_config_display}");
return Ok(TenantConfOpt::default());
}
// load and parse file
let config = fs::read_to_string(&target_config_path)
.with_context(|| format!("Failed to load config from path '{target_config_path}'"))?;
let config = fs::read_to_string(&target_config_path).with_context(|| {
format!("Failed to load config from path '{target_config_display}'")
})?;
let toml = config.parse::<toml_edit::Document>().with_context(|| {
format!("Failed to parse config from file '{target_config_path}' as toml file")
format!("Failed to parse config from file '{target_config_display}' as toml file")
})?;
let mut tenant_conf = TenantConfOpt::default();
@@ -2358,12 +2381,11 @@ impl Tenant {
match key {
"tenant_config" => {
tenant_conf = PageServerConf::parse_toml_tenant_conf(item).with_context(|| {
format!("Failed to parse config from file '{target_config_path}' as pageserver config")
format!("Failed to parse config from file '{target_config_display}' as pageserver config")
})?;
}
_ => bail!(
"config file {target_config_path} has unrecognized pageserver option '{key}'"
),
_ => bail!("config file {target_config_display} has unrecognized pageserver option '{key}'"),
}
}
@@ -2373,11 +2395,11 @@ impl Tenant {
#[tracing::instrument(skip_all, fields(%tenant_id))]
pub(super) async fn persist_tenant_config(
tenant_id: &TenantId,
target_config_path: &Utf8Path,
target_config_path: &Path,
tenant_conf: TenantConfOpt,
) -> anyhow::Result<()> {
// imitate a try-block with a closure
info!("persisting tenantconf to {target_config_path}");
info!("persisting tenantconf to {}", target_config_path.display());
let mut conf_content = r#"# This file contains a specific per-tenant's config.
# It is read in case of pageserver restart.
@@ -2394,7 +2416,12 @@ impl Tenant {
let temp_path = path_with_suffix_extension(target_config_path, TEMP_FILE_SUFFIX);
VirtualFile::crashsafe_overwrite(target_config_path, &temp_path, conf_content)
.await
.with_context(|| format!("write tenant {tenant_id} config to {target_config_path}"))?;
.with_context(|| {
format!(
"write tenant {tenant_id} config to {}",
target_config_path.display()
)
})?;
Ok(())
}
@@ -2761,7 +2788,10 @@ impl Tenant {
// current initdb was not run yet, so remove whatever was left from the previous runs
if initdb_path.exists() {
fs::remove_dir_all(&initdb_path).with_context(|| {
format!("Failed to remove already existing initdb directory: {initdb_path}")
format!(
"Failed to remove already existing initdb directory: {}",
initdb_path.display()
)
})?;
}
// Init temporarily repo to get bootstrap data, this creates a directory in the `initdb_path` path
@@ -2770,7 +2800,7 @@ impl Tenant {
scopeguard::defer! {
if let Err(e) = fs::remove_dir_all(&initdb_path) {
// this is unlikely, but we will remove the directory on pageserver restart or another bootstrap call
error!("Failed to remove temporary initdb directory '{initdb_path}': {e}");
error!("Failed to remove temporary initdb directory '{}': {}", initdb_path.display(), e);
}
}
let pgdata_path = &initdb_path;
@@ -2920,7 +2950,7 @@ impl Tenant {
async fn create_timeline_files(
&self,
timeline_path: &Utf8Path,
timeline_path: &Path,
new_timeline_id: &TimelineId,
new_metadata: &TimelineMetadata,
) -> anyhow::Result<()> {
@@ -2954,7 +2984,8 @@ impl Tenant {
let timeline_path = self.conf.timeline_path(&tenant_id, &timeline_id);
anyhow::ensure!(
!timeline_path.exists(),
"Timeline {timeline_path} already exists, cannot create its uninit mark file",
"Timeline {} already exists, cannot create its uninit mark file",
timeline_path.display()
);
let uninit_mark_path = self
@@ -3046,10 +3077,7 @@ impl Tenant {
}
}
fn remove_timeline_and_uninit_mark(
timeline_dir: &Utf8Path,
uninit_mark: &Utf8Path,
) -> anyhow::Result<()> {
fn remove_timeline_and_uninit_mark(timeline_dir: &Path, uninit_mark: &Path) -> anyhow::Result<()> {
fs::remove_dir_all(timeline_dir)
.or_else(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
@@ -3061,10 +3089,17 @@ fn remove_timeline_and_uninit_mark(
}
})
.with_context(|| {
format!("Failed to remove unit marked timeline directory {timeline_dir}")
format!(
"Failed to remove unit marked timeline directory {}",
timeline_dir.display()
)
})?;
fs::remove_file(uninit_mark)
.with_context(|| format!("Failed to remove timeline uninit mark file {uninit_mark}"))?;
fs::remove_file(uninit_mark).with_context(|| {
format!(
"Failed to remove timeline uninit mark file {}",
uninit_mark.display()
)
})?;
Ok(())
}
@@ -3079,7 +3114,7 @@ pub(crate) async fn create_tenant_files(
tenant_conf: TenantConfOpt,
tenant_id: &TenantId,
mode: CreateTenantFilesMode,
) -> anyhow::Result<Utf8PathBuf> {
) -> anyhow::Result<PathBuf> {
let target_tenant_directory = conf.tenant_path(tenant_id);
anyhow::ensure!(
!target_tenant_directory
@@ -3090,11 +3125,17 @@ pub(crate) async fn create_tenant_files(
let temporary_tenant_dir =
path_with_suffix_extension(&target_tenant_directory, TEMP_FILE_SUFFIX);
debug!("Creating temporary directory structure in {temporary_tenant_dir}");
debug!(
"Creating temporary directory structure in {}",
temporary_tenant_dir.display()
);
// top-level dir may exist if we are creating it through CLI
crashsafe::create_dir_all(&temporary_tenant_dir).with_context(|| {
format!("could not create temporary tenant directory {temporary_tenant_dir}")
format!(
"could not create temporary tenant directory {}",
temporary_tenant_dir.display()
)
})?;
let creation_result = try_create_target_tenant_dir(
@@ -3128,8 +3169,8 @@ async fn try_create_target_tenant_dir(
tenant_conf: TenantConfOpt,
tenant_id: &TenantId,
mode: CreateTenantFilesMode,
temporary_tenant_dir: &Utf8Path,
target_tenant_directory: &Utf8Path,
temporary_tenant_dir: &Path,
target_tenant_directory: &Path,
) -> Result<(), anyhow::Error> {
match mode {
CreateTenantFilesMode::Create => {} // needs no attach marker, writing tenant conf + atomic rename of dir is good enough
@@ -3167,7 +3208,8 @@ async fn try_create_target_tenant_dir(
crashsafe::create_dir(&temporary_tenant_timelines_dir).with_context(|| {
format!(
"create tenant {} temporary timelines directory {}",
tenant_id, temporary_tenant_timelines_dir,
tenant_id,
temporary_tenant_timelines_dir.display()
)
})?;
fail::fail_point!("tenant-creation-before-tmp-rename", |_| {
@@ -3182,34 +3224,35 @@ async fn try_create_target_tenant_dir(
fs::rename(temporary_tenant_dir, target_tenant_directory).with_context(|| {
format!(
"move tenant {} temporary directory {} into the permanent one {}",
tenant_id, temporary_tenant_dir, target_tenant_directory
tenant_id,
temporary_tenant_dir.display(),
target_tenant_directory.display()
)
})?;
let target_dir_parent = target_tenant_directory.parent().with_context(|| {
format!(
"get tenant {} dir parent for {}",
tenant_id, target_tenant_directory,
tenant_id,
target_tenant_directory.display()
)
})?;
crashsafe::fsync(target_dir_parent).with_context(|| {
format!(
"fsync renamed directory's parent {} for tenant {}",
target_dir_parent, tenant_id,
target_dir_parent.display(),
tenant_id,
)
})?;
Ok(())
}
fn rebase_directory(
original_path: &Utf8Path,
base: &Utf8Path,
new_base: &Utf8Path,
) -> anyhow::Result<Utf8PathBuf> {
fn rebase_directory(original_path: &Path, base: &Path, new_base: &Path) -> anyhow::Result<PathBuf> {
let relative_path = original_path.strip_prefix(base).with_context(|| {
format!(
"Failed to strip base prefix '{}' off path '{}'",
base, original_path
base.display(),
original_path.display()
)
})?;
Ok(new_base.join(relative_path))
@@ -3219,18 +3262,20 @@ fn rebase_directory(
/// to get bootstrap data for timeline initialization.
fn run_initdb(
conf: &'static PageServerConf,
initdb_target_dir: &Utf8Path,
initdb_target_dir: &Path,
pg_version: u32,
) -> anyhow::Result<()> {
let initdb_bin_path = conf.pg_bin_dir(pg_version)?.join("initdb");
let initdb_lib_dir = conf.pg_lib_dir(pg_version)?;
info!(
"running {} in {}, libdir: {}",
initdb_bin_path, initdb_target_dir, initdb_lib_dir,
initdb_bin_path.display(),
initdb_target_dir.display(),
initdb_lib_dir.display(),
);
let initdb_output = Command::new(&initdb_bin_path)
.args(["-D", initdb_target_dir.as_ref()])
.args(["-D", &initdb_target_dir.to_string_lossy()])
.args(["-U", &conf.superuser])
.args(["-E", "utf8"])
.arg("--no-instructions")
@@ -3245,7 +3290,8 @@ fn run_initdb(
.with_context(|| {
format!(
"failed to execute {} at target dir {}",
initdb_bin_path, initdb_target_dir,
initdb_bin_path.display(),
initdb_target_dir.display()
)
})?;
if !initdb_output.status.success() {
@@ -3265,7 +3311,7 @@ impl Drop for Tenant {
}
/// Dump contents of a layer file to stdout.
pub async fn dump_layerfile_from_path(
path: &Utf8Path,
path: &Path,
verbose: bool,
ctx: &RequestContext,
) -> anyhow::Result<()> {
@@ -3298,8 +3344,8 @@ pub async fn dump_layerfile_from_path(
pub mod harness {
use bytes::{Bytes, BytesMut};
use once_cell::sync::OnceCell;
use std::fs;
use std::sync::Arc;
use std::{fs, path::PathBuf};
use utils::logging;
use utils::lsn::Lsn;
@@ -3364,7 +3410,7 @@ pub mod harness {
pub tenant_id: TenantId,
pub generation: Generation,
pub remote_storage: GenericRemoteStorage,
pub remote_fs_dir: Utf8PathBuf,
pub remote_fs_dir: PathBuf,
pub deletion_queue: MockDeletionQueue,
}
@@ -3463,7 +3509,7 @@ pub mod harness {
Ok(tenant)
}
pub fn timeline_path(&self, timeline_id: &TimelineId) -> Utf8PathBuf {
pub fn timeline_path(&self, timeline_id: &TimelineId) -> PathBuf {
self.conf.timeline_path(&self.tenant_id, timeline_id)
}
}

View File

@@ -234,21 +234,18 @@ impl BlobWriter<false> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{
context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef,
virtual_file::Error,
};
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
let temp_dir = camino_tempfile::tempdir()?;
let pathbuf = temp_dir.path().join("file");
let temp_dir = tempfile::tempdir()?;
let path = temp_dir.path().join("file");
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
// Write part (in block to drop the file)
let mut offsets = Vec::new();
{
let file = VirtualFile::create(pathbuf.as_path()).await?;
let file = VirtualFile::create(&path).await?;
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let offs = wtr.write_blob(blob).await?;
@@ -261,7 +258,7 @@ mod tests {
wtr.flush_buffer().await?;
}
let file = VirtualFile::open(pathbuf.as_path()).await?;
let file = VirtualFile::open(&path).await?;
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {

View File

@@ -6,7 +6,7 @@ use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::{self, VirtualFile};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
@@ -96,7 +96,7 @@ impl<'a> BlockReaderRef<'a> {
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
VirtualFile(r) => r.read_blk(blknum).await.map_err(virtual_file::Error::into),
VirtualFile(r) => r.read_blk(blknum).await,
}
}
}
@@ -174,7 +174,6 @@ impl FileBlockReader {
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
.await
.map_err(virtual_file::Error::into)
}
/// Read a block.
///
@@ -187,27 +186,22 @@ impl FileBlockReader {
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => break Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Failed to read immutable buf: {e:#}"),
)
})? {
ReadBufResult::Found(guard) => return Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
return Ok(write_guard.mark_valid().into());
}
};
}
}

View File

@@ -1,7 +1,9 @@
use std::sync::Arc;
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::TenantState;
use remote_storage::{DownloadError, GenericRemoteStorage, RemotePath};
use tokio::sync::OwnedMutexGuard;
@@ -60,7 +62,7 @@ fn remote_tenant_delete_mark_path(
.context("Failed to strip workdir prefix")
.and_then(RemotePath::new)
.context("tenant path")?;
Ok(tenant_remote_path.join(Utf8Path::new("deleted")))
Ok(tenant_remote_path.join(Path::new("deleted")))
}
async fn create_remote_delete_mark(
@@ -146,7 +148,7 @@ async fn schedule_ordered_timeline_deletions(
Ok(already_running_deletions)
}
async fn ensure_timelines_dir_empty(timelines_path: &Utf8Path) -> Result<(), DeleteTenantError> {
async fn ensure_timelines_dir_empty(timelines_path: &Path) -> Result<(), DeleteTenantError> {
// Assert timelines dir is empty.
if !fs_ext::is_directory_empty(timelines_path).await? {
// Display first 10 items in directory
@@ -186,14 +188,17 @@ async fn cleanup_remaining_fs_traces(
conf: &PageServerConf,
tenant_id: &TenantId,
) -> Result<(), DeleteTenantError> {
let rm = |p: Utf8PathBuf, is_dir: bool| async move {
let rm = |p: PathBuf, is_dir: bool| async move {
if is_dir {
tokio::fs::remove_dir(&p).await
} else {
tokio::fs::remove_file(&p).await
}
.or_else(fs_ext::ignore_not_found)
.with_context(|| format!("failed to delete {p}"))
.with_context(|| {
let to_display = p.display();
format!("failed to delete {to_display}")
})
};
rm(conf.tenant_config_path(tenant_id), false).await?;

View File

@@ -6,11 +6,11 @@ use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
use camino::Utf8PathBuf;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use tracing::*;
use utils::id::{TenantId, TimelineId};
@@ -40,9 +40,7 @@ impl EphemeralFile {
let filename = conf
.timeline_path(&tenant_id, &timeline_id)
.join(Utf8PathBuf::from(format!(
"ephemeral-{filename_disambiguator}"
)));
.join(PathBuf::from(format!("ephemeral-{filename_disambiguator}")));
let file = VirtualFile::open_with_options(
&filename,
@@ -72,36 +70,34 @@ impl EphemeralFile {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum, self.file.path, e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
write_guard.mark_valid();
// Swap for read lock
continue;
}
};
}
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.await
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
// order path before error because error is anyhow::Error => might have many contexts
format!(
"ephemeral file: read immutable page #{}: {}: {:#}",
blknum,
self.file.path.display(),
e,
),
)
})? {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));
}
};
} else {
debug_assert_eq!(blknum as u64, self.len / PAGE_SZ as u64);
Ok(BlockLease::EphemeralFileMutableTail(&self.mutable_tail))
@@ -171,7 +167,7 @@ impl EphemeralFile {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
buf.copy_from_slice(&self.ephemeral_file.mutable_tail);
write_guard.mark_valid();
let _ = write_guard.mark_valid();
// pre-warm successful
}
Err(e) => {
@@ -195,7 +191,7 @@ impl EphemeralFile {
"ephemeral_file: write_blob: write-back full tail blk #{}: {:#}: {}",
self.blknum,
e,
self.ephemeral_file.file.path,
self.ephemeral_file.file.path.display(),
),
));
}
@@ -258,7 +254,8 @@ impl Drop for EphemeralFile {
// not found files might also be related to https://github.com/neondatabase/neon/issues/2442
error!(
"could not remove ephemeral file '{}': {}",
self.file.path, e
self.file.path.display(),
e
);
}
}

View File

@@ -1,9 +1,10 @@
//! This module acts as a switchboard to access different repositories managed by this
//! page server.
use camino::{Utf8Path, Utf8PathBuf};
use rand::{distributions::Alphanumeric, Rng};
use std::collections::{hash_map, HashMap};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
@@ -72,12 +73,12 @@ impl TenantsMap {
///
/// This is pageserver-specific, as it relies on future processes after a crash to check
/// for TEMP_FILE_SUFFIX when loading things.
async fn safe_remove_tenant_dir_all(path: impl AsRef<Utf8Path>) -> std::io::Result<()> {
async fn safe_remove_tenant_dir_all(path: impl AsRef<Path>) -> std::io::Result<()> {
let tmp_path = safe_rename_tenant_dir(path).await?;
fs::remove_dir_all(tmp_path).await
}
async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<Utf8PathBuf> {
async fn safe_rename_tenant_dir(path: impl AsRef<Path>) -> std::io::Result<PathBuf> {
let parent = path
.as_ref()
.parent()
@@ -94,7 +95,7 @@ async fn safe_rename_tenant_dir(path: impl AsRef<Utf8Path>) -> std::io::Result<U
.collect::<String>()
+ TEMP_FILE_SUFFIX;
let tmp_path = path_with_suffix_extension(&path, &rand_suffix);
fs::rename(path.as_ref(), &tmp_path).await?;
fs::rename(&path, &tmp_path).await?;
fs::File::open(parent).await?.sync_all().await?;
Ok(tmp_path)
}
@@ -145,25 +146,29 @@ pub async fn init_tenant_mgr(
None
};
let mut dir_entries = tenants_dir
.read_dir_utf8()
let mut dir_entries = fs::read_dir(&tenants_dir)
.await
.with_context(|| format!("Failed to list tenants dir {tenants_dir:?}"))?;
let ctx = RequestContext::todo_child(TaskKind::Startup, DownloadBehavior::Warn);
loop {
match dir_entries.next() {
None => break,
Some(Ok(dir_entry)) => {
let tenant_dir_path = dir_entry.path().to_path_buf();
match dir_entries.next_entry().await {
Ok(None) => break,
Ok(Some(dir_entry)) => {
let tenant_dir_path = dir_entry.path();
if crate::is_temporary(&tenant_dir_path) {
info!("Found temporary tenant directory, removing: {tenant_dir_path}");
info!(
"Found temporary tenant directory, removing: {}",
tenant_dir_path.display()
);
// No need to use safe_remove_tenant_dir_all because this is already
// a temporary path
if let Err(e) = fs::remove_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove temporary directory '{}': {:?}",
tenant_dir_path, e
tenant_dir_path.display(),
e
);
}
} else {
@@ -178,7 +183,7 @@ pub async fn init_tenant_mgr(
if let Err(e) = fs::remove_dir(&tenant_dir_path).await {
error!(
"Failed to remove empty tenant directory '{}': {e:#}",
tenant_dir_path
tenant_dir_path.display()
)
}
continue;
@@ -192,6 +197,7 @@ pub async fn init_tenant_mgr(
let tenant_id = match tenant_dir_path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.parse::<TenantId>()
{
@@ -199,7 +205,7 @@ pub async fn init_tenant_mgr(
Err(_) => {
warn!(
"Invalid tenant path (garbage in our repo directory?): {}",
tenant_dir_path
tenant_dir_path.display()
);
continue;
}
@@ -215,7 +221,8 @@ pub async fn init_tenant_mgr(
if let Err(e) = safe_remove_tenant_dir_all(&tenant_dir_path).await {
error!(
"Failed to remove detached tenant directory '{}': {:?}",
tenant_dir_path, e
tenant_dir_path.display(),
e
);
}
continue;
@@ -225,7 +232,7 @@ pub async fn init_tenant_mgr(
// on local disk may activate
info!(
"Starting tenant {} in legacy mode, no generation",
tenant_dir_path
tenant_dir_path.display()
);
Generation::none()
};
@@ -249,7 +256,7 @@ pub async fn init_tenant_mgr(
}
}
}
Some(Err(e)) => {
Err(e) => {
// On error, print it, but continue with the other tenants. If we error out
// here, the pageserver startup fails altogether, causing outage for *all*
// tenants. That seems worse.
@@ -272,7 +279,7 @@ pub async fn init_tenant_mgr(
pub(crate) fn schedule_local_tenant_processing(
conf: &'static PageServerConf,
tenant_id: TenantId,
tenant_path: &Utf8Path,
tenant_path: &Path,
generation: Generation,
resources: TenantSharedResources,
init_order: Option<InitializationOrder>,
@@ -503,11 +510,6 @@ pub enum GetTenantError {
NotFound(TenantId),
#[error("Tenant {0} is not active")]
NotActive(TenantId),
/// Broken is logically a subset of NotActive, but a distinct error is useful as
/// NotActive is usually a retryable state for API purposes, whereas Broken
/// is a stuck error state
#[error("Tenant is broken: {0}")]
Broken(String),
}
/// Gets the tenant from the in-memory data, erroring if it's absent or is not fitting to the query.
@@ -522,20 +524,10 @@ pub async fn get_tenant(
let tenant = m
.get(&tenant_id)
.ok_or(GetTenantError::NotFound(tenant_id))?;
match tenant.current_state() {
TenantState::Broken {
reason,
backtrace: _,
} if active_only => Err(GetTenantError::Broken(reason)),
TenantState::Active => Ok(Arc::clone(tenant)),
_ => {
if active_only {
Err(GetTenantError::NotActive(tenant_id))
} else {
Ok(Arc::clone(tenant))
}
}
if active_only && !tenant.is_active() {
Err(GetTenantError::NotActive(tenant_id))
} else {
Ok(Arc::clone(tenant))
}
}
@@ -608,7 +600,7 @@ async fn detach_tenant0(
tenants: &tokio::sync::RwLock<TenantsMap>,
tenant_id: TenantId,
detach_ignored: bool,
) -> Result<Utf8PathBuf, TenantStateError> {
) -> Result<PathBuf, TenantStateError> {
let tenant_dir_rename_operation = |tenant_id_to_clean| async move {
let local_tenant_directory = conf.tenant_path(&tenant_id_to_clean);
safe_rename_tenant_dir(&local_tenant_directory)

View File

@@ -1,17 +1,16 @@
use std::{
io,
path::{Path, PathBuf},
sync::atomic::{AtomicUsize, Ordering},
};
use camino::{Utf8Path, Utf8PathBuf};
fn fsync_path(path: &Utf8Path) -> io::Result<()> {
fn fsync_path(path: &Path) -> io::Result<()> {
// TODO use VirtualFile::fsync_all once we fully go async.
let file = std::fs::File::open(path)?;
file.sync_all()
}
fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
fn parallel_worker(paths: &[PathBuf], next_path_idx: &AtomicUsize) -> io::Result<()> {
while let Some(path) = paths.get(next_path_idx.fetch_add(1, Ordering::Relaxed)) {
fsync_path(path)?;
}
@@ -19,7 +18,7 @@ fn parallel_worker(paths: &[Utf8PathBuf], next_path_idx: &AtomicUsize) -> io::Re
Ok(())
}
fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
fn fsync_in_thread_pool(paths: &[PathBuf]) -> io::Result<()> {
// TODO: remove this function in favor of `par_fsync_async` once we asyncify everything.
/// Use at most this number of threads.
@@ -48,7 +47,7 @@ fn fsync_in_thread_pool(paths: &[Utf8PathBuf]) -> io::Result<()> {
}
/// Parallel fsync all files. Can be used in non-async context as it is using rayon thread pool.
pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
pub fn par_fsync(paths: &[PathBuf]) -> io::Result<()> {
if paths.len() == 1 {
fsync_path(&paths[0])?;
return Ok(());
@@ -59,7 +58,7 @@ pub fn par_fsync(paths: &[Utf8PathBuf]) -> io::Result<()> {
/// Parallel fsync asynchronously. If number of files are less than PARALLEL_PATH_THRESHOLD, fsync is done in the current
/// execution thread. Otherwise, we will spawn_blocking and run it in tokio.
pub async fn par_fsync_async(paths: &[Utf8PathBuf]) -> io::Result<()> {
pub async fn par_fsync_async(paths: &[PathBuf]) -> io::Result<()> {
const MAX_CONCURRENT_FSYNC: usize = 64;
let mut next = paths.iter().peekable();
let mut js = tokio::task::JoinSet::new();

View File

@@ -209,7 +209,6 @@ pub mod index;
mod upload;
use anyhow::Context;
use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
// re-export these
pub use download::{is_temp_download_file, list_remote_timelines};
@@ -220,6 +219,7 @@ use utils::backoff::{
};
use std::collections::{HashMap, VecDeque};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};
@@ -924,7 +924,7 @@ impl RemoteTimelineClient {
))?
});
let index_file_path = timeline_storage_path.join(Utf8Path::new(IndexPart::FILE_NAME));
let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));
debug!("enqueuing index part deletion");
self.deletion_queue_client
@@ -1409,7 +1409,7 @@ pub fn remote_timelines_path(tenant_id: &TenantId) -> RemotePath {
}
pub fn remote_timeline_path(tenant_id: &TenantId, timeline_id: &TimelineId) -> RemotePath {
remote_timelines_path(tenant_id).join(Utf8Path::new(&timeline_id.to_string()))
remote_timelines_path(tenant_id).join(&PathBuf::from(timeline_id.to_string()))
}
pub fn remote_layer_path(
@@ -1452,7 +1452,14 @@ pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
}
};
match file_name.split_once('-') {
let file_name_str = match file_name.to_str() {
Some(s) => s,
None => {
tracing::warn!("Malformed index key {:?}", path);
return None;
}
};
match file_name_str.split_once('-') {
Some((_, gen_suffix)) => Generation::parse_suffix(gen_suffix),
None => None,
}
@@ -1464,16 +1471,20 @@ pub(crate) fn parse_remote_index_path(path: RemotePath) -> Option<Generation> {
/// Errors if the path provided does not start from pageserver's workdir.
pub fn remote_path(
conf: &PageServerConf,
local_path: &Utf8Path,
local_path: &Path,
generation: Generation,
) -> anyhow::Result<RemotePath> {
let stripped = local_path
.strip_prefix(&conf.workdir)
.context("Failed to strip workdir prefix")?;
let suffixed = format!("{0}{1}", stripped, generation.get_suffix());
let suffixed = format!(
"{0}{1}",
stripped.to_string_lossy(),
generation.get_suffix()
);
RemotePath::new(Utf8Path::new(&suffixed)).with_context(|| {
RemotePath::new(&PathBuf::from(suffixed)).with_context(|| {
format!(
"to resolve remote part of path {:?} for base {:?}",
local_path, conf.workdir
@@ -1493,7 +1504,7 @@ mod tests {
DEFAULT_PG_VERSION,
};
use std::collections::HashSet;
use std::{collections::HashSet, path::Path};
use utils::lsn::Lsn;
pub(super) fn dummy_contents(name: &str) -> Vec<u8> {
@@ -1527,7 +1538,7 @@ mod tests {
assert_eq!(avec, bvec);
}
fn assert_remote_files(expected: &[&str], remote_path: &Utf8Path, generation: Generation) {
fn assert_remote_files(expected: &[&str], remote_path: &Path, generation: Generation) {
let mut expected: Vec<String> = expected
.iter()
.map(|x| format!("{}{}", x, generation.get_suffix()))
@@ -1646,12 +1657,12 @@ mod tests {
let timeline_path = harness.timeline_path(&TIMELINE_ID);
println!("workdir: {}", harness.conf.workdir);
println!("workdir: {}", harness.conf.workdir.display());
let remote_timeline_dir = harness
.remote_fs_dir
.join(timeline_path.strip_prefix(&harness.conf.workdir).unwrap());
println!("remote_timeline_dir: {remote_timeline_dir}");
println!("remote_timeline_dir: {}", remote_timeline_dir.display());
let generation = harness.generation;
@@ -1898,7 +1909,7 @@ mod tests {
let index_path = test_state.harness.remote_fs_dir.join(
remote_index_path(&test_state.harness.tenant_id, &TIMELINE_ID, generation).get_path(),
);
eprintln!("Writing {index_path}");
eprintln!("Writing {}", index_path.display());
std::fs::write(&index_path, index_part_bytes).unwrap();
example_index_part
}

View File

@@ -5,10 +5,10 @@
use std::collections::HashSet;
use std::future::Future;
use std::path::Path;
use std::time::Duration;
use anyhow::{anyhow, Context};
use camino::Utf8Path;
use tokio::fs;
use tokio::io::AsyncWriteExt;
use tokio_util::sync::CancellationToken;
@@ -74,7 +74,12 @@ pub async fn download_layer_file<'a>(
// TODO: this doesn't use the cached fd for some reason?
let mut destination_file = fs::File::create(&temp_file_path)
.await
.with_context(|| format!("create a destination file for layer '{temp_file_path}'"))
.with_context(|| {
format!(
"create a destination file for layer '{}'",
temp_file_path.display()
)
})
.map_err(DownloadError::Other)?;
let mut download = storage
.download(&remote_path)
@@ -116,7 +121,7 @@ pub async fn download_layer_file<'a>(
destination_file
.flush()
.await
.with_context(|| format!("flush source file at {temp_file_path}"))
.with_context(|| format!("flush source file at {}", temp_file_path.display()))
.map_err(DownloadError::Other)?;
let expected = layer_metadata.file_size();
@@ -130,7 +135,12 @@ pub async fn download_layer_file<'a>(
destination_file
.sync_all()
.await
.with_context(|| format!("failed to fsync source file at {temp_file_path}"))
.with_context(|| {
format!(
"failed to fsync source file at {}",
temp_file_path.display()
)
})
.map_err(DownloadError::Other)?;
drop(destination_file);
@@ -142,23 +152,27 @@ pub async fn download_layer_file<'a>(
fs::rename(&temp_file_path, &local_path)
.await
.with_context(|| format!("rename download layer file to {local_path}"))
.with_context(|| format!("rename download layer file to {}", local_path.display(),))
.map_err(DownloadError::Other)?;
crashsafe::fsync_async(&local_path)
.await
.with_context(|| format!("fsync layer file {local_path}"))
.with_context(|| format!("fsync layer file {}", local_path.display(),))
.map_err(DownloadError::Other)?;
tracing::debug!("download complete: {local_path}");
tracing::debug!("download complete: {}", local_path.display());
Ok(bytes_amount)
}
const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download";
pub fn is_temp_download_file(path: &Utf8Path) -> bool {
let extension = path.extension();
pub fn is_temp_download_file(path: &Path) -> bool {
let extension = path.extension().map(|pname| {
pname
.to_str()
.expect("paths passed to this function must be valid Rust strings")
});
match extension {
Some(TEMP_DOWNLOAD_EXTENSION) => true,
Some(_) => false,

View File

@@ -1,9 +1,8 @@
//! Helper functions to upload files to remote storage with a RemoteStorage
use anyhow::{bail, Context};
use camino::Utf8Path;
use fail::fail_point;
use std::io::ErrorKind;
use std::{io::ErrorKind, path::Path};
use tokio::fs;
use super::Generation;
@@ -51,7 +50,7 @@ pub(super) async fn upload_index_part<'a>(
pub(super) async fn upload_timeline_layer<'a>(
conf: &'static PageServerConf,
storage: &'a GenericRemoteStorage,
source_path: &'a Utf8Path,
source_path: &'a Path,
known_metadata: &'a LayerFileMetadata,
generation: Generation,
) -> anyhow::Result<()> {
@@ -69,7 +68,7 @@ pub(super) async fn upload_timeline_layer<'a>(
// upload. However, a nonexistent file can also be indicative of
// something worse, like when a file is scheduled for upload before
// it has been written to disk yet.
info!(path = %source_path, "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
info!(path = %source_path.display(), "File to upload doesn't exist. Likely the file has been deleted and an upload is not required any more.");
return Ok(());
}
Err(e) => {
@@ -94,7 +93,7 @@ pub(super) async fn upload_timeline_layer<'a>(
storage
.upload(source_file, fs_size, &storage_path, None)
.await
.with_context(|| format!("upload layer from local path '{source_path}'"))?;
.with_context(|| format!("upload layer from local path '{}'", source_path.display()))?;
Ok(())
}

View File

@@ -14,7 +14,6 @@ use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use bytes::Bytes;
use camino::Utf8PathBuf;
use enum_map::EnumMap;
use enumset::EnumSet;
use once_cell::sync::Lazy;
@@ -23,6 +22,7 @@ use pageserver_api::models::{
HistoricLayerInfo, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus,
};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::warn;
@@ -378,7 +378,7 @@ pub trait PersistentLayer: Layer + AsLayerDesc {
// Path to the layer file in the local filesystem.
// `None` for `RemoteLayer`.
fn local_path(&self) -> Option<Utf8PathBuf>;
fn local_path(&self) -> Option<PathBuf>;
/// Permanently remove this layer from disk.
fn delete_resident_layer_file(&self) -> Result<()>;
@@ -456,7 +456,7 @@ pub mod tests {
/// config. In that case, we use the Path variant to hold the full path to the file on
/// disk.
enum PathOrConf {
Path(Utf8PathBuf),
Path(PathBuf),
Conf(&'static PageServerConf),
}

View File

@@ -41,7 +41,6 @@ use crate::virtual_file::VirtualFile;
use crate::{walrecord, TEMP_FILE_SUFFIX};
use crate::{DELTA_FILE_MAGIC, STORAGE_FORMAT_VERSION};
use anyhow::{bail, ensure, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
use serde::{Deserialize, Serialize};
@@ -49,6 +48,7 @@ use std::fs::{self, File};
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::OnceCell;
use tracing::*;
@@ -267,7 +267,7 @@ impl PersistentLayer for DeltaLayer {
Some(self)
}
fn local_path(&self) -> Option<Utf8PathBuf> {
fn local_path(&self) -> Option<PathBuf> {
self.local_path()
}
@@ -374,7 +374,7 @@ impl DeltaLayer {
.await
}
pub(crate) fn local_path(&self) -> Option<Utf8PathBuf> {
pub(crate) fn local_path(&self) -> Option<PathBuf> {
Some(self.path())
}
@@ -409,7 +409,7 @@ impl DeltaLayer {
tenant_id: &TenantId,
timeline_id: &TimelineId,
fname: &DeltaFileName,
) -> Utf8PathBuf {
) -> PathBuf {
match path_or_conf {
PathOrConf::Path(path) => path.clone(),
PathOrConf::Conf(conf) => conf
@@ -424,7 +424,7 @@ impl DeltaLayer {
timeline_id: &TimelineId,
key_start: Key,
lsn_range: &Range<Lsn>,
) -> Utf8PathBuf {
) -> PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
@@ -455,7 +455,7 @@ impl DeltaLayer {
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.await
.with_context(|| format!("Failed to load delta layer {}", self.path()))
.with_context(|| format!("Failed to load delta layer {}", self.path().display()))
}
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
@@ -471,7 +471,7 @@ impl DeltaLayer {
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
let expected_filename = self.filename().file_name();
if actual_filename != expected_filename {
@@ -510,7 +510,7 @@ impl DeltaLayer {
/// Create a DeltaLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
pub fn new_for_path(path: &Utf8Path, file: File) -> Result<Self> {
pub fn new_for_path(path: &Path, file: File) -> Result<Self> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);
file.read_exact_at(&mut summary_buf, 0)?;
@@ -538,7 +538,7 @@ impl DeltaLayer {
self.desc.delta_file_name()
}
/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> Utf8PathBuf {
pub fn path(&self) -> PathBuf {
Self::path_for(
&self.path_or_conf,
&self.desc.tenant_id,
@@ -573,7 +573,7 @@ impl DeltaLayer {
///
struct DeltaLayerWriterInner {
conf: &'static PageServerConf,
pub path: Utf8PathBuf,
pub path: PathBuf,
timeline_id: TimelineId,
tenant_id: TenantId,
@@ -711,7 +711,7 @@ impl DeltaLayerWriterInner {
ensure!(
metadata.len() <= S3_UPLOAD_LIMIT,
"Created delta layer file at {} of size {} above limit {S3_UPLOAD_LIMIT}!",
file.path,
file.path.display(),
metadata.len()
);
@@ -748,7 +748,7 @@ impl DeltaLayerWriterInner {
);
std::fs::rename(self.path, &final_path)?;
trace!("created delta layer {final_path}");
trace!("created delta layer {}", final_path.display());
Ok(layer)
}
@@ -847,13 +847,13 @@ impl Drop for DeltaLayerWriter {
impl DeltaLayerInner {
pub(super) async fn load(
path: &Utf8Path,
path: &std::path::Path,
summary: Option<Summary>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.await
.with_context(|| format!("Failed to open file '{path}'"))?;
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
@@ -864,11 +864,11 @@ impl DeltaLayerInner {
expected_summary.index_start_blk = actual_summary.index_start_blk;
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
}
}
@@ -933,12 +933,15 @@ impl DeltaLayerInner {
.read_blob_into_buf(pos, &mut buf, ctx)
.await
.with_context(|| {
format!("Failed to read blob from virtual file {}", file.file.path)
format!(
"Failed to read blob from virtual file {}",
file.file.path.display()
)
})?;
let val = Value::des(&buf).with_context(|| {
format!(
"Failed to deserialize file blob from virtual file {}",
file.file.path
file.file.path.display()
)
})?;
match val {

View File

@@ -37,7 +37,6 @@ use crate::virtual_file::VirtualFile;
use crate::{IMAGE_FILE_MAGIC, STORAGE_FORMAT_VERSION, TEMP_FILE_SUFFIX};
use anyhow::{bail, ensure, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use hex;
use pageserver_api::models::{HistoricLayerInfo, LayerAccessKind};
use rand::{distributions::Alphanumeric, Rng};
@@ -46,6 +45,7 @@ use std::fs::{self, File};
use std::io::SeekFrom;
use std::ops::Range;
use std::os::unix::prelude::FileExt;
use std::path::{Path, PathBuf};
use tokio::sync::OnceCell;
use tracing::*;
@@ -195,7 +195,7 @@ impl AsLayerDesc for ImageLayer {
}
impl PersistentLayer for ImageLayer {
fn local_path(&self) -> Option<Utf8PathBuf> {
fn local_path(&self) -> Option<PathBuf> {
self.local_path()
}
@@ -269,10 +269,10 @@ impl ImageLayer {
.get_value_reconstruct_data(key, reconstruct_state, ctx)
.await
// FIXME: makes no sense to dump paths
.with_context(|| format!("read {}", self.path()))
.with_context(|| format!("read {}", self.path().display()))
}
pub(crate) fn local_path(&self) -> Option<Utf8PathBuf> {
pub(crate) fn local_path(&self) -> Option<PathBuf> {
Some(self.path())
}
@@ -304,7 +304,7 @@ impl ImageLayer {
timeline_id: TimelineId,
tenant_id: TenantId,
fname: &ImageFileName,
) -> Utf8PathBuf {
) -> PathBuf {
match path_or_conf {
PathOrConf::Path(path) => path.to_path_buf(),
PathOrConf::Conf(conf) => conf
@@ -318,7 +318,7 @@ impl ImageLayer {
timeline_id: TimelineId,
tenant_id: TenantId,
fname: &ImageFileName,
) -> Utf8PathBuf {
) -> PathBuf {
let rand_string: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(8)
@@ -342,7 +342,7 @@ impl ImageLayer {
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.await
.with_context(|| format!("Failed to load image layer {}", self.path()))
.with_context(|| format!("Failed to load image layer {}", self.path().display()))
}
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
@@ -359,7 +359,7 @@ impl ImageLayer {
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
let actual_filename = path.file_name().unwrap().to_owned();
let actual_filename = path.file_name().unwrap().to_str().unwrap().to_owned();
let expected_filename = self.filename().file_name();
if actual_filename != expected_filename {
@@ -399,7 +399,7 @@ impl ImageLayer {
/// Create an ImageLayer struct representing an existing file on disk.
///
/// This variant is only used for debugging purposes, by the 'pagectl' binary.
pub fn new_for_path(path: &Utf8Path, file: File) -> Result<ImageLayer> {
pub fn new_for_path(path: &Path, file: File) -> Result<ImageLayer> {
let mut summary_buf = Vec::new();
summary_buf.resize(PAGE_SZ, 0);
file.read_exact_at(&mut summary_buf, 0)?;
@@ -427,7 +427,7 @@ impl ImageLayer {
}
/// Path to the layer file in pageserver workdir.
pub fn path(&self) -> Utf8PathBuf {
pub fn path(&self) -> PathBuf {
Self::path_for(
&self.path_or_conf,
self.desc.timeline_id,
@@ -439,14 +439,14 @@ impl ImageLayer {
impl ImageLayerInner {
pub(super) async fn load(
path: &Utf8Path,
path: &std::path::Path,
lsn: Lsn,
summary: Option<Summary>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
@@ -457,11 +457,11 @@ impl ImageLayerInner {
expected_summary.index_root_blk = actual_summary.index_root_blk;
if actual_summary != expected_summary {
bail!(
"in-file summary does not match expected summary. actual = {:?} expected = {:?}",
actual_summary,
expected_summary
);
// bail!(
// "in-file summary does not match expected summary. actual = {:?} expected = {:?}",
// actual_summary,
// expected_summary
// );
}
}
@@ -526,7 +526,7 @@ impl ImageLayerInner {
///
struct ImageLayerWriterInner {
conf: &'static PageServerConf,
path: Utf8PathBuf,
path: PathBuf,
timeline_id: TimelineId,
tenant_id: TenantId,
key_range: Range<Key>,
@@ -558,7 +558,7 @@ impl ImageLayerWriterInner {
lsn,
},
);
info!("new image layer {path}");
info!("new image layer {}", path.display());
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
@@ -685,7 +685,7 @@ impl ImageLayerWriterInner {
);
std::fs::rename(self.path, final_path)?;
trace!("created image layer {}", layer.path());
trace!("created image layer {}", layer.path().display());
Ok(layer)
}

View File

@@ -8,9 +8,9 @@ use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::timeline::layer_manager::LayerManager;
use anyhow::{bail, Result};
use camino::Utf8PathBuf;
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
use std::path::PathBuf;
use std::sync::Arc;
use utils::{
@@ -92,7 +92,7 @@ impl AsLayerDesc for RemoteLayer {
}
impl PersistentLayer for RemoteLayer {
fn local_path(&self) -> Option<Utf8PathBuf> {
fn local_path(&self) -> Option<PathBuf> {
None
}

View File

@@ -9,7 +9,6 @@ mod walreceiver;
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use fail::fail_point;
use futures::StreamExt;
use itertools::Itertools;
@@ -30,6 +29,7 @@ use utils::id::TenantTimelineId;
use std::cmp::{max, min, Ordering};
use std::collections::{BinaryHeap, HashMap, HashSet};
use std::ops::{Deref, Range};
use std::path::{Path, PathBuf};
use std::pin::pin;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::{Arc, Mutex, RwLock, Weak};
@@ -56,7 +56,7 @@ use crate::config::PageServerConf;
use crate::keyspace::{KeyPartitioning, KeySpace, KeySpaceRandomAccum};
use crate::metrics::{
TimelineMetrics, MATERIALIZED_PAGE_CACHE_HIT, MATERIALIZED_PAGE_CACHE_HIT_DIRECT,
UNEXPECTED_ONDEMAND_DOWNLOADS,
RECONSTRUCT_TIME, UNEXPECTED_ONDEMAND_DOWNLOADS,
};
use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::pgdatadir_mapping::{is_rel_fsm_block_key, is_rel_vm_block_key};
@@ -496,39 +496,13 @@ impl Timeline {
};
let timer = crate::metrics::GET_RECONSTRUCT_DATA_TIME.start_timer();
let path = self
.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
.await?;
timer.stop_and_record();
let start = Instant::now();
let res = self.reconstruct_value(key, lsn, reconstruct_state).await;
let elapsed = start.elapsed();
crate::metrics::RECONSTRUCT_TIME
.for_result(&res)
.observe(elapsed.as_secs_f64());
if cfg!(feature = "testing") && res.is_err() {
// it can only be walredo issue
use std::fmt::Write;
let mut msg = String::new();
path.into_iter().for_each(|(res, cont_lsn, layer)| {
writeln!(
msg,
"- layer traversal: result {res:?}, cont_lsn {cont_lsn}, layer: {}",
layer(),
)
.expect("string grows")
});
// this is to rule out or provide evidence that we could in some cases read a duplicate
// walrecord
tracing::info!("walredo failed, path:\n{msg}");
}
res
RECONSTRUCT_TIME
.observe_closure_duration(|| self.reconstruct_value(key, lsn, reconstruct_state))
.await
}
/// Get last or prev record separately. Same as get_last_record_rlsn().last/prev.
@@ -681,38 +655,38 @@ impl Timeline {
) -> anyhow::Result<()> {
const ROUNDS: usize = 2;
static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
once_cell::sync::Lazy::new(|| {
let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
let permits = usize::max(
1,
// while a lot of the work is done on spawn_blocking, we still do
// repartitioning in the async context. this should give leave us some workers
// unblocked to be blocked on other work, hopefully easing any outside visible
// effects of restarts.
//
// 6/8 is a guess; previously we ran with unlimited 8 and more from
// spawn_blocking.
(total_threads * 3).checked_div(4).unwrap_or(0),
);
assert_ne!(permits, 0, "we will not be adding in permits later");
assert!(
permits < total_threads,
"need threads avail for shorter work"
);
tokio::sync::Semaphore::new(permits)
});
// static CONCURRENT_COMPACTIONS: once_cell::sync::Lazy<tokio::sync::Semaphore> =
// once_cell::sync::Lazy::new(|| {
// let total_threads = *task_mgr::BACKGROUND_RUNTIME_WORKER_THREADS;
// let permits = usize::max(
// 1,
// // while a lot of the work is done on spawn_blocking, we still do
// // repartitioning in the async context. this should give leave us some workers
// // unblocked to be blocked on other work, hopefully easing any outside visible
// // effects of restarts.
// //
// // 6/8 is a guess; previously we ran with unlimited 8 and more from
// // spawn_blocking.
// (total_threads * 3).checked_div(4).unwrap_or(0),
// );
// assert_ne!(permits, 0, "we will not be adding in permits later");
// assert!(
// permits < total_threads,
// "need threads avail for shorter work"
// );
// tokio::sync::Semaphore::new(permits)
// });
// this wait probably never needs any "long time spent" logging, because we already nag if
// compaction task goes over it's period (20s) which is quite often in production.
let _permit = tokio::select! {
permit = CONCURRENT_COMPACTIONS.acquire() => {
permit
},
_ = cancel.cancelled() => {
return Ok(());
}
};
// // this wait probably never needs any "long time spent" logging, because we already nag if
// // compaction task goes over it's period (20s) which is quite often in production.
// let _permit = tokio::select! {
// permit = CONCURRENT_COMPACTIONS.acquire() => {
// permit
// },
// _ = cancel.cancelled() => {
// return Ok(());
// }
// };
let last_record_lsn = self.get_last_record_lsn();
@@ -1736,7 +1710,7 @@ impl Timeline {
Discovered::Temporary(name) => (name, "temporary timeline file"),
Discovered::TemporaryDownload(name) => (name, "temporary download"),
};
path.push(Utf8Path::new(&name));
path.push(name);
init::cleanup(&path, kind)?;
path.pop();
}
@@ -2217,10 +2191,10 @@ impl TraversalLayerExt for Arc<dyn PersistentLayer> {
let timeline_id = self.layer_desc().timeline_id;
match self.local_path() {
Some(local_path) => {
debug_assert!(local_path.to_string().contains(&format!("{}", timeline_id)),
debug_assert!(local_path.to_str().unwrap().contains(&format!("{}", timeline_id)),
"need timeline ID to uniquely identify the layer when traversal crosses ancestor boundary",
);
format!("{local_path}")
format!("{}", local_path.display())
}
None => {
format!("remote {}/{self}", timeline_id)
@@ -2250,7 +2224,7 @@ impl Timeline {
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<Vec<TraversalPathItem>, PageReconstructError> {
) -> Result<(), PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
@@ -2281,12 +2255,12 @@ impl Timeline {
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(traversal_path),
ValueReconstructResult::Complete => return Ok(()),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(traversal_path);
return Ok(());
}
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
@@ -3722,11 +3696,6 @@ impl Timeline {
});
writer.as_mut().unwrap().put_value(key, lsn, value).await?;
if !new_layers.is_empty() {
fail_point!("after-timeline-compacted-first-L1");
}
prev_key = Some(key);
}
if let Some(writer) = writer {
@@ -3748,7 +3717,7 @@ impl Timeline {
);
}
}
let mut layer_paths: Vec<Utf8PathBuf> = new_layers.iter().map(|l| l.path()).collect();
let mut layer_paths: Vec<PathBuf> = new_layers.iter().map(|l| l.path()).collect();
// Fsync all the layer files and directory using multiple threads to
// minimize latency.
@@ -3858,7 +3827,10 @@ impl Timeline {
let new_delta_path = l.path();
let metadata = new_delta_path.metadata().with_context(|| {
format!("read file metadata for new created layer {new_delta_path}")
format!(
"read file metadata for new created layer {}",
new_delta_path.display()
)
})?;
if let Some(remote_client) = &self.remote_client {
@@ -3881,7 +3853,6 @@ impl Timeline {
);
let l = l as Arc<dyn PersistentLayer>;
if guard.contains(&l) {
tracing::error!(layer=%l, "duplicated L1 layer");
duplicated_layers.insert(l.layer_desc().key());
} else {
if LayerMap::is_l0(l.layer_desc()) {
@@ -4793,10 +4764,11 @@ fn is_send() {
/// Add a suffix to a layer file's name: .{num}.old
/// Uses the first available num (starts at 0)
fn rename_to_backup(path: &Utf8Path) -> anyhow::Result<()> {
fn rename_to_backup(path: &Path) -> anyhow::Result<()> {
let filename = path
.file_name()
.ok_or_else(|| anyhow!("Path {path} don't have a file name"))?;
.ok_or_else(|| anyhow!("Path {} don't have a file name", path.display()))?
.to_string_lossy();
let mut new_path = path.to_owned();
for i in 0u32.. {

View File

@@ -12,8 +12,7 @@ use crate::{
METADATA_FILE_NAME,
};
use anyhow::Context;
use camino::Utf8Path;
use std::{collections::HashMap, str::FromStr};
use std::{collections::HashMap, ffi::OsString, path::Path, str::FromStr};
use utils::lsn::Lsn;
/// Identified files in the timeline directory.
@@ -21,43 +20,46 @@ pub(super) enum Discovered {
/// The only one we care about
Layer(LayerFileName, u64),
/// Old ephmeral files from previous launches, should be removed
Ephemeral(String),
Ephemeral(OsString),
/// Old temporary timeline files, unsure what these really are, should be removed
Temporary(String),
Temporary(OsString),
/// Temporary on-demand download files, should be removed
TemporaryDownload(String),
TemporaryDownload(OsString),
/// "metadata" file we persist locally and include in `index_part.json`
Metadata,
/// Backup file from previously future layers
IgnoredBackup,
/// Unrecognized, warn about these
Unknown(String),
Unknown(OsString),
}
/// Scans the timeline directory for interesting files.
pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovered>> {
pub(super) fn scan_timeline_dir(path: &Path) -> anyhow::Result<Vec<Discovered>> {
let mut ret = Vec::new();
for direntry in path.read_dir_utf8()? {
for direntry in std::fs::read_dir(path)? {
let direntry = direntry?;
let file_name = direntry.file_name().to_string();
let direntry_path = direntry.path();
let file_name = direntry.file_name();
let discovered = match LayerFileName::from_str(&file_name) {
let fname = file_name.to_string_lossy();
let discovered = match LayerFileName::from_str(&fname) {
Ok(file_name) => {
let file_size = direntry.metadata()?.len();
Discovered::Layer(file_name, file_size)
}
Err(_) => {
if file_name == METADATA_FILE_NAME {
if fname == METADATA_FILE_NAME {
Discovered::Metadata
} else if file_name.ends_with(".old") {
} else if fname.ends_with(".old") {
// ignore these
Discovered::IgnoredBackup
} else if remote_timeline_client::is_temp_download_file(direntry.path()) {
} else if remote_timeline_client::is_temp_download_file(&direntry_path) {
Discovered::TemporaryDownload(file_name)
} else if is_ephemeral_file(&file_name) {
} else if is_ephemeral_file(&fname) {
Discovered::Ephemeral(file_name)
} else if is_temporary(direntry.path()) {
} else if is_temporary(&direntry_path) {
Discovered::Temporary(file_name)
} else {
Discovered::Unknown(file_name)
@@ -160,14 +162,15 @@ pub(super) fn reconcile(
.collect::<Vec<_>>()
}
pub(super) fn cleanup(path: &Utf8Path, kind: &str) -> anyhow::Result<()> {
pub(super) fn cleanup(path: &Path, kind: &str) -> anyhow::Result<()> {
let file_name = path.file_name().expect("must be file path");
tracing::debug!(kind, ?file_name, "cleaning up");
std::fs::remove_file(path).with_context(|| format!("failed to remove {kind} at {path}"))
std::fs::remove_file(path)
.with_context(|| format!("failed to remove {kind} at {}", path.display()))
}
pub(super) fn cleanup_local_file_for_remote(
path: &Utf8Path,
path: &Path,
local: &LayerFileMetadata,
remote: &LayerFileMetadata,
) -> anyhow::Result<()> {
@@ -179,7 +182,8 @@ pub(super) fn cleanup_local_file_for_remote(
if let Err(err) = crate::tenant::timeline::rename_to_backup(path) {
assert!(
path.exists(),
"we would leave the local_layer without a file if this does not hold: {path}",
"we would leave the local_layer without a file if this does not hold: {}",
path.display()
);
Err(err)
} else {
@@ -188,7 +192,7 @@ pub(super) fn cleanup_local_file_for_remote(
}
pub(super) fn cleanup_future_layer(
path: &Utf8Path,
path: &Path,
name: &LayerFileName,
disk_consistent_lsn: Lsn,
) -> anyhow::Result<()> {

View File

@@ -1,7 +1,6 @@
use std::{collections::hash_map::Entry, fs, sync::Arc};
use std::{collections::hash_map::Entry, fs, path::PathBuf, sync::Arc};
use anyhow::Context;
use camino::Utf8PathBuf;
use tracing::{error, info, info_span, warn};
use utils::{crashsafe, fs_ext, id::TimelineId, lsn::Lsn};
@@ -156,12 +155,12 @@ pub(crate) fn cleanup_timeline_directory(uninit_mark: TimelineUninitMark) {
#[must_use]
pub(crate) struct TimelineUninitMark {
uninit_mark_deleted: bool,
uninit_mark_path: Utf8PathBuf,
pub(crate) timeline_path: Utf8PathBuf,
uninit_mark_path: PathBuf,
pub(crate) timeline_path: PathBuf,
}
impl TimelineUninitMark {
pub(crate) fn new(uninit_mark_path: Utf8PathBuf, timeline_path: Utf8PathBuf) -> Self {
pub(crate) fn new(uninit_mark_path: PathBuf, timeline_path: PathBuf) -> Self {
Self {
uninit_mark_deleted: false,
uninit_mark_path,
@@ -198,13 +197,14 @@ impl Drop for TimelineUninitMark {
if self.timeline_path.exists() {
error!(
"Uninit mark {} is not removed, timeline {} stays uninitialized",
self.uninit_mark_path, self.timeline_path
self.uninit_mark_path.display(),
self.timeline_path.display()
)
} else {
// unblock later timeline creation attempts
warn!(
"Removing intermediate uninit mark file {}",
self.uninit_mark_path
self.uninit_mark_path.display()
);
if let Err(e) = self.delete_mark_file_if_present() {
error!("Failed to remove the uninit mark file: {e}")

View File

@@ -253,7 +253,7 @@ impl std::fmt::Display for UploadOp {
write!(f, "UploadMetadata(lsn: {})", lsn)
}
UploadOp::Delete(delete) => {
write!(f, "Delete({} layers)", delete.layers.len())
write!(f, "Delete({} layers)", delete.layers.len(),)
}
UploadOp::Barrier(_) => write!(f, "Barrier"),
}

View File

@@ -1,8 +1,8 @@
use bytes::Bytes;
use camino::Utf8PathBuf;
use std::{
fs::{create_dir_all, File},
io::{BufWriter, Write},
path::PathBuf,
};
pub struct Tracer {
@@ -16,7 +16,7 @@ impl Drop for Tracer {
}
impl Tracer {
pub fn new(path: Utf8PathBuf) -> Self {
pub fn new(path: PathBuf) -> Self {
let parent = path.parent().expect("failed to parse parent path");
create_dir_all(parent).expect("failed to create trace dir");

View File

@@ -12,13 +12,14 @@
//!
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
use crate::tenant::TENANTS_SEGMENT_NAME;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::io::{ErrorKind, Seek, SeekFrom};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::unix::fs::FileExt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
///
/// A virtual file descriptor. You can use this just like std::fs::File, but internally
@@ -51,7 +52,7 @@ pub struct VirtualFile {
/// if a new file is created, we only pass the create flag when it's initially
/// opened, in the VirtualFile::create() function, and strip the flag before
/// storing it here.
pub path: Utf8PathBuf,
pub path: PathBuf,
open_options: OpenOptions,
// These are strings becase we only use them for metrics, and those expect strings.
@@ -110,7 +111,7 @@ impl OpenFiles {
///
/// On return, we hold a lock on the slot, and its 'tag' has been updated
/// recently_used has been set. It's all ready for reuse.
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
//
// Run the clock algorithm to find a slot to replace.
//
@@ -142,7 +143,7 @@ impl OpenFiles {
}
retries += 1;
} else {
slot_guard = slot.inner.write().unwrap();
slot_guard = slot.inner.write().await;
index = next;
break;
}
@@ -153,7 +154,7 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
// the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to
// distinguish the two.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::CloseByReplace)
@@ -173,148 +174,78 @@ impl OpenFiles {
}
}
/// Call this when the local filesystem gives us an error with an external
/// cause: this includes EIO, EROFS, and EACCESS: all these indicate either
/// bad storage or bad configuration, and we can't fix that from inside
/// a running process.
pub(crate) fn on_fatal_io_error(e: &std::io::Error) -> ! {
tracing::error!("Fatal I/O error: {}", &e);
std::process::abort();
#[derive(Debug, thiserror::Error)]
pub enum CrashsafeOverwriteError {
#[error("final path has no parent dir")]
FinalPathHasNoParentDir,
#[error("remove tempfile: {0}")]
RemovePreviousTempfile(#[source] std::io::Error),
#[error("create tempfile: {0}")]
CreateTempfile(#[source] std::io::Error),
#[error("write tempfile: {0}")]
WriteContents(#[source] std::io::Error),
#[error("sync tempfile: {0}")]
SyncTempfile(#[source] std::io::Error),
#[error("rename tempfile to final path: {0}")]
RenameTempfileToFinalPath(#[source] std::io::Error),
#[error("open final path parent dir: {0}")]
OpenFinalPathParentDir(#[source] std::io::Error),
#[error("sync final path parent dir: {0}")]
SyncFinalPathParentDir(#[source] std::io::Error),
}
/// Identify error types that should alwways terminate the process. Other
/// error types may be elegible for retry.
pub(crate) fn is_fatal_io_error(e: &std::io::Error) -> bool {
use nix::errno::Errno::*;
match e.raw_os_error().map(nix::errno::from_i32) {
Some(EIO) => {
// Terminate on EIO because we no longer trust the device to store
// data safely, or to uphold persistence guarantees on fsync.
true
}
Some(EROFS) => {
// Terminate on EROFS because a filesystem is usually remounted
// readonly when it has experienced some critical issue, so the same
// logic as EIO applies.
true
}
Some(EACCES) => {
// Terminate on EACCESS because we should always have permissions
// for our own data dir: if we don't, then we can't do our job and
// need administrative intervention to fix permissions. Terminating
// is the best way to make sure we stop cleanly rather than going
// into infinite retry loops, and will make it clear to the outside
// world that we need help.
true
}
_ => {
// Treat all other local file I/O errors are retryable. This includes:
// - ENOSPC: we stay up and wait for eviction to free some space
// - EINVAL, EBADF, EBADFD: this is a code bug, not a filesystem/hardware issue
// - WriteZero, Interrupted: these are used internally VirtualFile
false
impl CrashsafeOverwriteError {
/// Returns true iff the new contents are durably stored.
pub fn are_new_contents_durable(&self) -> bool {
match self {
Self::FinalPathHasNoParentDir => false,
Self::RemovePreviousTempfile(_) => false,
Self::CreateTempfile(_) => false,
Self::WriteContents(_) => false,
Self::SyncTempfile(_) => false,
Self::RenameTempfileToFinalPath(_) => false,
Self::OpenFinalPathParentDir(_) => false,
Self::SyncFinalPathParentDir(_) => true,
}
}
}
/// Wrap std::io::Error with a behavior where we will terminate the process
/// on most I/O errors from local storage. The rational for terminating is:
/// - EIO means we can't trust the drive any more
/// - EROFS means the local filesystem or drive is damaged, we shouldn't use it any more
/// - EACCESS means something is fatally misconfigured about the pageserver, such
/// as running the process as the wrong user, or the filesystem having the wrong
/// ownership or permission bits. We terminate so that it's obvious to
/// the operator why the pageserver isn't working, and they can restart it when
/// they've fixed the problem.
#[derive(thiserror::Error, Debug)]
pub struct Error {
inner: std::io::Error,
context: Option<String>,
/// Observe duration for the given storage I/O operation
///
/// Unlike `observe_closure_duration`, this supports async,
/// where "support" means that we measure wall clock time.
macro_rules! observe_duration {
($op:expr, $($body:tt)*) => {{
let instant = Instant::now();
let result = $($body)*;
let elapsed = instant.elapsed().as_secs_f64();
STORAGE_IO_TIME_METRIC
.get($op)
.observe(elapsed);
result
}}
}
impl Error {
/// Wrap a io::Error with some context & terminate
/// the process if the io::Error matches our policy for termination
fn new_with_context(e: std::io::Error, context: &str) -> Self {
Self::build(e, Some(context.to_string()))
}
fn context(e: Self, context: &str) -> Self {
Self {
inner: e.inner,
context: Some(context.to_string()),
}
}
fn new(e: std::io::Error) -> Self {
Self::build(e, None)
}
fn invalid(reason: &str) -> Self {
Self::new(std::io::Error::new(ErrorKind::InvalidInput, reason))
}
fn build(e: std::io::Error, context: Option<String>) -> Self {
// Construct instance early so that we have it for
// using Display in termination message.
let instance = Self { inner: e, context };
// Maybe terminate: this violates the usual expectation that callers
// should make their own decisions about how to handle an Error, but
// it's worthwhile to avoid every single user of the local filesystem
// having to apply the same "terminate on errors" behavior.
if is_fatal_io_error(&instance.inner) {
on_fatal_io_error(&instance.inner);
}
instance
}
fn kind(&self) -> ErrorKind {
self.inner.kind()
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Self::build(e, None)
}
}
impl From<Error> for std::io::Error {
fn from(e: Error) -> std::io::Error {
e.inner
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.context {
Some(context) => {
write!(f, "{}: {}", context, self.inner)
}
None => self.inner.fmt(f),
}
}
macro_rules! with_file {
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
let $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, Error> {
Self::open_with_options(path, OpenOptions::new().read(true))
.await
.map_err(Error::from)
pub async fn open(path: &Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, Error> {
pub async fn create(path: &Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
.map_err(Error::from)
}
/// Open a file with given options.
@@ -323,10 +254,10 @@ impl VirtualFile {
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
pub async fn open_with_options(
path: &Utf8Path,
path: &Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, Error> {
let path_str = path.to_string();
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string_lossy();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
let timeline_id;
@@ -337,11 +268,9 @@ impl VirtualFile {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| open_options.open(path))?;
let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?;
// Strip all options other than read and write.
//
@@ -374,19 +303,17 @@ impl VirtualFile {
/// atomic, a crash during the write operation will never leave behind a
/// partially written file.
pub async fn crashsafe_overwrite(
final_path: &Utf8Path,
tmp_path: &Utf8Path,
final_path: &Path,
tmp_path: &Path,
content: &[u8],
) -> Result<(), Error> {
let final_path_parent = final_path.parent().ok_or(std::io::Error::new(
ErrorKind::InvalidInput,
"Path must be absolute",
))?;
) -> Result<(), CrashsafeOverwriteError> {
let Some(final_path_parent) = final_path.parent() else {
return Err(CrashsafeOverwriteError::FinalPathHasNoParentDir);
};
match std::fs::remove_file(tmp_path) {
Ok(()) => {}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
Err(e) => return Err(Error::new_with_context(e, "removing tempfile")),
Err(e) => return Err(CrashsafeOverwriteError::RemovePreviousTempfile(e)),
}
let mut file = Self::open_with_options(
tmp_path,
@@ -397,17 +324,17 @@ impl VirtualFile {
.create_new(true),
)
.await
.map_err(|e| Error::context(e, "create tempfile"))?;
.map_err(CrashsafeOverwriteError::CreateTempfile)?;
file.write_all(content)
.await
.map_err(|e| Error::context(e, "write contents"))?;
.map_err(CrashsafeOverwriteError::WriteContents)?;
file.sync_all()
.await
.map_err(|e| Error::context(e, "sync tempfile"))?;
.map_err(CrashsafeOverwriteError::SyncTempfile)?;
drop(file); // before the rename, that's important!
// renames are atomic
std::fs::rename(tmp_path, final_path)
.map_err(|e| Error::new_with_context(e, "rename tempfile to final path"))?;
.map_err(CrashsafeOverwriteError::RenameTempfileToFinalPath)?;
// Only open final path parent dirfd now, so that this operation only
// ever holds one VirtualFile fd at a time. That's important because
// the current `find_victim_slot` impl might pick the same slot for both
@@ -416,34 +343,34 @@ impl VirtualFile {
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true))
.await
.map_err(|e| Error::context(e, "open final path parent"))?;
.map_err(CrashsafeOverwriteError::OpenFinalPathParentDir)?;
final_parent_dirfd
.sync_all()
.await
.map_err(|e| Error::context(e, "sync final path parent"))?;
.map_err(CrashsafeOverwriteError::SyncFinalPathParentDir)?;
Ok(())
}
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
.map_err(Error::new)
with_file!(self, StorageIoOperation::Fsync, |file| file
.as_ref()
.sync_all())
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
.map_err(Error::new)
with_file!(self, StorageIoOperation::Metadata, |file| file
.as_ref()
.metadata())
}
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard<'_>, Error> {
let open_files = get_open_files();
let mut handle_guard = {
@@ -453,27 +380,23 @@ impl VirtualFile {
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().unwrap();
let mut handle = *self.handle.read().await;
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(file)));
}
let slot_guard = slot.inner.read().await;
if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(FileGuard { slot_guard });
}
}
// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().unwrap();
let handle_guard = self.handle.write().await;
// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
@@ -487,17 +410,10 @@ impl VirtualFile {
// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot();
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
// Open the physical file
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
let result = STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(&file));
let file = observe_duration!(StorageIoOperation::Open, self.open_options.open(&self.path))?;
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -505,7 +421,9 @@ impl VirtualFile {
*handle_guard = handle;
Ok(result)
return Ok(FileGuard {
slot_guard: slot_guard.downgrade(),
});
}
pub fn remove(self) {
@@ -520,19 +438,20 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = self
.with_file(StorageIoOperation::Seek, |mut file| {
file.seek(SeekFrom::End(offset))
})
.await??
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
.as_ref()
.seek(SeekFrom::End(offset)))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
if pos < 0 {
return Err(Error::invalid("offset would be negative"));
return Err(Error::new(
ErrorKind::InvalidInput,
"offset would be negative",
));
}
if pos > u64::MAX as i128 {
return Err(Error::invalid("offset overflow"));
return Err(Error::new(ErrorKind::InvalidInput, "offset overflow"));
}
self.pos = pos as u64;
}
@@ -545,11 +464,10 @@ impl VirtualFile {
while !buf.is_empty() {
match self.read_at(buf, offset).await {
Ok(0) => {
return Err(std::io::Error::new(
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
)
.into())
))
}
Ok(n) => {
buf = &mut buf[n..];
@@ -567,11 +485,10 @@ impl VirtualFile {
while !buf.is_empty() {
match self.write_at(buf, offset).await {
Ok(0) => {
return Err(std::io::Error::new(
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)
.into());
));
}
Ok(n) => {
buf = &buf[n..];
@@ -588,11 +505,10 @@ impl VirtualFile {
while !buf.is_empty() {
match self.write(buf).await {
Ok(0) => {
return Err(std::io::Error::new(
return Err(Error::new(
std::io::ErrorKind::WriteZero,
"failed to write whole buffer",
)
.into());
));
}
Ok(n) => {
buf = &buf[n..];
@@ -604,7 +520,7 @@ impl VirtualFile {
Ok(())
}
async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
async fn write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
let pos = self.pos;
let n = self.write_at(buf, pos).await?;
self.pos += n as u64;
@@ -612,27 +528,39 @@ impl VirtualFile {
}
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
.await?;
let result = with_file!(self, StorageIoOperation::Read, |file| file
.as_ref()
.read_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result.map_err(Error::new)
result
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
.await?;
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
if let Ok(size) = result {
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result.map_err(Error::new)
result
}
}
/// Read-locked view of one open-file slot, as returned by `VirtualFile::lock_file`.
///
/// The guard owns a read lock on the slot's inner state; the `File` stored in
/// that slot is reached through the `AsRef<File>` impl below.
struct FileGuard<'a> {
// Read guard over the slot's `SlotInner`, whose `file` field holds the open `File`.
slot_guard: RwLockReadGuard<'a, SlotInner>,
}
impl<'a> AsRef<File> for FileGuard<'a> {
/// Borrows the `File` held by the guarded slot.
///
/// Panics if the slot's `file` is `None`.
fn as_ref(&self) -> &File {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}
@@ -641,7 +569,7 @@ impl VirtualFile {
pub(crate) async fn read_blk(
&self,
blknum: u32,
) -> Result<crate::tenant::block_io::BlockLease<'_>, Error> {
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let mut buf = [0; PAGE_SZ];
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
@@ -668,20 +596,39 @@ impl VirtualFile {
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
let handle = self.handle.get_mut();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(slot_guard.file.take()));
/// Closes the file held in `slot` if the slot still carries `tag`.
///
/// A differing tag means the slot's contents are no longer the ones identified
/// by `tag`, so the slot is left untouched. The write guard is consumed either way.
fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
    if slot_guard.tag != tag {
        // Not our file in this slot: nothing to close.
        return;
    }

    slot.recently_used.store(false, Ordering::Relaxed);

    // Dropping the taken `File` closes it; time that under the `Close` operation.
    // (Closes done on eviction are recorded separately as `CloseByReplace`.)
    let close_timer = STORAGE_IO_TIME_METRIC.get(StorageIoOperation::Close);
    close_timer.observe_closure_duration(|| drop(slot_guard.file.take()));
}
// We don't have async drop so we cannot directly await the lock here.
// Instead, first do a best-effort attempt at closing the underlying
// file descriptor by using `try_write`, and if that fails, spawn
// a tokio task to do it asynchronously: we just want it to be
// cleaned up eventually.
// Most of the time, the `try_write` should succeed though,
// as we have `&mut self` access. In other words, if the slot
// is still occupied by our file, there should be no access from
// other I/O operations; the only other possible place to lock
// the slot is the lock algorithm looking for free slots.
let slot = &get_open_files().slots[handle.index];
if let Ok(slot_guard) = slot.inner.try_write() {
clean_slot(slot, slot_guard, handle.tag);
} else {
let tag = handle.tag;
tokio::spawn(async move {
let slot_guard = slot.inner.write().await;
clean_slot(slot, slot_guard, tag);
});
};
}
}
@@ -757,25 +704,25 @@ mod tests {
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset).map_err(Error::new),
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset).map_err(Error::new),
MaybeVirtualFile::File(file) => file.write_all_at(buf, offset),
}
}
async fn seek(&mut self, pos: SeekFrom) -> Result<u64, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.seek(pos).await,
MaybeVirtualFile::File(file) => file.seek(pos).map_err(Error::new),
MaybeVirtualFile::File(file) => file.seek(pos),
}
}
async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await,
MaybeVirtualFile::File(file) => file.write_all(buf).map_err(Error::new),
MaybeVirtualFile::File(file) => file.write_all(buf),
}
}
@@ -831,7 +778,7 @@ mod tests {
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
where
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
OF: Fn(PathBuf, OpenOptions) -> FT,
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
{
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
@@ -984,7 +931,7 @@ mod tests {
hdls.push(hdl);
}
for hdl in hdls {
hdl.await.expect("joining")
hdl.await?;
}
std::mem::forget(rt);

View File

@@ -38,9 +38,6 @@ use tracing::*;
use utils::crashsafe::path_with_suffix_extension;
use utils::{bin_ser::BeSer, id::TenantId, lsn::Lsn, nonblock::set_nonblock};
#[cfg(feature = "testing")]
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::metrics::{
WAL_REDO_BYTES_HISTOGRAM, WAL_REDO_RECORDS_HISTOGRAM, WAL_REDO_RECORD_COUNTER, WAL_REDO_TIME,
WAL_REDO_WAIT_TIME,
@@ -116,9 +113,6 @@ struct ProcessOutput {
pub struct PostgresRedoManager {
tenant_id: TenantId,
conf: &'static PageServerConf,
/// Counter to separate same sized walredo inputs failing at the same millisecond.
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize,
stdout: Mutex<Option<ProcessOutput>>,
stdin: Mutex<Option<ProcessInput>>,
@@ -230,8 +224,6 @@ impl PostgresRedoManager {
PostgresRedoManager {
tenant_id,
conf,
#[cfg(feature = "testing")]
dump_sequence: AtomicUsize::default(),
stdin: Mutex::new(None),
stdout: Mutex::new(None),
stderr: Mutex::new(None),
@@ -298,27 +290,25 @@ impl PostgresRedoManager {
WAL_REDO_BYTES_HISTOGRAM.observe(nbytes as f64);
debug!(
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
len,
nbytes,
duration.as_micros(),
lsn
);
"postgres applied {} WAL records ({} bytes) in {} us to reconstruct page image at LSN {}",
len,
nbytes,
duration.as_micros(),
lsn
);
// If something went wrong, don't try to reuse the process. Kill it, and
// next request will launch a new one.
if let Err(e) = result.as_ref() {
if result.is_err() {
error!(
n_attempts,
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}: {}",
records.len(),
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
nbytes,
base_img_lsn,
lsn,
utils::error::report_compact_sources(e),
);
"error applying {} WAL records {}..{} ({} bytes) to base image with LSN {} to reconstruct page image at LSN {}",
records.len(),
records.first().map(|p| p.0).unwrap_or(Lsn(0)),
records.last().map(|p| p.0).unwrap_or(Lsn(0)),
nbytes,
base_img_lsn,
lsn
);
// self.stdin only holds stdin & stderr as_raw_fd().
// Dropping it as part of take() doesn't close them.
// The owning objects (ChildStdout and ChildStderr) are stored in
@@ -335,8 +325,6 @@ impl PostgresRedoManager {
if let Some(proc) = self.stdin.lock().unwrap().take() {
proc.child.kill_and_wait();
}
} else if n_attempts != 0 {
info!(n_attempts, "retried walredo succeeded");
}
n_attempts += 1;
if n_attempts > MAX_RETRY_ATTEMPTS || result.is_ok() {
@@ -754,7 +742,7 @@ impl PostgresRedoManager {
#[instrument(skip_all, fields(tenant_id=%self.tenant_id, pid=%input.as_ref().unwrap().child.id()))]
fn apply_wal_records(
&self,
input: MutexGuard<Option<ProcessInput>>,
mut input: MutexGuard<Option<ProcessInput>>,
tag: BufferTag,
base_img: &Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
@@ -791,23 +779,6 @@ impl PostgresRedoManager {
build_get_page_msg(tag, &mut writebuf);
WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
if res.is_err() {
// not all of these can be caused by this particular input, however these are so rare
// in tests so capture all.
self.record_and_log(&writebuf);
}
res
}
fn apply_wal_records0(
&self,
writebuf: &[u8],
mut input: MutexGuard<Option<ProcessInput>>,
wal_redo_timeout: Duration,
) -> Result<Bytes, std::io::Error> {
let proc = input.as_mut().unwrap();
let mut nwrite = 0usize;
let stdout_fd = proc.stdout_fd;
@@ -1013,38 +984,6 @@ impl PostgresRedoManager {
}
Ok(res)
}
/// Testing builds only: writes the walredo input in `writebuf` to a newly created
/// file under the tenant's directory and reports the outcome at error level.
///
/// The file name combines the current Unix time in milliseconds, the input length
/// and a per-manager sequence number.
#[cfg(feature = "testing")]
fn record_and_log(&self, writebuf: &[u8]) {
    use std::time::SystemTime;

    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap();
    let millis = since_epoch.as_millis();
    let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);

    // these files will be collected to an allure report
    let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
    let dump_path = self.conf.tenant_path(&self.tenant_id).join(&filename);

    // `create_new` makes the open fail rather than overwrite an existing dump.
    let mut open_options = std::fs::OpenOptions::new();
    open_options.write(true).create_new(true).read(true);
    let outcome = open_options
        .open(dump_path)
        .and_then(|mut dump| dump.write_all(writebuf));

    // Both outcomes are logged as errors on purpose, to trip up allowed_errors.
    match outcome {
        Err(e) => {
            tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
        }
        Ok(()) => {
            tracing::error!(filename, "erroring walredo input saved");
        }
    }
}

/// Non-testing builds: failed walredo inputs are not recorded.
#[cfg(not(feature = "testing"))]
fn record_and_log(&self, _: &[u8]) {}
}
/// Wrapper type around `std::process::Child` which guarantees that the child
@@ -1278,13 +1217,13 @@ mod tests {
struct RedoHarness {
// underscored because unused, except for removal at drop
_repo_dir: camino_tempfile::Utf8TempDir,
_repo_dir: tempfile::TempDir,
manager: PostgresRedoManager,
}
impl RedoHarness {
fn new() -> anyhow::Result<Self> {
let repo_dir = camino_tempfile::tempdir()?;
let repo_dir = tempfile::tempdir()?;
let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
let conf = Box::leak(Box::new(conf));
let tenant_id = TenantId::generate();

72
poetry.lock generated
View File

@@ -2415,18 +2415,18 @@ files = [
[[package]]
name = "urllib3"
version = "1.26.17"
version = "1.26.11"
description = "HTTP library with thread-safe connection pooling, file post, and more."
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*"
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*, <4"
files = [
{file = "urllib3-1.26.17-py2.py3-none-any.whl", hash = "sha256:94a757d178c9be92ef5539b8840d48dc9cf1b2709c9d6b588232a055c524458b"},
{file = "urllib3-1.26.17.tar.gz", hash = "sha256:24d6a242c28d29af46c3fae832c36db3bbebcc533dd1bb549172cd739c82df21"},
{file = "urllib3-1.26.11-py2.py3-none-any.whl", hash = "sha256:c33ccba33c819596124764c23a97d25f32b28433ba0dedeb77d873a38722c9bc"},
{file = "urllib3-1.26.11.tar.gz", hash = "sha256:ea6e8fb210b19d950fab93b60c9009226c63a28808bc8386e05301e25883ac0a"},
]
[package.extras]
brotli = ["brotli (==1.0.9)", "brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotlipy (>=0.6.0)"]
secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"]
brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotlipy (>=0.6.0)"]
secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)"]
socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"]
[[package]]
@@ -2648,65 +2648,7 @@ files = [
docs = ["jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx"]
testing = ["func-timeout", "jaraco.itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)"]
[[package]]
name = "zstandard"
version = "0.21.0"
description = "Zstandard bindings for Python"
optional = false
python-versions = ">=3.7"
files = [
{file = "zstandard-0.21.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:649a67643257e3b2cff1c0a73130609679a5673bf389564bc6d4b164d822a7ce"},
{file = "zstandard-0.21.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:144a4fe4be2e747bf9c646deab212666e39048faa4372abb6a250dab0f347a29"},
{file = "zstandard-0.21.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b72060402524ab91e075881f6b6b3f37ab715663313030d0ce983da44960a86f"},
{file = "zstandard-0.21.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8257752b97134477fb4e413529edaa04fc0457361d304c1319573de00ba796b1"},
{file = "zstandard-0.21.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:c053b7c4cbf71cc26808ed67ae955836232f7638444d709bfc302d3e499364fa"},
{file = "zstandard-0.21.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:2769730c13638e08b7a983b32cb67775650024632cd0476bf1ba0e6360f5ac7d"},
{file = "zstandard-0.21.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:7d3bc4de588b987f3934ca79140e226785d7b5e47e31756761e48644a45a6766"},
{file = "zstandard-0.21.0-cp310-cp310-win32.whl", hash = "sha256:67829fdb82e7393ca68e543894cd0581a79243cc4ec74a836c305c70a5943f07"},
{file = "zstandard-0.21.0-cp310-cp310-win_amd64.whl", hash = "sha256:e6048a287f8d2d6e8bc67f6b42a766c61923641dd4022b7fd3f7439e17ba5a4d"},
{file = "zstandard-0.21.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:7f2afab2c727b6a3d466faee6974a7dad0d9991241c498e7317e5ccf53dbc766"},
{file = "zstandard-0.21.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:ff0852da2abe86326b20abae912d0367878dd0854b8931897d44cfeb18985472"},
{file = "zstandard-0.21.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d12fa383e315b62630bd407477d750ec96a0f438447d0e6e496ab67b8b451d39"},
{file = "zstandard-0.21.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1b9703fe2e6b6811886c44052647df7c37478af1b4a1a9078585806f42e5b15"},
{file = "zstandard-0.21.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:df28aa5c241f59a7ab524f8ad8bb75d9a23f7ed9d501b0fed6d40ec3064784e8"},
{file = "zstandard-0.21.0-cp311-cp311-win32.whl", hash = "sha256:0aad6090ac164a9d237d096c8af241b8dcd015524ac6dbec1330092dba151657"},
{file = "zstandard-0.21.0-cp311-cp311-win_amd64.whl", hash = "sha256:48b6233b5c4cacb7afb0ee6b4f91820afbb6c0e3ae0fa10abbc20000acdf4f11"},
{file = "zstandard-0.21.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:e7d560ce14fd209db6adacce8908244503a009c6c39eee0c10f138996cd66d3e"},
{file = "zstandard-0.21.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1e6e131a4df2eb6f64961cea6f979cdff22d6e0d5516feb0d09492c8fd36f3bc"},
{file = "zstandard-0.21.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e1e0c62a67ff425927898cf43da2cf6b852289ebcc2054514ea9bf121bec10a5"},
{file = "zstandard-0.21.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:1545fb9cb93e043351d0cb2ee73fa0ab32e61298968667bb924aac166278c3fc"},
{file = "zstandard-0.21.0-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fe6c821eb6870f81d73bf10e5deed80edcac1e63fbc40610e61f340723fd5f7c"},
{file = "zstandard-0.21.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:ddb086ea3b915e50f6604be93f4f64f168d3fc3cef3585bb9a375d5834392d4f"},
{file = "zstandard-0.21.0-cp37-cp37m-win32.whl", hash = "sha256:57ac078ad7333c9db7a74804684099c4c77f98971c151cee18d17a12649bc25c"},
{file = "zstandard-0.21.0-cp37-cp37m-win_amd64.whl", hash = "sha256:1243b01fb7926a5a0417120c57d4c28b25a0200284af0525fddba812d575f605"},
{file = "zstandard-0.21.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ea68b1ba4f9678ac3d3e370d96442a6332d431e5050223626bdce748692226ea"},
{file = "zstandard-0.21.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:8070c1cdb4587a8aa038638acda3bd97c43c59e1e31705f2766d5576b329e97c"},
{file = "zstandard-0.21.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4af612c96599b17e4930fe58bffd6514e6c25509d120f4eae6031b7595912f85"},
{file = "zstandard-0.21.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cff891e37b167bc477f35562cda1248acc115dbafbea4f3af54ec70821090965"},
{file = "zstandard-0.21.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:a9fec02ce2b38e8b2e86079ff0b912445495e8ab0b137f9c0505f88ad0d61296"},
{file = "zstandard-0.21.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0bdbe350691dec3078b187b8304e6a9c4d9db3eb2d50ab5b1d748533e746d099"},
{file = "zstandard-0.21.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:b69cccd06a4a0a1d9fb3ec9a97600055cf03030ed7048d4bcb88c574f7895773"},
{file = "zstandard-0.21.0-cp38-cp38-win32.whl", hash = "sha256:9980489f066a391c5572bc7dc471e903fb134e0b0001ea9b1d3eff85af0a6f1b"},
{file = "zstandard-0.21.0-cp38-cp38-win_amd64.whl", hash = "sha256:0e1e94a9d9e35dc04bf90055e914077c80b1e0c15454cc5419e82529d3e70728"},
{file = "zstandard-0.21.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:d2d61675b2a73edcef5e327e38eb62bdfc89009960f0e3991eae5cc3d54718de"},
{file = "zstandard-0.21.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:25fbfef672ad798afab12e8fd204d122fca3bc8e2dcb0a2ba73bf0a0ac0f5f07"},
{file = "zstandard-0.21.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:62957069a7c2626ae80023998757e27bd28d933b165c487ab6f83ad3337f773d"},
{file = "zstandard-0.21.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:14e10ed461e4807471075d4b7a2af51f5234c8f1e2a0c1d37d5ca49aaaad49e8"},
{file = "zstandard-0.21.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:9cff89a036c639a6a9299bf19e16bfb9ac7def9a7634c52c257166db09d950e7"},
{file = "zstandard-0.21.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:52b2b5e3e7670bd25835e0e0730a236f2b0df87672d99d3bf4bf87248aa659fb"},
{file = "zstandard-0.21.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:b1367da0dde8ae5040ef0413fb57b5baeac39d8931c70536d5f013b11d3fc3a5"},
{file = "zstandard-0.21.0-cp39-cp39-win32.whl", hash = "sha256:db62cbe7a965e68ad2217a056107cc43d41764c66c895be05cf9c8b19578ce9c"},
{file = "zstandard-0.21.0-cp39-cp39-win_amd64.whl", hash = "sha256:a8d200617d5c876221304b0e3fe43307adde291b4a897e7b0617a61611dfff6a"},
{file = "zstandard-0.21.0.tar.gz", hash = "sha256:f08e3a10d01a247877e4cb61a82a319ea746c356a3786558bed2481e6c405546"},
]
[package.dependencies]
cffi = {version = ">=1.11", markers = "platform_python_implementation == \"PyPy\""}
[package.extras]
cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "c5981d8d7c2deadd47c823bc35f86f830c8e320b653d2d3718bade1f4d2dabca"
content-hash = "c40f62277e788011920f4edb6f7392046ee440f792a104c903097415def9a916"

View File

@@ -168,11 +168,6 @@ async fn task_main(
.instrument(tracing::info_span!("handle_client", ?session_id))
);
}
Some(Err(e)) = connections.join_next(), if !connections.is_empty() => {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
_ = cancellation_token.cancelled() => {
drop(listener);
break;

View File

@@ -45,8 +45,7 @@ enum Payload {
Batch(BatchQueryData),
}
const MAX_RESPONSE_SIZE: usize = 10 * 1024 * 1024; // 10 MiB
const MAX_REQUEST_SIZE: u64 = 10 * 1024 * 1024; // 10 MiB
const MAX_REQUEST_SIZE: u64 = 1024 * 1024; // 1 MB
static RAW_TEXT_OUTPUT: HeaderName = HeaderName::from_static("neon-raw-text-output");
static ARRAY_MODE: HeaderName = HeaderName::from_static("neon-array-mode");
@@ -263,8 +262,6 @@ async fn handle_inner(
None => MAX_REQUEST_SIZE + 1,
};
// we don't have a streaming request support yet so this is to prevent OOM
// from a malicious user sending an extremely large request body
if request_content_length > MAX_REQUEST_SIZE {
return Err(anyhow::anyhow!(
"request is too large (max is {MAX_REQUEST_SIZE} bytes)"
@@ -387,13 +384,6 @@ async fn query_to_json<T: GenericClient>(
let row = row?;
*current_size += row.body_len();
rows.push(row);
// we don't have a streaming response support yet so this is to prevent OOM
// from a malicious query (eg a cross join)
if *current_size > MAX_RESPONSE_SIZE {
return Err(anyhow::anyhow!(
"response is too large (max is {MAX_RESPONSE_SIZE} bytes)"
));
}
}
// grab the command tag and number of rows affected

View File

@@ -130,11 +130,6 @@ pub async fn task_main(
}),
);
}
Some(Err(e)) = connections.join_next(), if !connections.is_empty() => {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
_ = cancellation_token.cancelled() => {
drop(listener);
break;

View File

@@ -37,7 +37,6 @@ aiohttp = "3.8.5"
pytest-rerunfailures = "^11.1.2"
types-pytest-lazy-fixture = "^0.6.3.3"
pytest-split = "^0.8.1"
zstandard = "^0.21.0"
[tool.poetry.group.dev.dependencies]
black = "^23.3.0"

View File

@@ -10,8 +10,6 @@ anyhow.workspace = true
async-trait.workspace = true
byteorder.workspace = true
bytes.workspace = true
camino.workspace = true
camino-tempfile.workspace = true
chrono.workspace = true
clap = { workspace = true, features = ["derive"] }
const_format.workspace = true
@@ -38,6 +36,7 @@ tokio = { workspace = true, features = ["fs"] }
tokio-io-timeout.workspace = true
tokio-postgres.workspace = true
toml_edit.workspace = true
tempfile.workspace = true
tracing.workspace = true
url.workspace = true
metrics.workspace = true

View File

@@ -2,7 +2,6 @@
// Main entry point for the safekeeper executable
//
use anyhow::{bail, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use clap::Parser;
use futures::future::BoxFuture;
use futures::stream::FuturesUnordered;
@@ -15,6 +14,7 @@ use toml_edit::Document;
use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
@@ -63,7 +63,7 @@ split), and serving the hardened part further downstream to pageserver(s).
struct Args {
/// Path to the safekeeper data directory.
#[arg(short = 'D', long, default_value = "./")]
datadir: Utf8PathBuf,
datadir: PathBuf,
/// Safekeeper node id.
#[arg(long)]
id: Option<u64>,
@@ -92,7 +92,7 @@ struct Args {
no_sync: bool,
/// Dump control file at path specified by this argument and exit.
#[arg(long)]
dump_control_file: Option<Utf8PathBuf>,
dump_control_file: Option<PathBuf>,
/// Broker endpoint for storage nodes coordination in the form
/// http[s]://host:port. In case of https schema a TLS connection is
/// established; plaintext otherwise.
@@ -128,19 +128,19 @@ struct Args {
/// validations of JWT tokens. Empty string is allowed and means disabling
/// auth.
#[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
pg_auth_public_key_path: Option<Utf8PathBuf>,
pg_auth_public_key_path: Option<PathBuf>,
/// If given, enables auth on incoming connections to tenant only WAL
/// service endpoint (--listen-pg-tenant-only). Value specifies path to a
/// .pem public key used for validations of JWT tokens. Empty string is
/// allowed and means disabling auth.
#[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
pg_tenant_only_auth_public_key_path: Option<Utf8PathBuf>,
pg_tenant_only_auth_public_key_path: Option<PathBuf>,
/// If given, enables auth on incoming connections to http management
/// service endpoint (--listen-http). Value specifies path to a .pem public
/// key used for validations of JWT tokens. Empty string is allowed and
/// means disabling auth.
#[arg(long, verbatim_doc_comment, value_parser = opt_pathbuf_parser)]
http_auth_public_key_path: Option<Utf8PathBuf>,
http_auth_public_key_path: Option<PathBuf>,
/// Format for logging, either 'plain' or 'json'.
#[arg(long, default_value = "plain")]
log_format: String,
@@ -151,8 +151,8 @@ struct Args {
}
// Like PathBufValueParser, but allows empty string.
fn opt_pathbuf_parser(s: &str) -> Result<Utf8PathBuf, String> {
Ok(Utf8PathBuf::from_str(s).unwrap())
fn opt_pathbuf_parser(s: &str) -> Result<PathBuf, String> {
Ok(PathBuf::from_str(s).unwrap())
}
#[tokio::main(flavor = "current_thread")]
@@ -203,7 +203,7 @@ async fn main() -> anyhow::Result<()> {
info!("version: {GIT_VERSION}");
let args_workdir = &args.datadir;
let workdir = args_workdir.canonicalize_utf8().with_context(|| {
let workdir = args_workdir.canonicalize().with_context(|| {
format!("Failed to get the absolute path for input workdir {args_workdir:?}")
})?;
@@ -222,7 +222,7 @@ async fn main() -> anyhow::Result<()> {
None
}
Some(path) => {
info!("loading pg auth JWT key from {path}");
info!("loading pg auth JWT key from {}", path.display());
Some(Arc::new(
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
))
@@ -234,7 +234,10 @@ async fn main() -> anyhow::Result<()> {
None
}
Some(path) => {
info!("loading pg tenant only auth JWT key from {path}");
info!(
"loading pg tenant only auth JWT key from {}",
path.display()
);
Some(Arc::new(
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
))
@@ -246,7 +249,7 @@ async fn main() -> anyhow::Result<()> {
None
}
Some(path) => {
info!("loading http auth JWT key from {path}");
info!("loading http auth JWT key from {}", path.display());
Some(Arc::new(
JwtAuth::from_key_path(path).context("failed to load the auth key")?,
))
@@ -444,7 +447,7 @@ async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
}
/// Determine safekeeper id.
fn set_id(workdir: &Utf8Path, given_id: Option<NodeId>) -> Result<NodeId> {
fn set_id(workdir: &Path, given_id: Option<NodeId>) -> Result<NodeId> {
let id_file_path = workdir.join(ID_FILE_NAME);
let my_id: NodeId;

View File

@@ -2,13 +2,12 @@
use anyhow::{bail, ensure, Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use camino::Utf8PathBuf;
use tokio::fs::{self, File};
use tokio::io::AsyncWriteExt;
use std::io::Read;
use std::ops::Deref;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::time::Instant;
use crate::control_file_upgrade::upgrade_control_file;
@@ -40,7 +39,7 @@ pub trait Storage: Deref<Target = SafeKeeperState> {
#[derive(Debug)]
pub struct FileStorage {
// save timeline dir to avoid reconstructing it every time
timeline_dir: Utf8PathBuf,
timeline_dir: PathBuf,
conf: SafeKeeperConf,
/// Last state persisted to disk.
@@ -175,7 +174,7 @@ impl Storage for FileStorage {
let mut control_partial = File::create(&control_partial_path).await.with_context(|| {
format!(
"failed to create partial control file at: {}",
&control_partial_path
&control_partial_path.display()
)
})?;
let mut buf: Vec<u8> = Vec::new();
@@ -190,13 +189,13 @@ impl Storage for FileStorage {
control_partial.write_all(&buf).await.with_context(|| {
format!(
"failed to write safekeeper state into control file at: {}",
control_partial_path
control_partial_path.display()
)
})?;
control_partial.flush().await.with_context(|| {
format!(
"failed to flush safekeeper state into control file at: {}",
control_partial_path
control_partial_path.display()
)
})?;
@@ -205,7 +204,7 @@ impl Storage for FileStorage {
control_partial.sync_all().await.with_context(|| {
format!(
"failed to sync partial control file at {}",
control_partial_path
control_partial_path.display()
)
})?;
}
@@ -217,10 +216,12 @@ impl Storage for FileStorage {
// this sync is not required by any standard but postgres does this (see durable_rename)
if !self.conf.no_sync {
let new_f = File::open(&control_path).await?;
new_f
.sync_all()
.await
.with_context(|| format!("failed to sync control file at: {}", &control_path))?;
new_f.sync_all().await.with_context(|| {
format!(
"failed to sync control file at: {}",
&control_path.display()
)
})?;
// fsync the directory (linux specific)
let tli_dir = File::open(&self.timeline_dir).await?;
@@ -249,7 +250,7 @@ mod test {
use utils::{id::TenantTimelineId, lsn::Lsn};
fn stub_conf() -> SafeKeeperConf {
let workdir = camino_tempfile::tempdir().unwrap().into_path();
let workdir = tempfile::tempdir().unwrap().into_path();
SafeKeeperConf {
workdir,
..SafeKeeperConf::dummy()

View File

@@ -7,7 +7,6 @@ use std::io::Read;
use std::path::PathBuf;
use anyhow::Result;
use camino::Utf8Path;
use chrono::{DateTime, Utc};
use postgres_ffi::XLogSegNo;
use serde::Deserialize;
@@ -202,7 +201,7 @@ pub async fn build(args: Args) -> Result<Response> {
/// Builds DiskContent from a directory path. It can fail if the directory
/// is deleted between the time we get the path and the time we try to open it.
fn build_disk_content(path: &Utf8Path) -> Result<DiskContent> {
fn build_disk_content(path: &std::path::Path) -> Result<DiskContent> {
let mut files = Vec::new();
for entry in fs::read_dir(path)? {
if entry.is_err() {
@@ -257,7 +256,7 @@ fn build_file_info(entry: DirEntry) -> Result<FileInfo> {
fn build_config(config: SafeKeeperConf) -> Config {
Config {
id: config.my_id,
workdir: config.workdir.into(),
workdir: config.workdir,
listen_pg_addr: config.listen_pg_addr,
listen_http_addr: config.listen_http_addr,
no_sync: config.no_sync,

View File

@@ -1,8 +1,8 @@
use camino::Utf8PathBuf;
use once_cell::sync::Lazy;
use remote_storage::RemoteStorageConfig;
use tokio::runtime::Runtime;
use std::path::PathBuf;
use std::time::Duration;
use storage_broker::Uri;
@@ -51,7 +51,7 @@ pub struct SafeKeeperConf {
// that during unit testing, because the current directory is global
// to the process but different unit tests work on different
// data directories to avoid clashing with each other.
pub workdir: Utf8PathBuf,
pub workdir: PathBuf,
pub my_id: NodeId,
pub listen_pg_addr: String,
pub listen_pg_addr_tenant_only: Option<String>,
@@ -73,11 +73,11 @@ pub struct SafeKeeperConf {
}
impl SafeKeeperConf {
pub fn tenant_dir(&self, tenant_id: &TenantId) -> Utf8PathBuf {
pub fn tenant_dir(&self, tenant_id: &TenantId) -> PathBuf {
self.workdir.join(tenant_id.to_string())
}
pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> Utf8PathBuf {
pub fn timeline_dir(&self, ttid: &TenantTimelineId) -> PathBuf {
self.tenant_dir(&ttid.tenant_id)
.join(ttid.timeline_id.to_string())
}
@@ -87,7 +87,7 @@ impl SafeKeeperConf {
#[cfg(test)]
fn dummy() -> Self {
SafeKeeperConf {
workdir: Utf8PathBuf::from("./"),
workdir: PathBuf::from("./"),
no_sync: false,
listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
listen_pg_addr_tenant_only: None,

View File

@@ -167,11 +167,11 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
tokio::fs::create_dir_all(&temp_base).await?;
let tli_dir = camino_tempfile::Builder::new()
let tli_dir = tempfile::Builder::new()
.suffix("_temptli")
.prefix(&format!("{}_{}_", ttid.tenant_id, ttid.timeline_id))
.tempdir_in(temp_base)?;
let tli_dir_path = tli_dir.path().to_path_buf();
let tli_dir_path = tli_dir.path().to_owned();
// Note: some time happens between fetching list of files and fetching files themselves.
// It's possible that some files will be removed from safekeeper and we will fail to fetch them.
@@ -220,7 +220,9 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result<Response>
info!(
"Moving timeline {} from {} to {}",
ttid, tli_dir_path, timeline_path
ttid,
tli_dir_path.display(),
timeline_path.display()
);
tokio::fs::create_dir_all(conf.tenant_dir(&ttid.tenant_id)).await?;
tokio::fs::rename(tli_dir_path, &timeline_path).await?;

View File

@@ -456,7 +456,7 @@ impl ProposerAcceptorMessage {
Ok(ProposerAcceptorMessage::AppendRequest(msg))
}
_ => bail!("unknown proposer-acceptor message tag: {}", tag),
_ => bail!("unknown proposer-acceptor message tag: {}", tag,),
}
}
}

View File

@@ -2,7 +2,6 @@
//! to glue together SafeKeeper and all other background services.
use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use postgres_ffi::XLogSegNo;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@@ -10,6 +9,7 @@ use tokio::fs;
use serde_with::DisplayFromStr;
use std::cmp::max;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{Mutex, MutexGuard};
use tokio::{
@@ -331,7 +331,7 @@ pub struct Timeline {
cancellation_rx: watch::Receiver<bool>,
/// Directory where timeline state is stored.
pub timeline_dir: Utf8PathBuf,
pub timeline_dir: PathBuf,
}
impl Timeline {
@@ -805,7 +805,7 @@ impl Timeline {
}
/// Deletes directory and its contents. Returns false if directory does not exist.
async fn delete_dir(path: &Utf8PathBuf) -> Result<bool> {
async fn delete_dir(path: &PathBuf) -> Result<bool> {
match fs::remove_dir_all(path).await {
Ok(_) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),

View File

@@ -6,10 +6,10 @@ use crate::safekeeper::ServerInfo;
use crate::timeline::{Timeline, TimelineError};
use crate::SafeKeeperConf;
use anyhow::{bail, Context, Result};
use camino::Utf8PathBuf;
use once_cell::sync::Lazy;
use serde::Serialize;
use std::collections::HashMap;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::Sender;
@@ -89,7 +89,7 @@ impl GlobalTimelines {
};
let mut tenant_count = 0;
for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
.with_context(|| format!("failed to list tenants dir {}", tenants_dir))?
.with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
{
match &tenants_dir_entry {
Ok(tenants_dir_entry) => {
@@ -102,7 +102,9 @@ impl GlobalTimelines {
}
Err(e) => error!(
"failed to list tenants dir entry {:?} in directory {}, reason: {:?}",
tenants_dir_entry, tenants_dir, e
tenants_dir_entry,
tenants_dir.display(),
e
),
}
}
@@ -134,7 +136,7 @@ impl GlobalTimelines {
let timelines_dir = conf.tenant_dir(&tenant_id);
for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
.with_context(|| format!("failed to list timelines dir {}", timelines_dir))?
.with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
{
match &timelines_dir_entry {
Ok(timeline_dir_entry) => {
@@ -166,7 +168,9 @@ impl GlobalTimelines {
}
Err(e) => error!(
"failed to list timelines dir entry {:?} in directory {}, reason: {:?}",
timelines_dir_entry, timelines_dir, e
timelines_dir_entry,
timelines_dir.display(),
e
),
}
}
@@ -417,7 +421,7 @@ pub struct TimelineDeleteForceResult {
}
/// Deletes directory and its contents. Returns false if directory does not exist.
fn delete_dir(path: Utf8PathBuf) -> Result<bool> {
fn delete_dir(path: PathBuf) -> Result<bool> {
match std::fs::remove_dir_all(path) {
Ok(_) => Ok(true),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false),

View File

@@ -1,6 +1,5 @@
use anyhow::{Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use futures::stream::FuturesOrdered;
use futures::StreamExt;
use tokio::task::JoinHandle;
@@ -8,6 +7,7 @@ use utils::id::NodeId;
use std::cmp::min;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
@@ -230,8 +230,8 @@ pub async fn wal_backup_launcher_task_main(
struct WalBackupTask {
timeline: Arc<Timeline>,
timeline_dir: Utf8PathBuf,
workspace_dir: Utf8PathBuf,
timeline_dir: PathBuf,
workspace_dir: PathBuf,
wal_seg_size: usize,
parallel_jobs: usize,
commit_lsn_watch_rx: watch::Receiver<Lsn>,
@@ -240,8 +240,8 @@ struct WalBackupTask {
/// Offload single timeline.
async fn backup_task_main(
ttid: TenantTimelineId,
timeline_dir: Utf8PathBuf,
workspace_dir: Utf8PathBuf,
timeline_dir: PathBuf,
workspace_dir: PathBuf,
parallel_jobs: usize,
mut shutdown_rx: Receiver<()>,
) {
@@ -351,8 +351,8 @@ pub async fn backup_lsn_range(
backup_lsn: &mut Lsn,
end_lsn: Lsn,
wal_seg_size: usize,
timeline_dir: &Utf8Path,
workspace_dir: &Utf8Path,
timeline_dir: &Path,
workspace_dir: &Path,
parallel_jobs: usize,
) -> Result<()> {
if parallel_jobs < 1 {
@@ -408,8 +408,8 @@ pub async fn backup_lsn_range(
async fn backup_single_segment(
seg: &Segment,
timeline_dir: &Utf8Path,
workspace_dir: &Utf8Path,
timeline_dir: &Path,
workspace_dir: &Path,
) -> Result<Segment> {
let segment_file_path = seg.file_path(timeline_dir)?;
let remote_segment_path = segment_file_path
@@ -429,7 +429,7 @@ async fn backup_single_segment(
BACKUP_ERRORS.inc();
}
res?;
debug!("Backup of {} done", segment_file_path);
debug!("Backup of {} done", segment_file_path.display());
Ok(*seg)
}
@@ -454,7 +454,7 @@ impl Segment {
XLogFileName(PG_TLI, self.seg_no, self.size())
}
pub fn file_path(self, timeline_dir: &Utf8Path) -> Result<Utf8PathBuf> {
pub fn file_path(self, timeline_dir: &Path) -> Result<PathBuf> {
Ok(timeline_dir.join(self.object_name()))
}
@@ -479,22 +479,19 @@ fn get_segments(start: Lsn, end: Lsn, seg_size: usize) -> Vec<Segment> {
static REMOTE_STORAGE: OnceCell<Option<GenericRemoteStorage>> = OnceCell::new();
async fn backup_object(
source_file: &Utf8Path,
target_file: &RemotePath,
size: usize,
) -> Result<()> {
async fn backup_object(source_file: &Path, target_file: &RemotePath, size: usize) -> Result<()> {
let storage = REMOTE_STORAGE
.get()
.expect("failed to get remote storage")
.as_ref()
.unwrap();
let file = tokio::io::BufReader::new(
File::open(&source_file)
.await
.with_context(|| format!("Failed to open file {} for wal backup", source_file))?,
);
let file = tokio::io::BufReader::new(File::open(&source_file).await.with_context(|| {
format!(
"Failed to open file {} for wal backup",
source_file.display()
)
})?);
storage
.upload_storage_object(Box::new(file), size, target_file)

View File

@@ -9,13 +9,13 @@
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use camino::{Utf8Path, Utf8PathBuf};
use futures::future::BoxFuture;
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
use postgres_ffi::{dispatch_pgversion, XLogSegNo, PG_TLI};
use remote_storage::RemotePath;
use std::cmp::{max, min};
use std::io::{self, SeekFrom};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use tokio::fs::{self, remove_file, File, OpenOptions};
use tokio::io::{AsyncRead, AsyncWriteExt};
@@ -72,7 +72,7 @@ pub trait Storage {
/// When storage is created first time, all LSNs are zeroes and there are no segments on disk.
pub struct PhysicalStorage {
metrics: WalStorageMetrics,
timeline_dir: Utf8PathBuf,
timeline_dir: PathBuf,
conf: SafeKeeperConf,
/// Size of WAL segment in bytes.
@@ -123,7 +123,7 @@ impl PhysicalStorage {
/// the disk. Otherwise, all LSNs are set to zero.
pub fn new(
ttid: &TenantTimelineId,
timeline_dir: Utf8PathBuf,
timeline_dir: PathBuf,
conf: &SafeKeeperConf,
state: &SafeKeeperState,
) -> Result<PhysicalStorage> {
@@ -142,11 +142,7 @@ impl PhysicalStorage {
dispatch_pgversion!(
version,
pgv::xlog_utils::find_end_of_wal(
timeline_dir.as_std_path(),
wal_seg_size,
state.commit_lsn,
)?,
pgv::xlog_utils::find_end_of_wal(&timeline_dir, wal_seg_size, state.commit_lsn,)?,
bail!("unsupported postgres version: {}", version)
)
};
@@ -462,7 +458,7 @@ impl Storage for PhysicalStorage {
/// Remove all WAL segments in timeline_dir that match the given predicate.
async fn remove_segments_from_disk(
timeline_dir: &Utf8Path,
timeline_dir: &Path,
wal_seg_size: usize,
remove_predicate: impl Fn(XLogSegNo) -> bool,
) -> Result<()> {
@@ -501,8 +497,8 @@ async fn remove_segments_from_disk(
}
pub struct WalReader {
workdir: Utf8PathBuf,
timeline_dir: Utf8PathBuf,
workdir: PathBuf,
timeline_dir: PathBuf,
wal_seg_size: usize,
pos: Lsn,
wal_segment: Option<Pin<Box<dyn AsyncRead + Send + Sync>>>,
@@ -523,8 +519,8 @@ pub struct WalReader {
impl WalReader {
pub fn new(
workdir: Utf8PathBuf,
timeline_dir: Utf8PathBuf,
workdir: PathBuf,
timeline_dir: PathBuf,
state: &SafeKeeperState,
start_pos: Lsn,
enable_remote_read: bool,
@@ -691,7 +687,7 @@ impl WalReader {
}
/// Helper function for opening a wal file.
async fn open_wal_file(wal_file_path: &Utf8Path) -> Result<tokio::fs::File> {
async fn open_wal_file(wal_file_path: &Path) -> Result<tokio::fs::File> {
// First try to open the .partial file.
let mut partial_path = wal_file_path.to_owned();
partial_path.set_extension("partial");
@@ -726,10 +722,10 @@ async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> {
/// Helper returning full path to WAL segment file and its .partial brother.
fn wal_file_paths(
timeline_dir: &Utf8Path,
timeline_dir: &Path,
segno: XLogSegNo,
wal_seg_size: usize,
) -> Result<(Utf8PathBuf, Utf8PathBuf)> {
) -> Result<(PathBuf, PathBuf)> {
let wal_file_name = XLogFileName(PG_TLI, segno, wal_seg_size);
let wal_file_path = timeline_dir.join(wal_file_name.clone());
let wal_file_partial_path = timeline_dir.join(wal_file_name + ".partial");

View File

@@ -37,7 +37,6 @@ from psycopg2.extensions import connection as PgConnection
from psycopg2.extensions import cursor as PgCursor
from psycopg2.extensions import make_dsn, parse_dsn
from typing_extensions import Literal
from urllib3.util.retry import Retry
from fixtures.broker import NeonBroker
from fixtures.log_helper import log
@@ -1652,14 +1651,11 @@ class NeonPageserver(PgProtocol):
if '"testing"' not in self.version:
pytest.skip("pageserver was built without 'testing' feature")
def http_client(
self, auth_token: Optional[str] = None, retries: Optional[Retry] = None
) -> PageserverHttpClient:
def http_client(self, auth_token: Optional[str] = None) -> PageserverHttpClient:
return PageserverHttpClient(
port=self.service_port.http,
auth_token=auth_token,
is_testing_enabled_or_skip=self.is_testing_enabled_or_skip,
retries=retries,
)
@property

View File

@@ -7,8 +7,6 @@ from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from fixtures.log_helper import log
from fixtures.metrics import Metrics, parse_metrics
@@ -115,40 +113,12 @@ class TenantConfig:
class PageserverHttpClient(requests.Session):
def __init__(
self,
port: int,
is_testing_enabled_or_skip: Fn,
auth_token: Optional[str] = None,
retries: Optional[Retry] = None,
):
def __init__(self, port: int, is_testing_enabled_or_skip: Fn, auth_token: Optional[str] = None):
super().__init__()
self.port = port
self.auth_token = auth_token
self.is_testing_enabled_or_skip = is_testing_enabled_or_skip
if retries is None:
# We apply a retry policy that is different to the default `requests` behavior,
# because the pageserver has various transiently unavailable states that benefit
# from a client retrying on 503
retries = Retry(
# Status retries are for retrying on 503 while e.g. waiting for tenants to activate
status=5,
# Connection retries are for waiting for the pageserver to come up and listen
connect=5,
# No read retries: if a request hangs that is not expected behavior
# (this may change in future if we do fault injection of a kind that causes
# requests TCP flows to stick)
read=False,
backoff_factor=0,
status_forcelist=[503],
allowed_methods=None,
remove_headers_on_redirect=[],
)
self.mount("http://", HTTPAdapter(max_retries=retries))
if auth_token is not None:
self.headers["Authorization"] = f"Bearer {auth_token}"

View File

@@ -3,6 +3,7 @@ import json
import os
import re
import subprocess
import tarfile
import threading
import time
from pathlib import Path
@@ -10,7 +11,6 @@ from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Ty
from urllib.parse import urlencode
import allure
import zstandard
from psycopg2.extensions import cursor
from fixtures.log_helper import log
@@ -222,7 +222,7 @@ def get_scale_for_db(size_mb: int) -> int:
ATTACHMENT_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html|walredo)"
r"regression\.diffs|.+\.(?:log|stderr|stdout|filediff|metrics|html)"
)
@@ -231,35 +231,25 @@ def allure_attach_from_dir(dir: Path):
for attachment in Path(dir).glob("**/*"):
if ATTACHMENT_NAME_REGEX.fullmatch(attachment.name) and attachment.stat().st_size > 0:
source = str(attachment)
name = str(attachment.relative_to(dir))
# compress files that are larger than 1Mb, they're hardly readable in a browser
if attachment.stat().st_size > 1024**2:
compressed = attachment.with_suffix(".zst")
# compress files larger than 1Mb, they're hardly readable in a browser
if attachment.stat().st_size > 1024 * 1024:
source = f"{attachment}.tar.gz"
with tarfile.open(source, "w:gz") as tar:
tar.add(attachment, arcname=attachment.name)
name = f"{name}.tar.gz"
cctx = zstandard.ZstdCompressor()
with attachment.open("rb") as fin, compressed.open("wb") as fout:
cctx.copy_stream(fin, fout)
name = f"{name}.zst"
attachment = compressed
source = str(attachment)
if source.endswith(".gz"):
if source.endswith(".tar.gz"):
attachment_type = "application/gzip"
extension = "gz"
elif source.endswith(".zst"):
attachment_type = "application/zstd"
extension = "zst"
extension = "tar.gz"
elif source.endswith(".svg"):
attachment_type = "image/svg+xml"
extension = "svg"
elif source.endswith(".html"):
attachment_type = "text/html"
extension = "html"
elif source.endswith(".walredo"):
attachment_type = "application/octet-stream"
extension = "walredo"
else:
attachment_type = "text/plain"
extension = attachment.suffix.removeprefix(".")

View File

@@ -0,0 +1,52 @@
import queue
import threading
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.types import TenantId
"""
553 sudo mkfs.ext4 /dev/nvme1n1
555 mkdir test_output
556 sudo mount /dev/nvme1n1 test_output
557 htop
559 ./scripts/pysync
560 NEON_BIN=/home/admin/neon/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
561 sudo chown -R admin:admin test_output
cargo build_testing --release
562 NEON_BIN=$PWD/target/release DEFAULT_PG_VERSION=15 ./scripts/pytest --preserve-database-files --timeout=0 ./test_runner/performance/test_pageserver_startup_many_tenants.py
cd test_output/test_pageserver_startup_many_tenants/repo
sudo env NEON_REPO_DIR=$PWD prlimit --nofile=300000:300000 ../../../target/release/neon_local start
# watch initial load complete, then background jobs start. That's the interesting part.
sudo env NEON_REPO_DIR=$PWD prlimit --nofile=300000:300000 ../../../target/release/neon_local stop
# usually pageserver won't be responsive, kill with
sudo pkill -9 pageserver
"""
def test_pageserver_startup_many_tenants(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    """
    Populate the pageserver's tenants directory with 20,000 copies of one small tenant.

    The test starts an environment, writes a handful of rows through an
    endpoint, checkpoints the timeline, stops the endpoint, the pageserver and
    all safekeepers, and then clones the initial tenant's directory under
    freshly generated tenant ids.

    It makes no assertions; presumably the resulting repo directory is meant to
    be inspected manually, following the shell notes in this module's top-level
    string.
    """
    # Imported once up front instead of on every iteration of the copy loop.
    import shutil

    env = neon_env_builder.init_start()

    # below doesn't work because summaries contain tenant and timeline ids and we check for them
    tenant_id, timeline_id = env.initial_tenant, env.initial_timeline
    pshttp = env.pageserver.http_client()

    ep = env.endpoints.create_start("main")
    ep.safe_psql("create table foo(b text)")
    for _ in range(8):
        ep.safe_psql("insert into foo(b) values ('some text')")
    # pg_bin.run_capture(["pgbench", "-i", "-s1", ep.connstr()])

    # Make sure the written data reached the pageserver, then force it to disk
    # before anything is shut down.
    wait_for_last_flush_lsn(env, ep, tenant_id, timeline_id)
    pshttp.timeline_checkpoint(tenant_id, timeline_id)
    ep.stop_and_destroy()

    # Everything is stopped before the tenant directory is copied.
    env.pageserver.stop()
    for sk in env.safekeepers:
        sk.stop()

    tenant_dir = env.repo_dir / "pageserver_1" / "tenants" / str(env.initial_tenant)
    tenants_root = tenant_dir.parent
    for _ in range(20_000):
        shutil.copytree(tenant_dir, tenants_root / str(TenantId.generate()))

View File

@@ -1,21 +1,19 @@
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, wait_for_last_flush_lsn
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from requests.exceptions import ConnectionError
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin
# Test duplicate layer detection
#
# This test sets fail point at the end of first compaction phase:
# after flushing new L1 layers but before deletion of L0 layers
# it should cause generation of duplicate L1 layer by compaction after restart.
@pytest.mark.timeout(600)
def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()
# use a failpoint to return all L0s as L1s
message = ".*duplicated L1 layer layer=.*"
env.pageserver.allowed_errors.append(message)
# Use aggressive compaction and checkpoint settings
tenant_id, _ = env.neon_cli.create_tenant(
conf={
@@ -35,100 +33,4 @@ def test_duplicate_layers(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
time.sleep(10) # let compaction to be performed
assert env.pageserver.log_contains("compact-level0-phase1-return-same")
def test_actually_duplicated_l1(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
    """
    Check that an L1 layer rewritten by compaction after a restart is overwritten and uploaded.

    A failpoint makes the pageserver exit right after the first L1 has been
    written by compaction. After the restart, compaction is triggered again;
    the test expects the "duplicated L1 layer" log message, a newer timestamp
    on the local layer file, and finally an upload of that file to the
    local-fs remote storage.
    """

    def is_l1_delta(candidate) -> bool:
        """Tell whether a timeline directory entry is named like an L1 delta layer."""
        fname = candidate.name
        if fname == "metadata" or fname.startswith("ephemeral-"):
            return False
        if candidate.suffixes:
            # temp files
            return False
        keys, lsns = fname.split("__", maxsplit=1)
        if "-" not in lsns:
            # image layer
            return False
        first_key, last_key = keys.split("-", maxsplit=1)
        # a layer spanning the whole key space is an L0
        return not (first_key == "0" * 36 and last_key == "F" * 36)

    neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

    tenant_conf = {
        "checkpoint_distance": f"{1024 ** 2}",
        "compaction_target_size": f"{1024 ** 2}",
        "compaction_period": "0 s",
        "compaction_threshold": "3",
    }
    env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)

    pageserver_http = env.pageserver.http_client()
    tenant_id = env.initial_tenant
    timeline_id = env.initial_timeline

    pageserver_http.configure_failpoints(("after-timeline-compacted-first-L1", "exit"))

    endpoint = env.endpoints.create_start("main", tenant_id=tenant_id)
    connstr = endpoint.connstr(options="-csynchronous_commit=off")
    pg_bin.run_capture(["pgbench", "-i", "-s1", connstr])

    wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)

    # no new WAL may arrive from here on, so that the same L1 file gets written again
    endpoint.stop()
    for safekeeper in env.safekeepers:
        safekeeper.stop()

    # the compaction request trips the exit failpoint, so the connection just drops
    with pytest.raises(ConnectionError, match="Remote end closed connection without response"):
        pageserver_http.timeline_compact(tenant_id, timeline_id)

    env.pageserver.stop()

    # at this point the L1 exists locally but has not been uploaded yet
    assert isinstance(env.pageserver_remote_storage, LocalFsStorage)

    l1_found = None
    for entry in env.pageserver.timeline_dir(tenant_id, timeline_id).iterdir():
        if not is_l1_delta(entry):
            continue
        if l1_found is not None:
            raise RuntimeError(f"found multiple L1: {l1_found.name} and {entry.name}")
        l1_found = entry

    assert l1_found is not None, "failed to find L1 locally"

    # index 8 of the stat tuple is the modification time in whole seconds
    original_created_at = l1_found.stat()[8]

    remote_timeline_dir = env.pageserver_remote_storage.timeline_path(tenant_id, timeline_id)
    uploaded = remote_timeline_dir / l1_found.name
    assert not uploaded.exists(), "to-be-overwritten should not yet be uploaded"

    # let the clock move on so the timestamps can differ
    time.sleep(1)

    env.pageserver.start()
    message = f".*duplicated L1 layer layer={l1_found.name}"
    env.pageserver.allowed_errors.append(message)

    pageserver_http.timeline_compact(tenant_id, timeline_id)

    # wait for the log to be flushed
    time.sleep(1)

    found_msg = env.pageserver.log_contains(message)
    assert found_msg is not None, "no layer was duplicated, has this been fixed already?"
    log.info(f"found log line: {found_msg}")

    overwritten_at = l1_found.stat()[8]
    assert original_created_at < overwritten_at, "expected the L1 to be overwritten"

    wait_for_upload_queue_empty(pageserver_http, tenant_id, timeline_id)

    uploaded_at = uploaded.stat()[8]
    assert overwritten_at <= uploaded_at, "expected the L1 to finally be uploaded"
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T200", "-Mprepared", connstr])

View File

@@ -120,10 +120,6 @@ def test_pageserver_chaos(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
# these can happen, if we shutdown at a good time. to be fixed as part of #5172.
message = ".*duplicated L1 layer layer=.*"
env.pageserver.allowed_errors.append(message)
# Use a tiny checkpoint distance, to create a lot of layers quickly.
# That allows us to stress the compaction and layer flushing logic more.
tenant, _ = env.neon_cli.create_tenant(

View File

@@ -34,7 +34,6 @@ from fixtures.remote_storage import (
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, run_pg_bench_small, wait_until
from urllib3.util.retry import Retry
def test_timeline_delete(neon_simple_env: NeonEnv):
@@ -615,7 +614,7 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
child_timeline_id = env.neon_cli.create_branch("child", "main")
ps_http = env.pageserver.http_client(retries=Retry(0, read=False))
ps_http = env.pageserver.http_client()
failpoint_name = "persist_deleted_index_part"
ps_http.configure_failpoints((failpoint_name, "pause"))
@@ -855,7 +854,7 @@ def test_timeline_delete_resumed_on_attach(
# error from http response is also logged
".*InternalServerError\\(Tenant is marked as deleted on remote storage.*",
# Polling after attach may fail with this
".*Resource temporarily unavailable.*Tenant not yet active",
f".*InternalServerError\\(Tenant {tenant_id} is not active.*",
'.*shutdown_pageserver{exit_code=0}: stopping left-over name="remote upload".*',
)
)

View File

@@ -801,9 +801,6 @@ def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
wa_http_cli_debug = wa.http_client(auth_token=env.auth_keys.generate_safekeeper_token())
wa_http_cli_debug.check_status()
# create a dummy table to wait for timeline initialization in safekeeper
endpoint.safe_psql("create table wait_for_sk()")
# fetch something sensible from status
tli_status = wa_http_cli.timeline_status(tenant_id, timeline_id)
epoch = tli_status.acceptor_epoch