Compare commits

..

13 Commits

Author SHA1 Message Date
Christian Schwarz
53ac78ec63 make read_exact_at cancel-safe
Pass `FileGuard` to the tokio-epoll-uring read() operation.
and avoids the nasty scopeguard usage.

This makes the `read_exact_at` cancel-safe.

Depends on https://github.com/neondatabase/tokio-epoll-uring/pull/27
2023-12-12 14:13:09 +00:00
Christian Schwarz
69158a33dd fix cfg(test) and provide again VirtualFile::read_at, used only in cfg(test) 2023-12-12 11:03:07 +00:00
Christian Schwarz
0ca94f5b53 re-add duration metrics 2023-12-11 14:42:21 +00:00
Christian Schwarz
65d0acb081 use tokio_epoll_uring for read path & VirtualFile::open
This makes Delta/Image ::load fns fully tokio-epoll-uring
2023-12-11 14:30:21 +00:00
Christian Schwarz
7d90ef62ae Revert "revert recent VirtualFile asyncification changes (#5291)"
This reverts commit ab1f37e908.

fixes #5479
2023-12-11 14:24:06 +00:00
Joonas Koivunen
f0d15cee6f build: update azure-* to 0.17 (#6081)
this is a drive-by upgrade while we refresh the access tokens at the
same time.
2023-12-11 12:21:02 +01:00
Sasha Krassovsky
0ba4cae491 Fix RLS/REPLICATION granting (#6083)
## Problem

## Summary of changes

## Checklist before requesting a review

- [ ] I have performed a self-review of my code.
- [ ] If it is a core feature, I have added thorough tests.
- [ ] Do we need to implement analytics? if so did you add the relevant
metrics to the dashboard?
- [ ] If this PR requires public announcement, mark it with
/release-notes label and add several sentences in this section.

## Checklist before merging

- [ ] Do not forget to reformat commit message to not include the above
checklist
2023-12-08 12:55:44 -08:00
Andrew Rudenko
df1f8e13c4 proxy: pass neon options in deep object format (#6068)
---------

Co-authored-by: Conrad Ludgate <conradludgate@gmail.com>
2023-12-08 19:58:36 +01:00
John Spray
e640bc7dba tests: allow-lists for occasional failures (#6074)
test_creating_tenant_conf_after...
- Test detaches a tenant and then re-attaches immediatel: this causes a
race between pending remote LSN update and the generation bump in the
attachment.

test_gc_cutoff:
- Test rapidly restarts a pageserver before one generation has had the
chance to process deletions from the previous generation
2023-12-08 17:32:16 +00:00
Christian Schwarz
cf024de202 virtual_file metrics: expose max size of the fd cache (#6078)
And also leave a comment on how to determine current size.

Kind of follow-up to #6066

refs https://github.com/neondatabase/cloud/issues/8351
refs https://github.com/neondatabase/neon/issues/5479
2023-12-08 17:23:50 +00:00
Conrad Ludgate
e1a564ace2 proxy simplify cancellation (#5916)
## Problem

The cancellation code was confusing and error prone (as seen before in
our memory leaks).

## Summary of changes

* Use the new `TaskTracker` primitve instead of JoinSet to gracefully
wait for tasks to shutdown.
* Updated libs/utils/completion to use `TaskTracker`
* Remove `tokio::select` in favour of `futures::future::select` in a
specialised `run_until_cancelled()` helper function
2023-12-08 16:21:17 +00:00
Christian Schwarz
f5b9af6ac7 page cache: improve eviction-related metrics (#6077)
These changes help with identifying thrashing.

The existing `pageserver_page_cache_find_victim_iters_total` is already
useful, but, it doesn't tell us how many individual find_victim() calls
are happening, only how many clock-LRU steps happened in the entire
system,
without info about whether we needed to actually evict other data vs
just scan for a long time, e.g., because the cache is large.

The changes in this PR allows us to
1. count each possible outcome separately, esp evictions
2. compute mean iterations/outcome

I don't think anyone except me was paying close attention to
`pageserver_page_cache_find_victim_iters_total` before, so,
I think the slight behavior change of also counting iterations
for the 'iters exceeded' case is fine.

refs https://github.com/neondatabase/cloud/issues/8351
refs https://github.com/neondatabase/neon/issues/5479
2023-12-08 15:27:21 +00:00
John Spray
5e98855d80 tests: update tests that used local_fs&mock_s3 to use one or the other (#6015)
## Problem

This was wasting resources: if we run a test with mock s3 we don't then
need to run it again with local fs. When we're running in CI, we don't
need to run with the mock/local storage as well as real S3. There is
some value in having CI notice/spot issues that might otherwise only
happen when running locally, but that doesn't justify the cost of
running the tests so many more times on every PR.

## Summary of changes

- For tests that used available_remote_storages or
available_s3_storages, update them to either specify no remote storage
(therefore inherit the default, which is currently local fs), or to
specify s3_storage() for the tests that actually want an S3 API.
2023-12-08 14:52:37 +00:00
43 changed files with 1003 additions and 778 deletions

140
Cargo.lock generated
View File

@@ -44,6 +44,12 @@ dependencies = [
"memchr",
]
[[package]]
name = "allocator-api2"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0942ffc6dcaadf03badf6e6a2d0228460359d5e34b57ccdc720b7382dfbd5ec5"
[[package]]
name = "android_system_properties"
version = "0.1.5"
@@ -178,7 +184,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35"
dependencies = [
"concurrent-queue",
"event-listener",
"event-listener 2.5.3",
"futures-core",
]
@@ -199,11 +205,13 @@ dependencies = [
[[package]]
name = "async-lock"
version = "2.8.0"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b"
checksum = "7125e42787d53db9dd54261812ef17e937c95a51e4d291373b670342fa44310c"
dependencies = [
"event-listener",
"event-listener 4.0.0",
"event-listener-strategy",
"pin-project-lite",
]
[[package]]
@@ -686,9 +694,9 @@ dependencies = [
[[package]]
name = "azure_core"
version = "0.16.0"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e29286b9edfdd6f2c7e9d970bb5b015df8621258acab9ecfcea09b2d7692467"
checksum = "4ccd63c07d1fbfb3d4543d7ea800941bf5a30db1911b9b9e4db3b2c4210a434f"
dependencies = [
"async-trait",
"base64 0.21.1",
@@ -713,9 +721,9 @@ dependencies = [
[[package]]
name = "azure_identity"
version = "0.16.2"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b67b337346da8739e91ea1e9400a6ebc9bc54e0b2af1d23c9bcd565950588f9"
checksum = "8bd7ea32ca7eb66ff4757f83baac702ff11d469e5de365b6bc6f79f9c25d3436"
dependencies = [
"async-lock",
"async-trait",
@@ -734,9 +742,9 @@ dependencies = [
[[package]]
name = "azure_storage"
version = "0.16.0"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bed0ccefde57930b2886fd4aed1f70ac469c197b8c2e94828290d71bcbdb5d97"
checksum = "83ca0a07f89fd72a006da4713e93af3d6c44a693e61a1c3c2e7985de39c182e8"
dependencies = [
"RustyXML",
"async-trait",
@@ -756,9 +764,9 @@ dependencies = [
[[package]]
name = "azure_storage_blobs"
version = "0.16.0"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f91a52da2d192cfe43759f61e8bb31a5969f1722d5b85ac89627f356ad674ab4"
checksum = "8096c04d370118323c42b2752aa1883e4880a56ef65239f317b359f263b6e194"
dependencies = [
"RustyXML",
"azure_core",
@@ -890,7 +898,7 @@ checksum = "a246e68bb43f6cd9db24bea052a53e40405417c5fb372e3d1a8a7f770a564ef5"
dependencies = [
"memchr",
"once_cell",
"regex-automata",
"regex-automata 0.1.10",
"serde",
]
@@ -1680,6 +1688,27 @@ version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "event-listener"
version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "770d968249b5d99410d61f5bf89057f3199a077a04d087092f58e7d10692baae"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3"
dependencies = [
"event-listener 4.0.0",
"pin-project-lite",
]
[[package]]
name = "fail"
version = "0.5.1"
@@ -2042,6 +2071,10 @@ name = "hashbrown"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a"
dependencies = [
"ahash",
"allocator-api2",
]
[[package]]
name = "hashlink"
@@ -2380,6 +2413,16 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "io-uring"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "460648e47a07a43110fbfa2e0b14afb2be920093c31e5dccc50e49568e099762"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "ipnet"
version = "2.9.0"
@@ -2533,7 +2576,7 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
"regex-automata 0.1.10",
]
[[package]]
@@ -2559,9 +2602,9 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "memchr"
version = "2.5.0"
version = "2.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167"
[[package]]
name = "memoffset"
@@ -3088,6 +3131,7 @@ dependencies = [
"tenant_size_model",
"thiserror",
"tokio",
"tokio-epoll-uring",
"tokio-io-timeout",
"tokio-postgres",
"tokio-tar",
@@ -3668,9 +3712,9 @@ dependencies = [
[[package]]
name = "quick-xml"
version = "0.30.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956"
checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33"
dependencies = [
"memchr",
"serde",
@@ -3810,13 +3854,14 @@ dependencies = [
[[package]]
name = "regex"
version = "1.8.2"
version = "1.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1a59b5d8e97dee33696bf13c5ba8ab85341c002922fba050069326b9c498974"
checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.7.2",
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
]
[[package]]
@@ -3828,6 +3873,17 @@ dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
name = "regex-automata"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.8.2",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
@@ -3836,9 +3892,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.7.2"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "relative-path"
@@ -4983,18 +5039,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.40"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.40"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
dependencies = [
"proc-macro2",
"quote",
@@ -5091,6 +5147,7 @@ dependencies = [
"libc",
"mio",
"num_cpus",
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.5",
@@ -5098,6 +5155,21 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-epoll-uring"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#82d74064c5019b0e9a8ae1bcdc75b0345d41bba9"
dependencies = [
"futures",
"once_cell",
"scopeguard",
"thiserror",
"tokio",
"tokio-util",
"tracing",
"uring-common",
]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
@@ -5224,6 +5296,8 @@ dependencies = [
"futures-core",
"futures-io",
"futures-sink",
"futures-util",
"hashbrown 0.14.0",
"pin-project-lite",
"tokio",
"tracing",
@@ -5637,6 +5711,15 @@ dependencies = [
"webpki-roots 0.23.1",
]
[[package]]
name = "uring-common"
version = "0.1.0"
source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#82d74064c5019b0e9a8ae1bcdc75b0345d41bba9"
dependencies = [
"io-uring",
"libc",
]
[[package]]
name = "url"
version = "2.3.1"
@@ -6219,7 +6302,8 @@ dependencies = [
"prost",
"rand 0.8.5",
"regex",
"regex-syntax 0.7.2",
"regex-automata 0.4.3",
"regex-syntax 0.8.2",
"reqwest",
"ring 0.16.20",
"rustls",

View File

@@ -38,10 +38,10 @@ license = "Apache-2.0"
anyhow = { version = "1.0", features = ["backtrace"] }
arc-swap = "1.6"
async-compression = { version = "0.4.0", features = ["tokio", "gzip", "zstd"] }
azure_core = "0.16"
azure_identity = "0.16"
azure_storage = "0.16"
azure_storage_blobs = "0.16"
azure_core = "0.17"
azure_identity = "0.17"
azure_storage = "0.17"
azure_storage_blobs = "0.17"
flate2 = "1.0.26"
async-stream = "0.3"
async-trait = "0.1"
@@ -109,7 +109,7 @@ pin-project-lite = "0.2"
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
prost = "0.11"
rand = "0.8"
regex = "1.4"
regex = "1.10.2"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls"] }
reqwest-tracing = { version = "0.4.0", features = ["opentelemetry_0_19"] }
reqwest-middleware = "0.2.0"
@@ -149,7 +149,7 @@ tokio-postgres-rustls = "0.10.0"
tokio-rustls = "0.24"
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-util = { version = "0.7", features = ["io"] }
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.7"
toml_edit = "0.19"
tonic = {version = "0.9", features = ["tls", "tls-roots"]}

View File

@@ -252,7 +252,7 @@ fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()>
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
THEN
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION IN ROLE pg_read_all_data, pg_write_all_data;
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN REPLICATION BYPASSRLS IN ROLE pg_read_all_data, pg_write_all_data;
IF array_length(roles, 1) IS NOT NULL THEN
EXECUTE format('GRANT neon_superuser TO %s',
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));

View File

@@ -193,16 +193,11 @@ impl Escaping for PgIdent {
/// Build a list of existing Postgres roles
pub fn get_existing_roles(xact: &mut Transaction<'_>) -> Result<Vec<Role>> {
let postgres_roles = xact
.query(
"SELECT rolname, rolpassword, rolreplication, rolbypassrls FROM pg_catalog.pg_authid",
&[],
)?
.query("SELECT rolname, rolpassword FROM pg_catalog.pg_authid", &[])?
.iter()
.map(|row| Role {
name: row.get("rolname"),
encrypted_password: row.get("rolpassword"),
replication: Some(row.get("rolreplication")),
bypassrls: Some(row.get("rolbypassrls")),
options: None,
})
.collect();

View File

@@ -252,8 +252,6 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let action = if let Some(r) = pg_role {
if (r.encrypted_password.is_none() && role.encrypted_password.is_some())
|| (r.encrypted_password.is_some() && role.encrypted_password.is_none())
|| !r.bypassrls.unwrap_or(false)
|| !r.replication.unwrap_or(false)
{
RoleAction::Update
} else if let Some(pg_pwd) = &r.encrypted_password {
@@ -285,14 +283,22 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
match action {
RoleAction::None => {}
RoleAction::Update => {
let mut query: String =
format!("ALTER ROLE {} BYPASSRLS REPLICATION", name.pg_quote());
// This can be run on /every/ role! Not just ones created through the console.
// This means that if you add some funny ALTER here that adds a permission,
// this will get run even on user-created roles! This will result in different
// behavior before and after a spec gets reapplied. The below ALTER as it stands
// now only grants LOGIN and changes the password. Please do not allow this branch
// to do anything silly.
let mut query: String = format!("ALTER ROLE {} ", name.pg_quote());
query.push_str(&role.to_pg_options());
xact.execute(query.as_str(), &[])?;
}
RoleAction::Create => {
// This branch only runs when roles are created through the console, so it is
// safe to add more permissions here. BYPASSRLS and REPLICATION are inherited
// from neon_superuser.
let mut query: String = format!(
"CREATE ROLE {} CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser",
"CREATE ROLE {} INHERIT CREATEROLE CREATEDB IN ROLE neon_superuser",
name.pg_quote()
);
info!("role create query: '{}'", &query);

View File

@@ -207,8 +207,6 @@ pub struct DeltaOp {
pub struct Role {
pub name: PgIdent,
pub encrypted_password: Option<String>,
pub replication: Option<bool>,
pub bypassrls: Option<bool>,
pub options: GenericOptions,
}

View File

@@ -1,16 +1,14 @@
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///
/// Can be cloned, moved and kept around in futures as "guard objects".
#[derive(Clone)]
pub struct Completion(mpsc::Sender<()>);
pub struct Completion(TaskTrackerToken);
/// Barrier will wait until all clones of [`Completion`] have been dropped.
#[derive(Clone)]
pub struct Barrier(Arc<Mutex<mpsc::Receiver<()>>>);
pub struct Barrier(TaskTracker);
impl Default for Barrier {
fn default() -> Self {
@@ -21,7 +19,7 @@ impl Default for Barrier {
impl Barrier {
pub async fn wait(self) {
self.0.lock().await.recv().await;
self.0.wait().await;
}
pub async fn maybe_wait(barrier: Option<Barrier>) {
@@ -33,8 +31,7 @@ impl Barrier {
impl PartialEq for Barrier {
fn eq(&self, other: &Self) -> bool {
// we don't use dyn so this is good
Arc::ptr_eq(&self.0, &other.0)
TaskTracker::ptr_eq(&self.0, &other.0)
}
}
@@ -42,8 +39,10 @@ impl Eq for Barrier {}
/// Create new Guard and Barrier pair.
pub fn channel() -> (Completion, Barrier) {
let (tx, rx) = mpsc::channel::<()>(1);
let rx = Mutex::new(rx);
let rx = Arc::new(rx);
(Completion(tx), Barrier(rx))
let tracker = TaskTracker::new();
// otherwise wait never exits
tracker.close();
let token = tracker.token();
(Completion(token), Barrier(tracker))
}

View File

@@ -83,6 +83,8 @@ enum-map.workspace = true
enumset.workspace = true
strum.workspace = true
strum_macros.workspace = true
#tokio-epoll-uring = { path = "../../tokio-epoll-uring/tokio-epoll-uring" }
tokio-epoll-uring = { git = "https://github.com/neondatabase/tokio-epoll-uring.git" , branch = "main" }
[dev-dependencies]
criterion.workspace = true

View File

@@ -425,7 +425,6 @@ fn start_pageserver(
let tenant_manager = Arc::new(tenant_manager);
BACKGROUND_RUNTIME.spawn({
let init_done_rx = init_done_rx;
let shutdown_pageserver = shutdown_pageserver.clone();
let drive_init = async move {
// NOTE: unlike many futures in pageserver, this one is cancellation-safe
@@ -560,7 +559,6 @@ fn start_pageserver(
}
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let background_jobs_barrier = background_jobs_barrier;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.

View File

@@ -285,6 +285,63 @@ pub static PAGE_CACHE_SIZE: Lazy<PageCacheSizeMetrics> = Lazy::new(|| PageCacheS
},
});
pub(crate) mod page_cache_eviction_metrics {
use std::num::NonZeroUsize;
use metrics::{register_int_counter_vec, IntCounter, IntCounterVec};
use once_cell::sync::Lazy;
#[derive(Clone, Copy)]
pub(crate) enum Outcome {
FoundSlotUnused { iters: NonZeroUsize },
FoundSlotEvicted { iters: NonZeroUsize },
ItersExceeded { iters: NonZeroUsize },
}
static ITERS_TOTAL_VEC: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_find_victim_iters_total",
"Counter for the number of iterations in the find_victim loop",
&["outcome"],
)
.expect("failed to define a metric")
});
static CALLS_VEC: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_find_victim_calls",
"Incremented at the end of each find_victim() call.\
Filter by outcome to get e.g., eviction rate.",
&["outcome"]
)
.unwrap()
});
pub(crate) fn observe(outcome: Outcome) {
macro_rules! dry {
($label:literal, $iters:expr) => {{
static LABEL: &'static str = $label;
static ITERS_TOTAL: Lazy<IntCounter> =
Lazy::new(|| ITERS_TOTAL_VEC.with_label_values(&[LABEL]));
static CALLS: Lazy<IntCounter> =
Lazy::new(|| CALLS_VEC.with_label_values(&[LABEL]));
ITERS_TOTAL.inc_by(($iters.get()) as u64);
CALLS.inc();
}};
}
match outcome {
Outcome::FoundSlotUnused { iters } => dry!("found_empty", iters),
Outcome::FoundSlotEvicted { iters } => {
dry!("found_evicted", iters)
}
Outcome::ItersExceeded { iters } => {
dry!("err_iters_exceeded", iters);
super::page_cache_errors_inc(super::PageCacheErrorKind::EvictIterLimit);
}
}
}
}
pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::new(|| {
register_histogram!(
"pageserver_page_cache_acquire_pinned_slot_seconds",
@@ -294,14 +351,6 @@ pub(crate) static PAGE_CACHE_ACQUIRE_PINNED_SLOT_TIME: Lazy<Histogram> = Lazy::n
.expect("failed to define a metric")
});
pub(crate) static PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_page_cache_find_victim_iters_total",
"Counter for the number of iterations in the find_victim loop",
)
.expect("failed to define a metric")
});
static PAGE_CACHE_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"page_cache_errors_total",
@@ -842,6 +891,26 @@ pub(crate) static STORAGE_IO_SIZE: Lazy<IntGaugeVec> = Lazy::new(|| {
)
.expect("failed to define a metric")
});
pub(crate) mod virtual_file_descriptor_cache {
use super::*;
pub(crate) static SIZE_MAX: Lazy<UIntGauge> = Lazy::new(|| {
register_uint_gauge!(
"pageserver_virtual_file_descriptor_cache_size_max",
"Maximum number of open file descriptors in the cache."
)
.unwrap()
});
// SIZE_CURRENT: derive it like so:
// ```
// sum (pageserver_io_operations_seconds_count{operation=~"^(open|open-after-replace)$")
// -ignoring(operation)
// sum(pageserver_io_operations_seconds_count{operation=~"^(close|close-by-replace)$"}
// ```
}
#[derive(Debug)]
struct GlobalAndPerTimelineHistogram {
global: Histogram,

View File

@@ -88,7 +88,11 @@ use utils::{
lsn::Lsn,
};
use crate::{context::RequestContext, metrics::PageCacheSizeMetrics, repository::Key};
use crate::{
context::RequestContext,
metrics::{page_cache_eviction_metrics, PageCacheSizeMetrics},
repository::Key,
};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;
@@ -897,8 +901,10 @@ impl PageCache {
// Note that just yielding to tokio during iteration without such
// priority boosting is likely counter-productive. We'd just give more opportunities
// for B to bump usage count, further starving A.
crate::metrics::page_cache_errors_inc(
crate::metrics::PageCacheErrorKind::EvictIterLimit,
page_cache_eviction_metrics::observe(
page_cache_eviction_metrics::Outcome::ItersExceeded {
iters: iters.try_into().unwrap(),
},
);
anyhow::bail!("exceeded evict iter limit");
}
@@ -909,8 +915,18 @@ impl PageCache {
// remove mapping for old buffer
self.remove_mapping(old_key);
inner.key = None;
page_cache_eviction_metrics::observe(
page_cache_eviction_metrics::Outcome::FoundSlotEvicted {
iters: iters.try_into().unwrap(),
},
);
} else {
page_cache_eviction_metrics::observe(
page_cache_eviction_metrics::Outcome::FoundSlotUnused {
iters: iters.try_into().unwrap(),
},
);
}
crate::metrics::PAGE_CACHE_FIND_VICTIMS_ITERS_TOTAL.inc_by(iters as u64);
return Ok((slot_idx, inner));
}
}

View File

@@ -5,10 +5,10 @@
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::page_cache::{self, PageReadGuard, PageWriteGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
use std::ops::{Deref, DerefMut};
use std::ops::Deref;
/// This is implemented by anything that can read 8 kB (PAGE_SZ)
/// blocks, using the page cache
@@ -39,6 +39,8 @@ pub enum BlockLease<'a> {
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
#[cfg(test)]
Vec(Vec<u8>),
}
impl From<PageReadGuard<'static>> for BlockLease<'static> {
@@ -63,6 +65,13 @@ impl<'a> Deref for BlockLease<'a> {
BlockLease::EphemeralFileMutableTail(v) => v,
#[cfg(test)]
BlockLease::Arc(v) => v.deref(),
#[cfg(test)]
BlockLease::Vec(v) => {
let v: &Vec<u8> = v;
assert_eq!(v.len(), PAGE_SZ, "caller must ensure that v has PAGE_SZ");
// Safety: see above assertion.
unsafe { &*(v.as_ptr() as *const [u8; PAGE_SZ]) }
}
}
}
}
@@ -169,10 +178,14 @@ impl FileBlockReader {
}
/// Read a page from the underlying file into given buffer.
async fn fill_buffer(&self, buf: &mut [u8], blkno: u32) -> Result<(), std::io::Error> {
async fn fill_buffer(
&self,
buf: PageWriteGuard<'static>,
blkno: u32,
) -> Result<PageWriteGuard<'static>, std::io::Error> {
assert!(buf.len() == PAGE_SZ);
self.file
.read_exact_at(buf, blkno as u64 * PAGE_SZ as u64)
.read_exact_at_page(buf, blkno as u64 * PAGE_SZ as u64)
.await
}
/// Read a block.
@@ -196,9 +209,9 @@ impl FileBlockReader {
)
})? {
ReadBufResult::Found(guard) => Ok(guard.into()),
ReadBufResult::NotFound(mut write_guard) => {
ReadBufResult::NotFound(write_guard) => {
// Read the page from disk into the buffer
self.fill_buffer(write_guard.deref_mut(), blknum).await?;
let write_guard = self.fill_buffer(write_guard, blknum).await?;
Ok(write_guard.mark_valid().into())
}
}

View File

@@ -9,7 +9,6 @@ use crate::virtual_file::VirtualFile;
use camino::Utf8PathBuf;
use pageserver_api::shard::TenantShardId;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::{self, ErrorKind};
use std::ops::DerefMut;
use std::sync::atomic::AtomicU64;
@@ -45,11 +44,11 @@ impl EphemeralFile {
"ephemeral-{filename_disambiguator}"
)));
let file = VirtualFile::open_with_options(
&filename,
OpenOptions::new().read(true).write(true).create(true),
)
.await?;
let file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true).write(true).create(true);
VirtualFile::open_with_options_async(&filename, options).await?
};
Ok(EphemeralFile {
page_cache_file_id: page_cache::next_file_id(),
@@ -89,11 +88,10 @@ impl EphemeralFile {
page_cache::ReadBufResult::Found(guard) => {
return Ok(BlockLease::PageReadGuard(guard))
}
page_cache::ReadBufResult::NotFound(mut write_guard) => {
let buf: &mut [u8] = write_guard.deref_mut();
debug_assert_eq!(buf.len(), PAGE_SZ);
self.file
.read_exact_at(&mut buf[..], blknum as u64 * PAGE_SZ as u64)
page_cache::ReadBufResult::NotFound(write_guard) => {
let write_guard = self
.file
.read_exact_at_page(write_guard, blknum as u64 * PAGE_SZ as u64)
.await?;
let read_guard = write_guard.mark_valid();
return Ok(BlockLease::PageReadGuard(read_guard));

View File

@@ -647,12 +647,13 @@ impl DeltaLayer {
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true).write(true);
VirtualFile::open_with_options_async(path, options)
.await
.with_context(|| format!("Failed to open file '{}'", path))?
};
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;

View File

@@ -325,12 +325,13 @@ impl ImageLayer {
where
F: Fn(Summary) -> Summary,
{
let file = VirtualFile::open_with_options(
path,
&*std::fs::OpenOptions::new().read(true).write(true),
)
.await
.with_context(|| format!("Failed to open file '{}'", path))?;
let file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true).write(true);
VirtualFile::open_with_options_async(path, options)
.await
.with_context(|| format!("Failed to open file '{}'", path))?
};
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref()).context("deserialize")?;
@@ -492,11 +493,11 @@ impl ImageLayerWriterInner {
},
);
info!("new image layer {path}");
let mut file = VirtualFile::open_with_options(
&path,
std::fs::OpenOptions::new().write(true).create_new(true),
)
.await?;
let mut file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.write(true).create_new(true);
VirtualFile::open_with_options_async(&path, options).await?
};
// make room for the header block
file.seek(SeekFrom::Start(PAGE_SZ as u64)).await?;
let blob_writer = BlobWriter::new(file, PAGE_SZ as u64);

View File

@@ -11,14 +11,17 @@
//! src/backend/storage/file/fd.c
//!
use crate::metrics::{StorageIoOperation, STORAGE_IO_SIZE, STORAGE_IO_TIME_METRIC};
use crate::tenant::TENANTS_SEGMENT_NAME;
use crate::page_cache::PageWriteGuard;
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use std::fs::{self, File, OpenOptions};
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tokio::time::Instant;
use tokio_epoll_uring::IoBufMut;
use utils::fs_ext;
///
@@ -53,7 +56,7 @@ pub struct VirtualFile {
/// opened, in the VirtualFile::create() function, and strip the flag before
/// storing it here.
pub path: Utf8PathBuf,
open_options: OpenOptions,
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
// These are strings becase we only use them for metrics, and those expect strings.
// It makes no sense for us to constantly turn the `TimelineId` and `TenantId` into
@@ -103,7 +106,38 @@ struct SlotInner {
tag: u64,
/// the underlying file
file: Option<File>,
file: Option<OwnedFd>,
}
/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
struct PageWriteGuardBuf {
page: PageWriteGuard<'static>,
init_up_to: usize,
}
// Safety: the [`PageWriteGuard`] gives us exclusive ownership of the page cache slot,
// and the location remains stable even if [`Self`] or the [`PageWriteGuard`] is moved.
unsafe impl tokio_epoll_uring::IoBuf for PageWriteGuardBuf {
fn stable_ptr(&self) -> *const u8 {
self.page.as_ptr()
}
fn bytes_init(&self) -> usize {
self.init_up_to
}
fn bytes_total(&self) -> usize {
self.page.len()
}
}
// Safety: see above, plus: the ownership of [`PageWriteGuard`] means exclusive access,
// hence it's safe to hand out the `stable_mut_ptr()`.
unsafe impl tokio_epoll_uring::IoBufMut for PageWriteGuardBuf {
fn stable_mut_ptr(&mut self) -> *mut u8 {
self.page.as_mut_ptr()
}
unsafe fn set_init(&mut self, pos: usize) {
assert!(pos <= self.page.len());
self.init_up_to = pos;
}
}
impl OpenFiles {
@@ -111,7 +145,7 @@ impl OpenFiles {
///
/// On return, we hold a lock on the slot, and its 'tag' has been updated
/// recently_used has been set. It's all ready for reuse.
fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard<SlotInner>) {
//
// Run the clock algorithm to find a slot to replace.
//
@@ -143,7 +177,7 @@ impl OpenFiles {
}
retries += 1;
} else {
slot_guard = slot.inner.write().unwrap();
slot_guard = slot.inner.write().await;
index = next;
break;
}
@@ -154,7 +188,7 @@ impl OpenFiles {
// old file.
//
if let Some(old_file) = slot_guard.file.take() {
// the normal path of dropping VirtualFile uses "close", use "close-by-replace" here to
// the normal path of dropping VirtualFile uses `Close`, use `CloseByReplace` here to
// distinguish the two.
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::CloseByReplace)
@@ -250,76 +284,47 @@ impl<T> MaybeFatalIo<T> for std::io::Result<T> {
}
}
/// Observe duration for the given storage I/O operation
///
/// Unlike `observe_closure_duration`, this supports async,
/// where "support" means that we measure wall clock time.
macro_rules! observe_duration {
($op:expr, $($body:tt)*) => {{
let instant = Instant::now();
let result = $($body)*;
let elapsed = instant.elapsed().as_secs_f64();
STORAGE_IO_TIME_METRIC
.get($op)
.observe(elapsed);
result
}}
}
macro_rules! with_file {
($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{
let $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
let mut $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}
impl VirtualFile {
/// Open a file in read-only mode. Like File::open.
pub async fn open(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(path, OpenOptions::new().read(true)).await
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true);
Self::open_with_options_async(path, options).await
}
/// Create a new file for writing. If the file exists, it will be truncated.
/// Like File::create.
pub async fn create(path: &Utf8Path) -> Result<VirtualFile, std::io::Error> {
Self::open_with_options(
path,
OpenOptions::new().write(true).create(true).truncate(true),
)
.await
}
/// Open a file with given options.
///
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
pub async fn open_with_options(
path: &Utf8Path,
open_options: &OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
let timeline_id;
if parts.len() > 5 && parts[parts.len() - 5] == TENANTS_SEGMENT_NAME {
tenant_id = parts[parts.len() - 4].to_string();
timeline_id = parts[parts.len() - 2].to_string();
} else {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot();
// NB: there is also StorageIoOperation::OpenAfterReplace which is for the case
// where our caller doesn't get to use the returned VirtualFile before its
// slot gets re-used by someone else.
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Open)
.observe_closure_duration(|| open_options.open(path))?;
// Strip all options other than read and write.
//
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
// only to set them.
let mut reopen_options = open_options.clone();
reopen_options.create(false);
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFile {
handle: RwLock::new(handle),
pos: 0,
path: path.to_path_buf(),
open_options: reopen_options,
tenant_id,
timeline_id,
};
// TODO: Under pressure, it's likely the slot will get re-used and
// the underlying file closed before they get around to using it.
// => https://github.com/neondatabase/neon/issues/6065
slot_guard.file.replace(file);
Ok(vfile)
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.write(true).create(true).truncate(true);
Self::open_with_options_async(path, options).await
}
/// Writes a file to the specified `final_path` in a crash safe fasion
@@ -339,15 +344,15 @@ impl VirtualFile {
));
};
std::fs::remove_file(tmp_path).or_else(fs_ext::ignore_not_found)?;
let mut file = Self::open_with_options(
tmp_path,
OpenOptions::new()
let mut file = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options
.write(true)
// Use `create_new` so that, if we race with ourselves or something else,
// we bail out instead of causing damage.
.create_new(true),
)
.await?;
.create_new(true);
Self::open_with_options_async(tmp_path, options).await?
};
file.write_all(content).await?;
file.sync_all().await?;
drop(file); // before the rename, that's important!
@@ -358,30 +363,91 @@ impl VirtualFile {
// the current `find_victim_slot` impl might pick the same slot for both
// VirtualFile., and it eventually does a blocking write lock instead of
// try_lock.
let final_parent_dirfd =
Self::open_with_options(final_path_parent, OpenOptions::new().read(true)).await?;
let final_parent_dirfd = {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true);
Self::open_with_options_async(final_path_parent, options).await?
};
final_parent_dirfd.sync_all().await?;
Ok(())
}
/// Open a file with given options.
///
/// Note: If any custom flags were set in 'open_options' through OpenOptionsExt,
/// they will be applied also when the file is subsequently re-opened, not only
/// on the first time. Make sure that's sane!
pub async fn open_with_options_async(
path: &Utf8Path,
open_options: tokio_epoll_uring::ops::open_at::OpenOptions,
) -> Result<VirtualFile, std::io::Error> {
let path_str = path.to_string();
let parts = path_str.split('/').collect::<Vec<&str>>();
let tenant_id;
let timeline_id;
if parts.len() > 5 && parts[parts.len() - 5] == "tenants" {
tenant_id = parts[parts.len() - 4].to_string();
timeline_id = parts[parts.len() - 2].to_string();
} else {
tenant_id = "*".to_string();
timeline_id = "*".to_string();
}
let (handle, mut slot_guard) = get_open_files().find_victim_slot().await;
slot_guard.file = Some(observe_duration!(StorageIoOperation::Open, {
let system = tokio_epoll_uring::thread_local_system().await;
let file: OwnedFd = system
.open(path, &open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
file
}));
// Strip all options other than read and write.
//
// It would perhaps be nicer to check just for the read and write flags
// explicitly, but OpenOptions doesn't contain any functions to read flags,
// only to set them.
let mut reopen_options = open_options;
reopen_options.create(false);
reopen_options.create_new(false);
reopen_options.truncate(false);
let vfile = VirtualFile {
handle: RwLock::new(handle),
pos: 0,
path: path.to_path_buf(),
open_options: reopen_options,
tenant_id,
timeline_id,
};
Ok(vfile)
}
/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
self.with_file(StorageIoOperation::Fsync, |file| file.sync_all())
.await?
with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
.with_std_file(|std_file| std_file.sync_all()))
}
pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
self.with_file(StorageIoOperation::Metadata, |file| file.metadata())
.await?
with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
.with_std_file(|std_file| std_file.metadata()))
}
/// Helper function that looks up the underlying File for this VirtualFile,
/// opening it and evicting some other File if necessary. It calls 'func'
/// with the physical File.
async fn with_file<F, R>(&self, op: StorageIoOperation, mut func: F) -> Result<R, Error>
where
F: FnMut(&File) -> R,
{
/// Helper function internal to `VirtualFile` that looks up the underlying File,
/// opens it and evicts some other File if necessary. The passed parameter is
/// assumed to be a function available for the physical `File`.
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard, Error> {
let open_files = get_open_files();
let mut handle_guard = {
@@ -391,27 +457,23 @@ impl VirtualFile {
// We only need to hold the handle lock while we read the current handle. If
// another thread closes the file and recycles the slot for a different file,
// we will notice that the handle we read is no longer valid and retry.
let mut handle = *self.handle.read().unwrap();
let mut handle = *self.handle.read().await;
loop {
// Check if the slot contains our File
{
let slot = &open_files.slots[handle.index];
let slot_guard = slot.inner.read().unwrap();
if slot_guard.tag == handle.tag {
if let Some(file) = &slot_guard.file {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(file)));
}
let slot_guard = slot.inner.read().await;
if slot_guard.tag == handle.tag && slot_guard.file.is_some() {
// Found a cached file descriptor.
slot.recently_used.store(true, Ordering::Relaxed);
return Ok(FileGuard { slot_guard });
}
}
// The slot didn't contain our File. We will have to open it ourselves,
// but before that, grab a write lock on handle in the VirtualFile, so
// that no other thread will try to concurrently open the same file.
let handle_guard = self.handle.write().unwrap();
let handle_guard = self.handle.write().await;
// If another thread changed the handle while we were not holding the lock,
// then the handle might now be valid again. Loop back to retry.
@@ -425,20 +487,25 @@ impl VirtualFile {
// We need to open the file ourselves. The handle in the VirtualFile is
// now locked in write-mode. Find a free slot to put it in.
let (handle, mut slot_guard) = open_files.find_victim_slot();
let (handle, mut slot_guard) = open_files.find_victim_slot().await;
// Re-open the physical file.
// NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this
// case from StorageIoOperation::Open. This helps with identifying thrashing
// of the virtual file descriptor cache.
let file = STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::OpenAfterReplace)
.observe_closure_duration(|| self.open_options.open(&self.path))?;
// Perform the requested operation on it
let result = STORAGE_IO_TIME_METRIC
.get(op)
.observe_closure_duration(|| func(&file));
let file = observe_duration!(StorageIoOperation::OpenAfterReplace, {
let system = tokio_epoll_uring::thread_local_system().await;
let file: OwnedFd = system
.open(&self.path, &self.open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
file
});
// Store the File in the slot and update the handle in the VirtualFile
// to point to it.
@@ -446,7 +513,9 @@ impl VirtualFile {
*handle_guard = handle;
Ok(result)
return Ok(FileGuard {
slot_guard: slot_guard.downgrade(),
});
}
pub fn remove(self) {
@@ -461,11 +530,8 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = self
.with_file(StorageIoOperation::Seek, |mut file| {
file.seek(SeekFrom::End(offset))
})
.await??
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
@@ -485,24 +551,48 @@ impl VirtualFile {
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#117-135
pub async fn read_exact_at(&self, mut buf: &mut [u8], mut offset: u64) -> Result<(), Error> {
while !buf.is_empty() {
match self.read_at(buf, offset).await {
Ok(0) => {
return Err(Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
}
pub async fn read_exact_at<B>(&self, buf: B, mut offset: u64) -> Result<B, Error>
where
B: IoBufMut + Send,
{
use tokio_epoll_uring::BoundedBuf;
let mut buf: tokio_epoll_uring::Slice<B> = buf.slice_full();
while buf.bytes_total() != 0 {
let res;
(buf, res) = self.read_at(buf, offset).await;
match res {
Ok(0) => break,
Ok(n) => {
buf = &mut buf[n..];
buf = buf.slice(n..);
offset += n as u64;
}
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {}
Err(e) => return Err(e),
}
}
Ok(())
if !buf.is_empty() {
Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"failed to fill whole buffer",
))
} else {
Ok(buf.into_inner())
}
}
/// Like [`Self::read_exact_at`] but for [`PageWriteGuard`].
pub async fn read_exact_at_page(
&self,
page: PageWriteGuard<'static>,
offset: u64,
) -> Result<PageWriteGuard<'static>, Error> {
let buf = PageWriteGuardBuf {
page,
init_up_to: 0,
};
let res = self.read_exact_at(buf, offset).await;
res.map(|PageWriteGuardBuf { page, .. }| page)
.map_err(|e| Error::new(ErrorKind::Other, e))
}
// Copied from https://doc.rust-lang.org/1.72.0/src/std/os/unix/fs.rs.html#219-235
@@ -552,23 +642,47 @@ impl VirtualFile {
Ok(n)
}
pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset))
.await?;
if let Ok(size) = result {
pub(crate) async fn read_at<B>(&self, buf: B, offset: u64) -> (B, Result<usize, Error>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
let file_guard = match self.lock_file().await {
Ok(file_guard) => file_guard,
Err(e) => return (buf, Err(e)),
};
let (buf, result) = observe_duration!(StorageIoOperation::Read, {
self.read_at0(file_guard, buf, offset).await
});
(buf, result)
}
async fn read_at0<B>(
&self,
file_guard: FileGuard,
buf: B,
offset: u64,
) -> (B, Result<usize, Error>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
let system = tokio_epoll_uring::thread_local_system().await;
let ((_file_guard, buf), res) = system.read(file_guard, offset, buf).await;
if let Ok(size) = res {
// TODO: don't use with_label_values on hot path
// https://github.com/neondatabase/neon/issues/6107
STORAGE_IO_SIZE
.with_label_values(&["read", &self.tenant_id, &self.timeline_id])
.add(size as i64);
}
result
(buf, res.map_err(|e| Error::new(ErrorKind::Other, e)))
}
async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = self
.with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset))
.await?;
let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
});
if let Ok(size) = result {
// TODO: don't use with_label_values on hot path
// https://github.com/neondatabase/neon/issues/6107
STORAGE_IO_SIZE
.with_label_values(&["write", &self.tenant_id, &self.timeline_id])
.add(size as i64);
@@ -577,6 +691,54 @@ impl VirtualFile {
}
}
struct FileGuard {
slot_guard: RwLockReadGuard<'static, SlotInner>,
}
impl AsRef<OwnedFd> for FileGuard {
fn as_ref(&self) -> &OwnedFd {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}
impl FileGuard {
// TODO: switch to tokio-epoll-uring native operations.
fn with_std_file<F, R>(&self, with: F) -> R
where
F: FnOnce(&File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
let res = with(&file);
let _ = file.into_raw_fd();
res
}
// TODO: switch to tokio-epoll-uring native operations.
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
where
F: FnOnce(&mut File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
let res = with(&mut file);
let _ = file.into_raw_fd();
res
}
}
impl tokio_epoll_uring::IoFd for FileGuard {
unsafe fn as_fd(&self) -> RawFd {
let owned_fd: &OwnedFd = self.as_ref();
owned_fd.as_raw_fd()
}
}
#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
@@ -584,16 +746,19 @@ impl VirtualFile {
blknum: u32,
) -> Result<crate::tenant::block_io::BlockLease<'_>, std::io::Error> {
use crate::page_cache::PAGE_SZ;
let mut buf = [0; PAGE_SZ];
self.read_exact_at(&mut buf, blknum as u64 * (PAGE_SZ as u64))
let buf = vec![0; PAGE_SZ];
let buf = self
.read_exact_at(buf, blknum as u64 * (PAGE_SZ as u64))
.await?;
Ok(std::sync::Arc::new(buf).into())
Ok(crate::tenant::block_io::BlockLease::Vec(buf))
}
async fn read_to_end(&mut self, buf: &mut Vec<u8>) -> Result<(), Error> {
let mut tmp = vec![0; 128];
loop {
let mut tmp = [0; 128];
match self.read_at(&mut tmp, self.pos).await {
let res;
(tmp, res) = self.read_at(tmp, self.pos).await;
match res {
Ok(0) => return Ok(()),
Ok(n) => {
self.pos += n as u64;
@@ -609,22 +774,41 @@ impl VirtualFile {
impl Drop for VirtualFile {
/// If a VirtualFile is dropped, close the underlying file if it was open.
fn drop(&mut self) {
let handle = self.handle.get_mut().unwrap();
let handle = self.handle.get_mut();
// We could check with a read-lock first, to avoid waiting on an
// unrelated I/O.
let slot = &get_open_files().slots[handle.index];
let mut slot_guard = slot.inner.write().unwrap();
if slot_guard.tag == handle.tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also operation "close-by-replace" for closes done on eviction for
// comparison.
if let Some(fd) = slot_guard.file.take() {
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(fd));
fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) {
if slot_guard.tag == tag {
slot.recently_used.store(false, Ordering::Relaxed);
// there is also the `CloseByReplace` operation for closes done on eviction for
// comparison.
if let Some(fd) = slot_guard.file.take() {
STORAGE_IO_TIME_METRIC
.get(StorageIoOperation::Close)
.observe_closure_duration(|| drop(fd));
}
}
}
// We don't have async drop so we cannot directly await the lock here.
// Instead, first do a best-effort attempt at closing the underlying
// file descriptor by using `try_write`, and if that fails, spawn
// a tokio task to do it asynchronously: we just want it to be
// cleaned up eventually.
// Most of the time, the `try_lock` should succeed though,
// as we have `&mut self` access. In other words, if the slot
// is still occupied by our file, there should be no access from
// other I/O operations; the only other possible place to lock
// the slot is the lock algorithm looking for free slots.
let slot = &get_open_files().slots[handle.index];
if let Ok(slot_guard) = slot.inner.try_write() {
clean_slot(slot, slot_guard, handle.tag);
} else {
let tag = handle.tag;
tokio::spawn(async move {
let slot_guard = slot.inner.write().await;
clean_slot(slot, slot_guard, tag);
});
};
}
}
@@ -654,6 +838,7 @@ pub fn init(num_slots: usize) {
if OPEN_FILES.set(OpenFiles::new(num_slots)).is_err() {
panic!("virtual_file::init called twice");
}
crate::metrics::virtual_file_descriptor_cache::SIZE_MAX.set(num_slots as u64);
}
const TEST_MAX_FILE_DESCRIPTORS: usize = 10;
@@ -697,10 +882,10 @@ mod tests {
}
impl MaybeVirtualFile {
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
async fn read_exact_at(&self, mut buf: Vec<u8>, offset: u64) -> Result<Vec<u8>, Error> {
match self {
MaybeVirtualFile::VirtualFile(file) => file.read_exact_at(buf, offset).await,
MaybeVirtualFile::File(file) => file.read_exact_at(buf, offset),
MaybeVirtualFile::File(file) => file.read_exact_at(&mut buf, offset).map(|()| buf),
}
}
async fn write_all_at(&self, buf: &[u8], offset: u64) -> Result<(), Error> {
@@ -742,14 +927,15 @@ mod tests {
// Helper function to slurp a portion of a file into a string
async fn read_string_at(&mut self, pos: u64, len: usize) -> Result<String, Error> {
let mut buf = vec![0; len];
self.read_exact_at(&mut buf, pos).await?;
let buf = vec![0; len];
let buf = self.read_exact_at(buf, pos).await?;
Ok(String::from_utf8(buf).unwrap())
}
}
#[tokio::test]
async fn test_virtual_files() -> Result<(), Error> {
async fn test_virtual_files() -> anyhow::Result<()> {
crate::tenant::harness::setup_logging();
// The real work is done in the test_files() helper function. This
// allows us to run the same set of tests against a native File, and
// VirtualFile. We trust the native Files and wouldn't need to test them,
@@ -758,23 +944,35 @@ mod tests {
// native files, you will run out of file descriptors if the ulimit
// is low enough.)
test_files("virtual_files", |path, open_options| async move {
let vf = VirtualFile::open_with_options(&path, &open_options).await?;
let vf = VirtualFile::open_with_options_async(&path, open_options).await?;
Ok(MaybeVirtualFile::VirtualFile(vf))
})
.await
}
#[tokio::test]
async fn test_physical_files() -> Result<(), Error> {
async fn test_physical_files() -> anyhow::Result<()> {
test_files("physical_files", |path, open_options| async move {
Ok(MaybeVirtualFile::File(open_options.open(path)?))
Ok(MaybeVirtualFile::File({
let system = tokio_epoll_uring::thread_local_system().await;
let owned_fd = system
.open(path, &open_options)
.await
.map_err(|e| match e {
tokio_epoll_uring::Error::Op(e) => e,
tokio_epoll_uring::Error::System(system) => {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
File::from(owned_fd)
}))
})
.await
}
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> Result<(), Error>
async fn test_files<OF, FT>(testname: &str, openfunc: OF) -> anyhow::Result<()>
where
OF: Fn(Utf8PathBuf, OpenOptions) -> FT,
OF: Fn(Utf8PathBuf, tokio_epoll_uring::ops::open_at::OpenOptions) -> FT,
FT: Future<Output = Result<MaybeVirtualFile, std::io::Error>>,
{
let testdir = crate::config::PageServerConf::test_repo_dir(testname);
@@ -783,7 +981,7 @@ mod tests {
let path_a = testdir.join("file_a");
let mut file_a = openfunc(
path_a.clone(),
OpenOptions::new()
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
@@ -796,7 +994,13 @@ mod tests {
let _ = file_a.read_string().await.unwrap_err();
// Close the file and re-open for reading
let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?;
let mut file_a = openfunc(
path_a,
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.to_owned(),
)
.await?;
// cannot write to a file opened in read-only mode
let _ = file_a.write_all(b"bar").await.unwrap_err();
@@ -833,7 +1037,7 @@ mod tests {
let path_b = testdir.join("file_b");
let mut file_b = openfunc(
path_b.clone(),
OpenOptions::new()
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.write(true)
.create(true)
@@ -854,8 +1058,13 @@ mod tests {
let mut vfiles = Vec::new();
for _ in 0..100 {
let mut vfile =
openfunc(path_b.clone(), OpenOptions::new().read(true).to_owned()).await?;
let mut vfile = openfunc(
path_b.clone(),
tokio_epoll_uring::ops::open_at::OpenOptions::new()
.read(true)
.to_owned(),
)
.await?;
assert_eq!("FOOBAR", vfile.read_string().await?);
vfiles.push(vfile);
}
@@ -900,8 +1109,12 @@ mod tests {
// Open the file many times.
let mut files = Vec::new();
for _ in 0..VIRTUAL_FILES {
let f = VirtualFile::open_with_options(&test_file_path, OpenOptions::new().read(true))
.await?;
let f = VirtualFile::open_with_options_async(&test_file_path, {
let mut options = tokio_epoll_uring::ops::open_at::OpenOptions::new();
options.read(true);
options
})
.await?;
files.push(f);
}
let files = Arc::new(files);
@@ -916,11 +1129,11 @@ mod tests {
for _threadno in 0..THREADS {
let files = files.clone();
let hdl = rt.spawn(async move {
let mut buf = [0u8; SIZE];
let mut buf = vec![0u8; SIZE];
let mut rng = rand::rngs::OsRng;
for _ in 1..1000 {
let f = &files[rng.gen_range(0..files.len())];
f.read_exact_at(&mut buf, 0).await.unwrap();
buf = f.read_exact_at(buf, 0).await.unwrap();
assert!(buf == SAMPLE);
}
});

View File

@@ -61,6 +61,7 @@ thiserror.workspace = true
tls-listener.workspace = true
tokio-postgres.workspace = true
tokio-rustls.workspace = true
tokio-util.workspace = true
tokio = { workspace = true, features = ["signal"] }
tracing-opentelemetry.workspace = true
tracing-subscriber.workspace = true
@@ -77,7 +78,6 @@ postgres-protocol.workspace = true
smol_str.workspace = true
workspace_hack.workspace = true
tokio-util.workspace = true
[dev-dependencies]
rcgen.workspace = true

View File

@@ -189,7 +189,7 @@ async fn auth_quirks(
let AuthInfo {
secret,
allowed_ips,
} = api.get_auth_info(extra, &info, latency_timer).await?;
} = api.get_auth_info(extra, &info).await?;
// check allowed list
if !check_peer_addr_is_in_list(&info.inner.peer_addr, &allowed_ips) {
@@ -255,9 +255,7 @@ async fn auth_and_wake_compute(
let mut num_retries = 0;
let mut node = loop {
let wake_res = api
.wake_compute(extra, &compute_credentials.info, latency_timer)
.await;
let wake_res = api.wake_compute(extra, &compute_credentials.info).await;
match handle_try_wake(wake_res, num_retries) {
Err(e) => {
error!(error = ?e, num_retries, retriable = false, "couldn't wake compute node");
@@ -390,13 +388,12 @@ impl BackendType<'_, ComputeUserInfo> {
pub async fn get_allowed_ips(
&self,
extra: &ConsoleReqExtra<'_>,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
use BackendType::*;
match self {
Console(api, creds) => api.get_allowed_ips(extra, creds, latency_timer).await,
Console(api, creds) => api.get_allowed_ips(extra, creds).await,
#[cfg(feature = "testing")]
Postgres(api, creds) => api.get_allowed_ips(extra, creds, latency_timer).await,
Postgres(api, creds) => api.get_allowed_ips(extra, creds).await,
Link(_) => Ok(Arc::new(vec![])),
#[cfg(test)]
Test(x) => x.get_allowed_ips(),
@@ -408,22 +405,13 @@ impl BackendType<'_, ComputeUserInfo> {
pub async fn wake_compute(
&self,
extra: &ConsoleReqExtra<'_>,
latency_timer: &mut LatencyTimer,
) -> Result<Option<CachedNodeInfo>, console::errors::WakeComputeError> {
use BackendType::*;
match self {
Console(api, creds) => {
api.wake_compute(extra, creds, latency_timer)
.map_ok(Some)
.await
}
Console(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
#[cfg(feature = "testing")]
Postgres(api, creds) => {
api.wake_compute(extra, creds, latency_timer)
.map_ok(Some)
.await
}
Postgres(api, creds) => api.wake_compute(extra, creds).map_ok(Some).await,
Link(_) => Ok(None),
#[cfg(test)]
Test(x) => x.wake_compute().map(Some),

View File

@@ -33,7 +33,7 @@ pub(super) async fn authenticate(
config.scram_protocol_timeout,
async {
// pause the timer while we communicate with the client
let _paused = latency_timer.wait_for_user();
let _paused = latency_timer.pause();
flow.begin(scram).await.map_err(|error| {
warn!(?error, "error sending scram acknowledgement");

View File

@@ -24,7 +24,7 @@ pub async fn authenticate_cleartext(
warn!("cleartext auth flow override is enabled, proceeding");
// pause the timer while we communicate with the client
let _paused = latency_timer.wait_for_user();
let _paused = latency_timer.pause();
let auth_outcome = AuthFlow::new(client)
.begin(auth::CleartextPassword(secret))
@@ -54,7 +54,7 @@ pub async fn password_hack_no_authentication(
warn!("project not specified, resorting to the password hack auth flow");
// pause the timer while we communicate with the client
let _paused = latency_timer.wait_for_user();
let _paused = latency_timer.pause();
let payload = AuthFlow::new(client)
.begin(auth::PasswordHack)

View File

@@ -3,7 +3,7 @@
use crate::{
auth::password_hack::parse_endpoint_param,
error::UserFacingError,
proxy::{neon_options, NUM_CONNECTION_ACCEPTED_BY_SNI},
proxy::{neon_options_str, NUM_CONNECTION_ACCEPTED_BY_SNI},
};
use itertools::Itertools;
use pq_proto::StartupMessageParams;
@@ -140,7 +140,7 @@ impl ClientCredentials {
let cache_key = format!(
"{}{}",
project.as_deref().unwrap_or(""),
neon_options(params).unwrap_or("".to_string())
neon_options_str(params)
)
.into();
@@ -406,10 +406,7 @@ mod tests {
let peer_addr = IpAddr::from([127, 0, 0, 1]);
let creds = ClientCredentials::parse(&options, sni, common_names, peer_addr)?;
assert_eq!(creds.project.as_deref(), Some("project"));
assert_eq!(
creds.cache_key,
"projectneon_endpoint_type:read_write neon_lsn:0/2"
);
assert_eq!(creds.cache_key, "projectendpoint_type:read_write lsn:0/2");
Ok(())
}

View File

@@ -8,6 +8,7 @@ use std::{net::SocketAddr, sync::Arc};
use futures::future::Either;
use itertools::Itertools;
use proxy::config::TlsServerEndPoint;
use proxy::proxy::run_until_cancelled;
use tokio::net::TcpListener;
use anyhow::{anyhow, bail, ensure, Context};
@@ -20,7 +21,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
use utils::{project_git_version, sentry_init::init_sentry};
use tracing::{error, info, warn, Instrument};
use tracing::{error, info, Instrument};
project_git_version!(GIT_VERSION);
@@ -151,63 +152,39 @@ async fn task_main(
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let mut connections = tokio::task::JoinSet::new();
let connections = tokio_util::task::task_tracker::TaskTracker::new();
loop {
tokio::select! {
accept_result = listener.accept() => {
let (socket, peer_addr) = accept_result?;
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;
let session_id = uuid::Uuid::new_v4();
let tls_config = Arc::clone(&tls_config);
let dest_suffix = Arc::clone(&dest_suffix);
let session_id = uuid::Uuid::new_v4();
let tls_config = Arc::clone(&tls_config);
let dest_suffix = Arc::clone(&dest_suffix);
connections.spawn(
async move {
socket
.set_nodelay(true)
.context("failed to set socket option")?;
connections.spawn(
async move {
socket
.set_nodelay(true)
.context("failed to set socket option")?;
info!(%peer_addr, "serving");
handle_client(dest_suffix, tls_config, tls_server_end_point, socket).await
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
})
.instrument(tracing::info_span!("handle_client", ?session_id))
);
info!(%peer_addr, "serving");
handle_client(dest_suffix, tls_config, tls_server_end_point, socket).await
}
// Don't modify this unless you read https://docs.rs/tokio/latest/tokio/macro.select.html carefully.
// If this future completes and the pattern doesn't match, this branch is disabled for this call to `select!`.
// This only counts for this loop and it will be enabled again on next `select!`.
//
// Prior code had this as `Some(Err(e))` which _looks_ equivalent to the current setup, but it's not.
// When `connections.join_next()` returned `Some(Ok(()))` (which we expect), it would disable the join_next and it would
// not get called again, even if there are more connections to remove.
Some(res) = connections.join_next() => {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
_ = cancellation_token.cancelled() => {
drop(listener);
break;
}
}
.unwrap_or_else(|e| {
// Acknowledge that the task has finished with an error.
error!("per-client task finished with an error: {e:#}");
})
.instrument(tracing::info_span!("handle_client", ?session_id)),
);
}
// Drain connections
info!("waiting for all client connections to finish");
while let Some(res) = connections.join_next().await {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
connections.close();
drop(listener);
connections.wait().await;
info!("all client connections have finished");
Ok(())
}

View File

@@ -1,6 +1,6 @@
use crate::{
auth::parse_endpoint_param, cancellation::CancelClosure, console::errors::WakeComputeError,
error::UserFacingError, proxy::is_neon_param,
error::UserFacingError, proxy::neon_option,
};
use futures::{FutureExt, TryFutureExt};
use itertools::Itertools;
@@ -275,7 +275,7 @@ fn filtered_options(params: &StartupMessageParams) -> Option<String> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| parse_endpoint_param(opt).is_none() && !is_neon_param(opt))
.filter(|opt| parse_endpoint_param(opt).is_none() && neon_option(opt).is_none())
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();

View File

@@ -6,9 +6,7 @@ use super::messages::MetricsAuxInfo;
use crate::{
auth::backend::ComputeUserInfo,
cache::{timed_lru, TimedLru},
compute,
proxy::LatencyTimer,
scram,
compute, scram,
};
use async_trait::async_trait;
use dashmap::DashMap;
@@ -203,7 +201,18 @@ pub struct ConsoleReqExtra<'a> {
pub session_id: uuid::Uuid,
/// Name of client application, if set.
pub application_name: Option<&'a str>,
pub options: Option<&'a str>,
pub options: Vec<(String, String)>,
}
impl<'a> ConsoleReqExtra<'a> {
// https://swagger.io/docs/specification/serialization/ DeepObject format
// paramName[prop1]=value1&paramName[prop2]=value2&....
pub fn options_as_deep_object(&self) -> Vec<(String, String)> {
self.options
.iter()
.map(|(k, v)| (format!("options[{}]", k), v.to_string()))
.collect()
}
}
/// Auth secret which is managed by the cloud.
@@ -252,14 +261,12 @@ pub trait Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, errors::GetAuthInfoError>;
async fn get_allowed_ips(
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, errors::GetAuthInfoError>;
/// Wake up the compute node and return the corresponding connection info.
@@ -267,7 +274,6 @@ pub trait Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, errors::WakeComputeError>;
}

View File

@@ -6,7 +6,6 @@ use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::proxy::LatencyTimer;
use crate::{auth::backend::ComputeUserInfo, compute, error::io_error, scram, url::ApiUrl};
use async_trait::async_trait;
use futures::TryFutureExt;
@@ -147,7 +146,6 @@ impl super::Api for Api {
&self,
_extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, GetAuthInfoError> {
self.do_get_auth_info(creds).await
}
@@ -156,7 +154,6 @@ impl super::Api for Api {
&self,
_extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
Ok(Arc::new(self.do_get_auth_info(creds).await?.allowed_ips))
}
@@ -166,7 +163,6 @@ impl super::Api for Api {
&self,
_extra: &ConsoleReqExtra<'_>,
_creds: &ComputeUserInfo,
_latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, WakeComputeError> {
self.do_wake_compute()
.map_ok(CachedNodeInfo::new_uncached)

View File

@@ -5,7 +5,7 @@ use super::{
errors::{ApiError, GetAuthInfoError, WakeComputeError},
ApiCaches, ApiLocks, AuthInfo, AuthSecret, CachedNodeInfo, ConsoleReqExtra, NodeInfo,
};
use crate::proxy::{LatencyTimer, ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER};
use crate::proxy::{ALLOWED_IPS_BY_CACHE_OUTCOME, ALLOWED_IPS_NUMBER};
use crate::{auth::backend::ComputeUserInfo, compute, http, scram};
use async_trait::async_trait;
use futures::TryFutureExt;
@@ -106,7 +106,7 @@ impl Api {
) -> Result<NodeInfo, WakeComputeError> {
let request_id = uuid::Uuid::new_v4().to_string();
async {
let request = self
let mut request_builder = self
.endpoint
.get("proxy_wake_compute")
.header("X-Request-ID", &request_id)
@@ -115,9 +115,14 @@ impl Api {
.query(&[
("application_name", extra.application_name),
("project", Some(&creds.endpoint)),
("options", extra.options),
])
.build()?;
]);
request_builder = if extra.options.is_empty() {
request_builder
} else {
request_builder.query(&extra.options_as_deep_object())
};
let request = request_builder.build()?;
info!(url = request.url().as_str(), "sending http request");
let start = Instant::now();
@@ -158,9 +163,7 @@ impl super::Api for Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<AuthInfo, GetAuthInfoError> {
let _timer = latency_timer.control_plane();
self.do_get_auth_info(extra, creds).await
}
@@ -168,7 +171,6 @@ impl super::Api for Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<Arc<Vec<String>>, GetAuthInfoError> {
let key: &str = &creds.endpoint;
if let Some(allowed_ips) = self.caches.allowed_ips.get(key) {
@@ -180,11 +182,7 @@ impl super::Api for Api {
ALLOWED_IPS_BY_CACHE_OUTCOME
.with_label_values(&["miss"])
.inc();
let timer = latency_timer.control_plane();
let allowed_ips = Arc::new(self.do_get_auth_info(extra, creds).await?.allowed_ips);
drop(timer);
self.caches
.allowed_ips
.insert(key.into(), allowed_ips.clone());
@@ -196,7 +194,6 @@ impl super::Api for Api {
&self,
extra: &ConsoleReqExtra<'_>,
creds: &ComputeUserInfo,
latency_timer: &mut LatencyTimer,
) -> Result<CachedNodeInfo, WakeComputeError> {
let key: &str = &creds.inner.cache_key;
@@ -222,10 +219,7 @@ impl super::Api for Api {
}
}
let timer = latency_timer.control_plane();
let node = self.do_wake_compute(extra, creds).await?;
drop(timer);
let (_, cached) = self.caches.node_info.insert(key.clone(), node);
info!(key = &*key, "created a cache entry for compute node info");

View File

@@ -110,19 +110,6 @@ static COMPUTE_CONNECTION_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
.unwrap()
});
static CONTROL_PLANE_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"proxy_compute_connection_control_plane_latency_seconds",
"Time proxy spent talking to control-plane/console while trying to establish a connection to the compute endpoint",
// http/ws/tcp, true/false, true/false, success/failure
// 3 * 2 * 2 * 2 = 24 counters
&["protocol", "cache_miss", "pool_miss", "outcome"],
// largest bucket = 2^16 * 0.5ms = 32s
exponential_buckets(0.0005, 2.0, 16).unwrap(),
)
.unwrap()
});
pub static CONSOLE_REQUEST_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"proxy_console_request_latency",
@@ -187,10 +174,6 @@ pub struct LatencyTimer {
start: Option<Instant>,
// accumulated time on the stopwatch
accumulated: std::time::Duration,
// time since the stopwatch was started while talking to control-plane
start_cp: Option<Instant>,
// accumulated time on the stopwatch while talking to control-plane
accumulated_cp: std::time::Duration,
// label data
protocol: &'static str,
cache_miss: bool,
@@ -198,11 +181,7 @@ pub struct LatencyTimer {
outcome: &'static str,
}
pub struct LatencyTimerUserIO<'a> {
timer: &'a mut LatencyTimer,
}
pub struct LatencyTimerControlPlane<'a> {
pub struct LatencyTimerPause<'a> {
timer: &'a mut LatencyTimer,
}
@@ -211,8 +190,6 @@ impl LatencyTimer {
Self {
start: Some(Instant::now()),
accumulated: std::time::Duration::ZERO,
start_cp: None,
accumulated_cp: std::time::Duration::ZERO,
protocol,
cache_miss: false,
// by default we don't do pooling
@@ -222,17 +199,11 @@ impl LatencyTimer {
}
}
pub fn control_plane(&mut self) -> LatencyTimerControlPlane<'_> {
// start the stopwatch again
self.start = Some(Instant::now());
LatencyTimerControlPlane { timer: self }
}
pub fn wait_for_user(&mut self) -> LatencyTimerUserIO<'_> {
pub fn pause(&mut self) -> LatencyTimerPause<'_> {
// stop the stopwatch and record the time that we have accumulated
let start = self.start.take().expect("latency timer should be started");
self.accumulated += start.elapsed();
LatencyTimerUserIO { timer: self }
LatencyTimerPause { timer: self }
}
pub fn cache_miss(&mut self) {
@@ -248,25 +219,13 @@ impl LatencyTimer {
}
}
impl Drop for LatencyTimerUserIO<'_> {
impl Drop for LatencyTimerPause<'_> {
fn drop(&mut self) {
// start the stopwatch again
self.timer.start = Some(Instant::now());
}
}
impl Drop for LatencyTimerControlPlane<'_> {
fn drop(&mut self) {
// stop the control-plane stopwatch and record the time that we have accumulated
let start = self
.timer
.start_cp
.take()
.expect("latency timer should be started");
self.timer.accumulated_cp += start.elapsed();
}
}
impl Drop for LatencyTimer {
fn drop(&mut self) {
let duration =
@@ -278,21 +237,7 @@ impl Drop for LatencyTimer {
bool_to_str(self.pool_miss),
self.outcome,
])
.observe(duration.as_secs_f64());
let duration_cp = self
.start_cp
.map(|start| start.elapsed())
.unwrap_or_default()
+ self.accumulated_cp;
CONTROL_PLANE_LATENCY
.with_label_values(&[
self.protocol,
bool_to_str(self.cache_miss),
bool_to_str(self.pool_miss),
self.outcome,
])
.observe(duration_cp.as_secs_f64());
.observe(duration.as_secs_f64())
}
}
@@ -332,6 +277,21 @@ static NUM_BYTES_PROXIED_COUNTER: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});
pub async fn run_until_cancelled<F: std::future::Future>(
f: F,
cancellation_token: &CancellationToken,
) -> Option<F::Output> {
match futures::future::select(
std::pin::pin!(f),
std::pin::pin!(cancellation_token.cancelled()),
)
.await
{
futures::future::Either::Left((f, _)) => Some(f),
futures::future::Either::Right(((), _)) => None,
}
}
pub async fn task_main(
config: &'static ProxyConfig,
listener: tokio::net::TcpListener,
@@ -345,71 +305,62 @@ pub async fn task_main(
// will be inherited by all accepted client sockets.
socket2::SockRef::from(&listener).set_keepalive(true)?;
let mut connections = tokio::task::JoinSet::new();
let connections = tokio_util::task::task_tracker::TaskTracker::new();
let cancel_map = Arc::new(CancelMap::default());
loop {
tokio::select! {
accept_result = listener.accept() => {
let (socket, peer_addr) = accept_result?;
while let Some(accept_result) =
run_until_cancelled(listener.accept(), &cancellation_token).await
{
let (socket, peer_addr) = accept_result?;
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
connections.spawn(
async move {
info!("accepted postgres client connection");
let session_id = uuid::Uuid::new_v4();
let cancel_map = Arc::clone(&cancel_map);
connections.spawn(
async move {
info!("accepted postgres client connection");
let mut socket = WithClientIp::new(socket);
let mut peer_addr = peer_addr;
if let Some(ip) = socket.wait_for_addr().await? {
peer_addr = ip;
tracing::Span::current().record("peer_addr", &tracing::field::display(ip));
} else if config.require_client_ip {
bail!("missing required client IP");
}
socket
.inner
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(config, &cancel_map, session_id, socket, ClientMode::Tcp, peer_addr.ip()).await
}
.instrument(info_span!("handle_client", ?session_id, peer_addr = tracing::field::Empty))
.unwrap_or_else(move |e| {
// Acknowledge that the task has finished with an error.
error!(?session_id, "per-client task finished with an error: {e:#}");
}),
);
}
// Don't modify this unless you read https://docs.rs/tokio/latest/tokio/macro.select.html carefully.
// If this future completes and the pattern doesn't match, this branch is disabled for this call to `select!`.
// This only counts for this loop and it will be enabled again on next `select!`.
//
// Prior code had this as `Some(Err(e))` which _looks_ equivalent to the current setup, but it's not.
// When `connections.join_next()` returned `Some(Ok(()))` (which we expect), it would disable the join_next and it would
// not get called again, even if there are more connections to remove.
Some(res) = connections.join_next() => {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
let mut socket = WithClientIp::new(socket);
let mut peer_addr = peer_addr;
if let Some(ip) = socket.wait_for_addr().await? {
peer_addr = ip;
tracing::Span::current().record("peer_addr", &tracing::field::display(ip));
} else if config.require_client_ip {
bail!("missing required client IP");
}
socket
.inner
.set_nodelay(true)
.context("failed to set socket option")?;
handle_client(
config,
&cancel_map,
session_id,
socket,
ClientMode::Tcp,
peer_addr.ip(),
)
.await
}
_ = cancellation_token.cancelled() => {
drop(listener);
break;
}
}
.instrument(info_span!(
"handle_client",
?session_id,
peer_addr = tracing::field::Empty
))
.unwrap_or_else(move |e| {
// Acknowledge that the task has finished with an error.
error!(?session_id, "per-client task finished with an error: {e:#}");
}),
);
}
connections.close();
drop(listener);
// Drain connections
while let Some(res) = connections.join_next().await {
if let Err(e) = res {
if !e.is_panic() && !e.is_cancelled() {
warn!("unexpected error from joined connection task: {e:?}");
}
}
}
connections.wait().await;
Ok(())
}
@@ -750,13 +701,9 @@ where
info!("compute node's state has likely changed; requesting a wake-up");
let node_info = loop {
let wake_res = match creds {
auth::BackendType::Console(api, creds) => {
api.wake_compute(extra, creds, &mut latency_timer).await
}
auth::BackendType::Console(api, creds) => api.wake_compute(extra, creds).await,
#[cfg(feature = "testing")]
auth::BackendType::Postgres(api, creds) => {
api.wake_compute(extra, creds, &mut latency_timer).await
}
auth::BackendType::Postgres(api, creds) => api.wake_compute(extra, creds).await,
// nothing to do?
auth::BackendType::Link(_) => return Err(err.into()),
// test backend
@@ -1021,12 +968,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
allow_self_signed_compute,
} = self;
let console_options = neon_options(params);
let extra = console::ConsoleReqExtra {
session_id, // aka this connection's id
application_name: params.get("application_name"),
options: console_options.as_deref(),
options: neon_options(params),
};
let mut latency_timer = LatencyTimer::new(mode.protocol_label());
@@ -1086,26 +1031,29 @@ impl<S: AsyncRead + AsyncWrite + Unpin> Client<'_, S> {
}
}
pub fn neon_options(params: &StartupMessageParams) -> Option<String> {
pub fn neon_options(params: &StartupMessageParams) -> Vec<(String, String)> {
#[allow(unstable_name_collisions)]
let options: String = params
.options_raw()?
.filter(|opt| is_neon_param(opt))
.sorted() // we sort it to use as cache key
.intersperse(" ") // TODO: use impl from std once it's stabilized
.collect();
// Don't even bother with empty options.
if options.is_empty() {
return None;
match params.options_raw() {
Some(options) => options.filter_map(neon_option).collect(),
None => vec![],
}
Some(options)
}
pub fn is_neon_param(bytes: &str) -> bool {
pub fn neon_options_str(params: &StartupMessageParams) -> String {
#[allow(unstable_name_collisions)]
neon_options(params)
.iter()
.map(|(k, v)| format!("{}:{}", k, v))
.sorted() // we sort it to use as cache key
.intersperse(" ".to_owned())
.collect()
}
pub fn neon_option(bytes: &str) -> Option<(String, String)> {
static RE: OnceCell<Regex> = OnceCell::new();
RE.get_or_init(|| Regex::new(r"^neon_\w+:").unwrap());
let re = RE.get_or_init(|| Regex::new(r"^neon_(\w+):(.+)").unwrap());
RE.get().unwrap().is_match(bytes)
let cap = re.captures(bytes)?;
let (_, [k, v]) = cap.extract();
Some((k.to_owned(), v.to_owned()))
}

View File

@@ -491,7 +491,7 @@ fn helper_create_connect_info(
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some("TEST"),
options: None,
options: vec![],
};
let creds = auth::BackendType::Test(mechanism);
(cache, extra, creds)

View File

@@ -10,6 +10,7 @@ use anyhow::bail;
use hyper::StatusCode;
pub use reqwest_middleware::{ClientWithMiddleware, Error};
pub use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio_util::task::TaskTracker;
use crate::protocol2::{ProxyProtocolAccept, WithClientIp};
use crate::proxy::{NUM_CLIENT_CONNECTION_CLOSED_COUNTER, NUM_CLIENT_CONNECTION_OPENED_COUNTER};
@@ -70,6 +71,9 @@ pub async fn task_main(
incoming: addr_incoming,
};
let ws_connections = tokio_util::task::task_tracker::TaskTracker::new();
ws_connections.close(); // allows `ws_connections.wait to complete`
let tls_listener = TlsListener::new(tls_acceptor, addr_incoming).filter(|conn| {
if let Err(err) = conn {
error!("failed to accept TLS connection for websockets: {err:?}");
@@ -86,6 +90,7 @@ pub async fn task_main(
let remote_addr = io.inner.remote_addr();
let sni_name = tls.server_name().map(|s| s.to_string());
let conn_pool = conn_pool.clone();
let ws_connections = ws_connections.clone();
async move {
let peer_addr = match client_addr {
@@ -97,6 +102,7 @@ pub async fn task_main(
move |req: Request<Body>| {
let sni_name = sni_name.clone();
let conn_pool = conn_pool.clone();
let ws_connections = ws_connections.clone();
async move {
let cancel_map = Arc::new(CancelMap::default());
@@ -106,6 +112,7 @@ pub async fn task_main(
req,
config,
conn_pool,
ws_connections,
cancel_map,
session_id,
sni_name,
@@ -129,6 +136,9 @@ pub async fn task_main(
.with_graceful_shutdown(cancellation_token.cancelled())
.await?;
// await websocket connections
ws_connections.wait().await;
Ok(())
}
@@ -170,10 +180,12 @@ where
}
}
#[allow(clippy::too_many_arguments)]
async fn request_handler(
mut request: Request<Body>,
config: &'static ProxyConfig,
conn_pool: Arc<conn_pool::GlobalConnPool>,
ws_connections: TaskTracker,
cancel_map: Arc<CancelMap>,
session_id: uuid::Uuid,
sni_hostname: Option<String>,
@@ -193,7 +205,7 @@ async fn request_handler(
let (response, websocket) = hyper_tungstenite::upgrade(&mut request, None)
.map_err(|e| ApiError::BadRequest(e.into()))?;
tokio::spawn(
ws_connections.spawn(
async move {
if let Err(e) = websocket::serve_websocket(
websocket,

View File

@@ -405,7 +405,7 @@ async fn connect_to_compute(
conn_info: &ConnInfo,
conn_id: uuid::Uuid,
session_id: uuid::Uuid,
mut latency_timer: LatencyTimer,
latency_timer: LatencyTimer,
peer_addr: IpAddr,
) -> anyhow::Result<ClientInner> {
let tls = config.tls_config.as_ref();
@@ -433,17 +433,17 @@ async fn connect_to_compute(
let extra = console::ConsoleReqExtra {
session_id: uuid::Uuid::new_v4(),
application_name: Some(APP_NAME),
options: console_options.as_deref(),
options: console_options,
};
// TODO(anna): this is a bit hacky way, consider using console notification listener.
if !config.disable_ip_check_for_http {
let allowed_ips = backend.get_allowed_ips(&extra, &mut latency_timer).await?;
let allowed_ips = backend.get_allowed_ips(&extra).await?;
if !check_peer_addr_is_in_list(&peer_addr, &allowed_ips) {
return Err(auth::AuthError::ip_address_not_allowed().into());
}
}
let node_info = backend
.wake_compute(&extra, &mut latency_timer)
.wake_compute(&extra)
.await?
.context("missing cache entry from wake_compute")?;

View File

@@ -56,6 +56,7 @@ from fixtures.remote_storage import (
RemoteStorageKind,
RemoteStorageUser,
S3Storage,
default_remote_storage,
remote_storage_to_toml_inline_table,
)
from fixtures.types import Lsn, TenantId, TimelineId
@@ -468,7 +469,7 @@ class NeonEnvBuilder:
# Cannot create more than one environment from one builder
assert self.env is None, "environment already initialized"
if default_remote_storage_if_missing and self.pageserver_remote_storage is None:
self.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
self.enable_pageserver_remote_storage(default_remote_storage())
self.env = NeonEnv(self)
return self.env

View File

@@ -372,6 +372,13 @@ def s3_storage() -> RemoteStorageKind:
return RemoteStorageKind.MOCK_S3
def default_remote_storage() -> RemoteStorageKind:
"""
The remote storage kind used in tests that do not specify a preference
"""
return RemoteStorageKind.LOCAL_FS
# serialize as toml inline table
def remote_storage_to_toml_inline_table(remote_storage: RemoteStorage) -> str:
if not isinstance(remote_storage, (LocalFsStorage, S3Storage)):

View File

@@ -35,6 +35,11 @@ def test_gc_cutoff(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
pageserver_http.configure_failpoints(("after-timeline-gc-removed-layers", "exit"))
# Because this test does a rapid series of restarts of the same node, it's possible that
# we are restarted again before we can clean up deletion lists form the previous generation,
# resulting in a subsequent startup logging a warning.
env.pageserver.allowed_errors.append(".*Dropping stale deletions for tenant.*")
for _ in range(5):
with pytest.raises(subprocess.SubprocessError):
pg_bin.run_capture(["pgbench", "-P1", "-N", "-c5", "-T500", "-Mprepared", connstr])

View File

@@ -5,7 +5,6 @@ import time
from collections import defaultdict
from typing import Any, DefaultDict, Dict, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
@@ -19,7 +18,7 @@ from fixtures.pageserver.utils import (
wait_for_upload,
wait_for_upload_queue_empty,
)
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn
from fixtures.utils import query_scalar, wait_until
@@ -45,13 +44,7 @@ def get_num_downloaded_layers(client: PageserverHttpClient):
# If you have a large relation, check that the pageserver downloads parts of it as
# require by queries.
#
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_ondemand_download_large_rel(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
def test_ondemand_download_large_rel(neon_env_builder: NeonEnvBuilder):
# thinking about using a shared environment? the test assumes that global
# metrics are for single tenant.
env = neon_env_builder.init_start(
@@ -145,13 +138,7 @@ def test_ondemand_download_large_rel(
# If you have a relation with a long history of updates, the pageserver downloads the layer
# files containing the history as needed by timetravel queries.
#
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_ondemand_download_timetravel(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
def test_ondemand_download_timetravel(neon_env_builder: NeonEnvBuilder):
# thinking about using a shared environment? the test assumes that global
# metrics are for single tenant.
@@ -229,8 +216,7 @@ def test_ondemand_download_timetravel(
assert filled_current_physical == filled_size, "we don't yet do layer eviction"
# Wait until generated image layers are uploaded to S3
if remote_storage_kind is not None:
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, timeline_id)
wait_for_upload_queue_empty(pageserver_http, env.initial_tenant, timeline_id)
env.pageserver.stop()

View File

@@ -314,6 +314,10 @@ def test_creating_tenant_conf_after_attach(neon_env_builder: NeonEnvBuilder):
assert not config_path.exists(), "detach did not remove config file"
# The re-attach's increment of the generation number may invalidate deletion queue
# updates in flight from the previous attachment.
env.pageserver.allowed_errors.append(".*Dropped remote consistent LSN updates.*")
env.pageserver.tenant_attach(tenant_id)
wait_until(
number_of_iterations=5,

View File

@@ -23,23 +23,18 @@ from fixtures.pageserver.utils import (
wait_until_tenant_active,
wait_until_tenant_state,
)
from fixtures.remote_storage import (
RemoteStorageKind,
available_remote_storages,
available_s3_storages,
)
from fixtures.remote_storage import RemoteStorageKind, available_s3_storages, s3_storage
from fixtures.types import TenantId
from fixtures.utils import run_pg_bench_small, wait_until
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_tenant_delete_smoke(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
):
neon_env_builder.pageserver_config_override = "test_remote_failures=1"
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
@@ -78,16 +73,15 @@ def test_tenant_delete_smoke(
run_pg_bench_small(pg_bin, endpoint.connstr())
wait_for_last_flush_lsn(env, endpoint, tenant=tenant_id, timeline=timeline_id)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
parent = timeline
@@ -100,16 +94,15 @@ def test_tenant_delete_smoke(
tenant_path = env.pageserver.tenant_dir(tenant_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
# Deletion updates the tenant count: the one default tenant remains
assert ps_http.get_metric_value("pageserver_tenant_manager_slots") == 1
@@ -149,9 +142,7 @@ FAILPOINTS_BEFORE_BACKGROUND = [
def combinations():
result = []
remotes = [RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
remotes.append(RemoteStorageKind.REAL_S3)
remotes = available_s3_storages()
for remote_storage_kind in remotes:
for delete_failpoint in FAILPOINTS:
@@ -165,8 +156,8 @@ def combinations():
return result
@pytest.mark.parametrize("remote_storage_kind, failpoint, simulate_failures", combinations())
@pytest.mark.parametrize("check", list(Check))
@pytest.mark.parametrize("remote_storage_kind, failpoint, simulate_failures", combinations())
def test_delete_tenant_exercise_crash_safety_failpoints(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
@@ -214,16 +205,15 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
run_pg_bench_small(pg_bin, endpoint.connstr())
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
ps_http.configure_failpoints((failpoint, "return"))
@@ -276,25 +266,23 @@ def test_delete_tenant_exercise_crash_safety_failpoints(
assert not tenant_dir.exists()
# Check remote is empty
if remote_storage_kind in available_s3_storages():
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
allowed_postfix="initdb.tar.zst",
)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
allowed_postfix="initdb.tar.zst",
)
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_tenant_delete_is_resumed_on_attach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
):
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
@@ -314,16 +302,15 @@ def test_tenant_delete_is_resumed_on_attach(
wait_for_last_flush_lsn(env, endpoint, tenant=tenant_id, timeline=timeline_id)
# sanity check, data should be there
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
# failpoint before we remove index_part from s3
failpoint = "timeline-delete-before-index-delete"
@@ -354,16 +341,15 @@ def test_tenant_delete_is_resumed_on_attach(
iterations=iterations,
)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
reason = tenant_info["state"]["data"]["reason"]
# failpoint may not be the only error in the stack
@@ -389,17 +375,16 @@ def test_tenant_delete_is_resumed_on_attach(
tenant_path = env.pageserver.tenant_dir(tenant_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():
ps_http.deletion_queue_flush(execute=True)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
ps_http.deletion_queue_flush(execute=True)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
)
),
)
def test_long_timeline_create_cancelled_by_tenant_delete(neon_env_builder: NeonEnvBuilder):

View File

@@ -21,7 +21,6 @@ from fixtures.pageserver.utils import (
)
from fixtures.remote_storage import (
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until
@@ -59,16 +58,11 @@ class ReattachMode(str, enum.Enum):
# Basic detach and re-attach test
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@pytest.mark.parametrize(
"mode",
[ReattachMode.REATTACH_EXPLICIT, ReattachMode.REATTACH_RESET, ReattachMode.REATTACH_RESET_DROP],
)
def test_tenant_reattach(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind, mode: str
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
def test_tenant_reattach(neon_env_builder: NeonEnvBuilder, mode: str):
# Exercise retry code path by making all uploads and downloads fail for the
# first time. The retries print INFO-messages to the log; we will check
# that they are present after the test.
@@ -187,16 +181,13 @@ num_rows = 100000
#
# I don't know what's causing that...
@pytest.mark.skip(reason="fixme")
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_tenant_reattach_while_busy(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
updates_started = 0
updates_finished = 0
updates_to_perform = 0
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
# Run random UPDATEs on test table. On failure, try again.
@@ -439,13 +430,9 @@ def test_tenant_detach_regular_tenant(neon_simple_env: NeonEnv):
should not be present in pageserver's memory"
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_detach_while_attaching(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
##### First start, insert secret data and upload it to the remote storage
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

View File

@@ -20,7 +20,6 @@ from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import (
@@ -449,13 +448,9 @@ def test_tenant_relocation(
# last-record LSN. We had a bug where GetPage incorrectly followed the
# timeline to the ancestor without waiting for the missing WAL to
# arrive.
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_emergency_relocate_with_branches_slow_replay(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
env.pageserver.is_testing_enabled_or_skip()
pageserver_http = env.pageserver.http_client()
@@ -603,13 +598,9 @@ def test_emergency_relocate_with_branches_slow_replay(
# exist. Update dbir" path (2), and inserts an entry in the
# DbDirectory with 'false' to indicate there is no PG_VERSION file.
#
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_emergency_relocate_with_branches_createdb(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start()
pageserver_http = env.pageserver.http_client()

View File

@@ -18,7 +18,7 @@ from fixtures.neon_fixtures import (
NeonEnvBuilder,
)
from fixtures.pageserver.utils import timeline_delete_wait_completed
from fixtures.remote_storage import RemoteStorageKind, available_remote_storages
from fixtures.remote_storage import RemoteStorageKind
from fixtures.types import Lsn, TenantId
from fixtures.utils import wait_until
from prometheus_client.samples import Sample
@@ -281,13 +281,7 @@ def test_pageserver_metrics_removed_after_detach(neon_env_builder: NeonEnvBuilde
assert post_detach_samples == set()
# Check that empty tenants work with or without the remote storage
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_pageserver_with_empty_tenants(
neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
def test_pageserver_with_empty_tenants(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(

View File

@@ -11,7 +11,6 @@ import os
from pathlib import Path
from typing import List, Tuple
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
Endpoint,
@@ -27,7 +26,6 @@ from fixtures.pageserver.utils import (
from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
available_remote_storages,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, wait_until
@@ -60,10 +58,7 @@ async def all_tenants_workload(env: NeonEnv, tenants_endpoints):
await asyncio.gather(*workers)
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_tenants_many(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
def test_tenants_many(neon_env_builder: NeonEnvBuilder):
env = neon_env_builder.init_start()
# FIXME: Is this expected?
@@ -218,11 +213,7 @@ def test_tenants_attached_after_download(neon_env_builder: NeonEnvBuilder):
def test_tenant_redownloads_truncated_file_on_startup(
neon_env_builder: NeonEnvBuilder,
):
remote_storage_kind = RemoteStorageKind.LOCAL_FS
# since we now store the layer file length metadata, we notice on startup that a layer file is of wrong size, and proceed to redownload it.
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
# we store the layer file length metadata, we notice on startup that a layer file is of wrong size, and proceed to redownload it.
env = neon_env_builder.init_start()
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)

View File

@@ -29,8 +29,7 @@ from fixtures.pageserver.utils import (
from fixtures.remote_storage import (
LocalFsStorage,
RemoteStorageKind,
available_remote_storages,
available_s3_storages,
s3_storage,
)
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import query_scalar, run_pg_bench_small, wait_until
@@ -142,25 +141,11 @@ DELETE_FAILPOINTS = [
]
def combinations():
result = []
remotes = [RemoteStorageKind.MOCK_S3]
if os.getenv("ENABLE_REAL_S3_REMOTE_STORAGE"):
remotes.append(RemoteStorageKind.REAL_S3)
for remote_storage_kind in remotes:
for delete_failpoint in DELETE_FAILPOINTS:
result.append((remote_storage_kind, delete_failpoint))
return result
# cover the two cases: remote storage configured vs not configured
@pytest.mark.parametrize("remote_storage_kind, failpoint", combinations())
@pytest.mark.parametrize("failpoint", DELETE_FAILPOINTS)
@pytest.mark.parametrize("check", list(Check))
def test_delete_timeline_exercise_crash_safety_failpoints(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
failpoint: str,
check: Check,
pg_bin: PgBin,
@@ -180,7 +165,7 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
7. Ensure failpoint is hit
8. Retry or restart without the failpoint and check the result.
"""
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start(
@@ -201,18 +186,17 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
env.pageserver.allowed_errors.append(f".*{timeline_id}.*failpoint: {failpoint}")
# It appears when we stopped flush loop during deletion and then pageserver is stopped
@@ -316,11 +300,9 @@ def test_delete_timeline_exercise_crash_safety_failpoints(
assert not (timeline_dir.parent / f"{timeline_id}.___deleted").exists()
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
@pytest.mark.parametrize("fill_branch", [True, False])
def test_timeline_resurrection_on_attach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
fill_branch: bool,
):
"""
@@ -329,8 +311,6 @@ def test_timeline_resurrection_on_attach(
Original issue: https://github.com/neondatabase/neon/issues/3560
"""
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
##### First start, insert data and upload it to the remote storage
env = neon_env_builder.init_start()
@@ -658,20 +638,10 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
wait_timeline_detail_404(ps_http, env.initial_tenant, child_timeline_id, iterations=2)
@pytest.mark.parametrize(
"remote_storage_kind",
list(
filter(
lambda s: s in (RemoteStorageKind.MOCK_S3, RemoteStorageKind.REAL_S3),
available_remote_storages(),
)
),
)
def test_timeline_delete_works_for_remote_smoke(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
):
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
env = neon_env_builder.init_start()
@@ -804,12 +774,11 @@ def test_delete_orphaned_objects(
assert env.pageserver_remote_storage.index_path(env.initial_tenant, timeline_id).exists()
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_timeline_delete_resumed_on_attach(
neon_env_builder: NeonEnvBuilder,
remote_storage_kind: RemoteStorageKind,
pg_bin: PgBin,
):
remote_storage_kind = s3_storage()
neon_env_builder.enable_pageserver_remote_storage(remote_storage_kind)
env = neon_env_builder.init_start(initial_tenant_conf=MANY_SMALL_LAYERS_TENANT_CONFIG)
@@ -824,18 +793,17 @@ def test_timeline_delete_resumed_on_attach(
run_pg_bench_small(pg_bin, endpoint.connstr())
last_flush_lsn_upload(env, endpoint, env.initial_tenant, timeline_id)
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(env.initial_tenant),
"timelines",
str(timeline_id),
)
),
)
# failpoint before we remove index_part from s3
failpoint = "timeline-delete-during-rm"
@@ -873,18 +841,17 @@ def test_timeline_delete_resumed_on_attach(
# failpoint may not be the only error in the stack
assert reason.endswith(f"failpoint: {failpoint}"), reason
if remote_storage_kind in available_s3_storages():
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
"timelines",
str(timeline_id),
)
),
)
assert_prefix_not_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(tenant_id),
"timelines",
str(timeline_id),
)
),
)
# now we stop pageserver and remove local tenant state
env.endpoints.stop_all()
@@ -905,15 +872,14 @@ def test_timeline_delete_resumed_on_attach(
tenant_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
assert not tenant_path.exists()
if remote_storage_kind in available_s3_storages():
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(timeline_id),
"timelines",
str(timeline_id),
)
),
)
assert_prefix_empty(
neon_env_builder,
prefix="/".join(
(
"tenants",
str(timeline_id),
"timelines",
str(timeline_id),
)
),
)

View File

@@ -39,10 +39,7 @@ from fixtures.pageserver.utils import (
)
from fixtures.pg_version import PgVersion
from fixtures.port_distributor import PortDistributor
from fixtures.remote_storage import (
RemoteStorageKind,
available_remote_storages,
)
from fixtures.remote_storage import RemoteStorageKind, default_remote_storage
from fixtures.types import Lsn, TenantId, TimelineId
from fixtures.utils import get_dir_size, query_scalar, start_in_background
@@ -457,10 +454,9 @@ def is_wal_trimmed(sk: Safekeeper, tenant_id: TenantId, timeline_id: TimelineId,
return sk_wal_size_mb <= target_size_mb
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
def test_wal_backup(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
@@ -503,11 +499,10 @@ def test_wal_backup(neon_env_builder: NeonEnvBuilder, remote_storage_kind: Remot
)
@pytest.mark.parametrize("remote_storage_kind", available_remote_storages())
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder, remote_storage_kind: RemoteStorageKind):
def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 3
neon_env_builder.enable_safekeeper_remote_storage(remote_storage_kind)
neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage())
env = neon_env_builder.init_start()
tenant_id = env.initial_tenant

View File

@@ -53,7 +53,8 @@ num-traits = { version = "0.2", features = ["i128"] }
prost = { version = "0.11" }
rand = { version = "0.8", features = ["small_rng"] }
regex = { version = "1" }
regex-syntax = { version = "0.7" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest = { version = "0.11", default-features = false, features = ["blocking", "default-tls", "json", "multipart", "rustls-tls", "stream"] }
ring = { version = "0.16", features = ["std"] }
rustls = { version = "0.21", features = ["dangerous_configuration"] }
@@ -65,7 +66,7 @@ subtle = { version = "2" }
time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] }
tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", "net", "process", "rt-multi-thread", "signal", "test-util"] }
tokio-rustls = { version = "0.24" }
tokio-util = { version = "0.7", features = ["codec", "compat", "io"] }
tokio-util = { version = "0.7", features = ["codec", "compat", "io", "rt"] }
toml_datetime = { version = "0.6", default-features = false, features = ["serde"] }
toml_edit = { version = "0.19", features = ["serde"] }
tower = { version = "0.4", default-features = false, features = ["balance", "buffer", "limit", "log", "timeout", "util"] }
@@ -90,7 +91,8 @@ memchr = { version = "2" }
nom = { version = "7" }
prost = { version = "0.11" }
regex = { version = "1" }
regex-syntax = { version = "0.7" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] }
regex-syntax = { version = "0.8" }
serde = { version = "1", features = ["alloc", "derive"] }
syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] }
syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] }