Compare commits

..

20 Commits

Author SHA1 Message Date
Elizabeth Murray
04440343f8 Pagebench with grpc option. Note that grpc is on port 51050, so requires a connstring to be set. 2025-05-28 14:44:28 -07:00
Elizabeth Murray
578b7f1668 Remove "pub" for module module in pageserver_page_api. 2025-05-28 13:10:09 -07:00
Elizabeth Murray
97f18dd013 Remove unnecessary whitespace. 2025-05-28 12:54:53 -07:00
Elizabeth Murray
c8abe7e90f Remove unnecessary model changes. 2025-05-28 12:53:28 -07:00
Elizabeth Murray
7160fd16cd Response to review comments, code cleanup. 2025-05-28 12:40:21 -07:00
Elizabeth Murray
13b9d4cb67 Merge branch 'main' into elizabeth/communicator-grpc-minimal-domain-client 2025-05-28 09:40:47 -07:00
Elizabeth Murray
f0982f9a0a Clean up dependencies. 2025-05-28 08:51:01 -07:00
Elizabeth Murray
1634af6d10 Move conversion from string out of the auth interceptor. 2025-05-28 08:45:20 -07:00
Vlad Lazar
eadabeddb8 pageserver: use the same job size throughout the import lifetime (#12026)
## Problem

Import planning takes a job size limit as its input. Previously, the job
size came from a pageserver config field. This field may change while
imports are in progress. If this happens, plans will no longer be
identical and the import would fail permanently.

## Summary of Changes

Bake the job size into the import progress reported to the storage
controller. For new imports, use the value from the pagesever config,
and, for existing imports, use the value present in the shard progress.

This value is identical for all shards, but we want it to be versioned
since future versions of the planner might split the jobs up
differently. Hence, it ends up in `ShardImportProgress`.

Closes https://github.com/neondatabase/neon/issues/11983
2025-05-28 15:19:41 +00:00
Elizabeth Murray
53c1a7ca7f Add minimal GRPC client code that will be used for pagebench. 2025-05-28 08:09:45 -07:00
Alex Chi Z.
67ddf1de28 feat(pageserver): create image layers at L0-L1 boundary (#12023)
## Problem

Previous attempt https://github.com/neondatabase/neon/pull/10548 caused
some issues in staging and we reverted it. This is a re-attempt to
address https://github.com/neondatabase/neon/issues/11063.

Currently we create image layers at latest record LSN. We would create
"future image layers" (i.e., image layers with LSN larger than disk
consistent LSN) that need special handling at startup. We also waste a
lot of read operations to reconstruct from L0 layers while we could have
compacted all of the L0 layers and operate on a flat level of historic
layers.

## Summary of changes

* Run repartition at L0-L1 boundary.
* Roll out with feature flags.
* Piggyback a change that downgrades "image layer creating below
gc_cutoff" to debug level.

---------

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-28 07:00:52 +00:00
Nikita Kalyanov
541fcd8d2f chore: expose new mark_invisible API in openAPI spec for use in cplane (#12032)
## Problem
There is a new API that I plan to use. We generate client from the spec
so it should be in the spec
## Summary of changes
Document the existing API in openAPI format
2025-05-28 03:39:59 +00:00
Suhas Thalanki
e77961c1c6 background worker that collects installed extensions (#11939)
## Problem

Currently, we collect metrics of what extensions are installed on
computes at start up time. We do not have a mechanism that does this at
runtime.

## Summary of changes

Added a background thread that queries all DBs at regular intervals and
collects a list of installed extensions.
2025-05-27 19:40:51 +00:00
Tristan Partin
cdfa06caad Remove test-images compatibility hack for confirming library load paths (#11927)
This hack was needed for compatiblity tests, but after the compute
release is no longer needed.

Signed-off-by: Tristan Partin <tristan@neon.tech>
2025-05-27 17:33:16 +00:00
Alex Chi Z.
f0bb93a9c9 feat(pageserver): support evaluate boolean flags (#12024)
## Problem

Part of https://github.com/neondatabase/neon/issues/11813

## Summary of changes

* Support evaluate boolean flags.
* Add docs on how to handle errors.
* Add test cases based on real PostHog config.

Signed-off-by: Alex Chi Z <chi@neon.tech>
2025-05-27 14:29:15 +00:00
Vlad Lazar
30adf8e2bd pageserver: add tracing spans for time spent in batch and flushing (#12012)
## Problem

We have some gaps in our traces. This indicates missing spans.

## Summary of changes

This PR adds two new spans:
* WAIT_EXECUTOR: time a batched request spends in the batch waiting to
be picked up
* FLUSH_RESPONSE: time a get page request spends flushing the response
to the compute


![image](https://github.com/user-attachments/assets/41b3ddb8-438d-4375-9da3-da341fc0916a)
2025-05-27 13:57:53 +00:00
Erik Grinaker
5d538a9503 page_api: tweak errors (#12019)
## Problem

The page API gRPC errors need a few tweaks to integrate better with the
GetPage machinery.

Touches https://github.com/neondatabase/neon/issues/11728.

## Summary of changes

* Add `GetPageStatus::InternalError` for internal server errors.
* Rename `GetPageStatus::Invalid` to `InvalidRequest` for clarity.
* Rename `status` and `GetPageStatus` to `status_code` and
`GetPageStatusCode`.
* Add an `Into<tonic::Status>` implementation for `ProtocolError`.
2025-05-27 12:06:51 +00:00
Arpad Müller
f3976e5c60 remove safekeeper_proto_version = 3 from tests (#12020)
Some tests still explicitly specify version 3 of the safekeeper
walproposer protocol. Remove the explicit opt in from the tests as v3 is
the default now since #11518.

We don't touch the places where a test exercises both v2 and v3. Those
we leave for #12021.

Part of https://github.com/neondatabase/neon/issues/10326
2025-05-27 11:32:15 +00:00
Vlad Lazar
9657fbc194 pageserver: add and stabilize import chaos test (#11982)
## Problem

Test coverage of timeline imports is lacking.

## Summary of changes

This PR adds a chaos import test. It runs an import while injecting
various chaos events
in the environment. All the commits that follow the test fix various
issues that were surfaced by it.

Closes https://github.com/neondatabase/neon/issues/10191
2025-05-27 09:52:59 +00:00
a-masterov
dd501554c9 add a script to run the test for online-advisor as a regular user. (#12017)
## Problem
The regression test for the extension online_advisor fails on the
staging instance due to a lack of permission to alter the database.
## Summary of changes
A script was added to work around this problem.

---------

Co-authored-by: Alexander Lakhin <alexander.lakhin@neon.tech>
2025-05-27 08:54:59 +00:00
31 changed files with 1676 additions and 337 deletions

90
Cargo.lock generated
View File

@@ -701,7 +701,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"itoa",
"matchit",
@@ -718,7 +718,7 @@ dependencies = [
"sync_wrapper 1.0.1",
"tokio",
"tokio-tungstenite 0.26.1",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
"tracing",
@@ -761,7 +761,7 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
]
@@ -1337,7 +1337,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"tower 0.5.2",
"tower",
"tower-http",
"tower-otel",
"tracing",
@@ -2066,7 +2066,7 @@ dependencies = [
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tower",
"tracing",
"utils",
"workspace_hack",
@@ -2330,7 +2330,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"pin-project",
"rand 0.8.5",
@@ -2883,9 +2883,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.8.0"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "httpdate"
@@ -2935,9 +2935,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "1.4.1"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
dependencies = [
"bytes",
"futures-channel",
@@ -2977,7 +2977,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"rustls 0.22.4",
"rustls-pki-types",
@@ -2992,7 +2992,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -3001,20 +3001,20 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.7"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"hyper 1.4.1",
"hyper 1.6.0",
"libc",
"pin-project-lite",
"socket2",
"tokio",
"tower 0.4.13",
"tower-service",
"tracing",
]
@@ -4236,6 +4236,7 @@ name = "pagebench"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"camino",
"clap",
"futures",
@@ -4244,12 +4245,15 @@ dependencies = [
"humantime-serde",
"pageserver_api",
"pageserver_client",
"pageserver_client_grpc",
"pageserver_page_api",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tonic 0.13.1",
"tracing",
"utils",
"workspace_hack",
@@ -4432,6 +4436,21 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_client_grpc"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"http 1.1.0",
"pageserver_page_api",
"thiserror 1.0.69",
"tokio",
"tonic 0.13.1",
"tracing",
"utils",
]
[[package]]
name = "pageserver_compaction"
version = "0.1.0"
@@ -5208,7 +5227,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.30",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"indexmap 2.9.0",
"ipnet",
@@ -5604,7 +5623,7 @@ dependencies = [
"http-body-util",
"http-types",
"humantime-serde",
"hyper 1.4.1",
"hyper 1.6.0",
"itertools 0.10.5",
"metrics",
"once_cell",
@@ -5644,7 +5663,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-rustls 0.26.0",
"hyper-util",
"ipnet",
@@ -5701,7 +5720,7 @@ dependencies = [
"futures",
"getrandom 0.2.11",
"http 1.1.0",
"hyper 1.4.1",
"hyper 1.6.0",
"parking_lot 0.11.2",
"reqwest",
"reqwest-middleware",
@@ -6642,12 +6661,12 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.5.5"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@@ -6713,7 +6732,7 @@ dependencies = [
"http-body-util",
"http-utils",
"humantime",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"metrics",
"once_cell",
@@ -7542,7 +7561,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-timeout",
"hyper-util",
"percent-encoding",
@@ -7553,7 +7572,7 @@ dependencies = [
"tokio",
"tokio-rustls 0.26.2",
"tokio-stream",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
"tracing",
@@ -7586,21 +7605,6 @@ dependencies = [
"tonic 0.13.1",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower"
version = "0.5.2"
@@ -8591,7 +8595,7 @@ dependencies = [
"hex",
"hmac",
"hyper 0.14.30",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"indexmap 2.9.0",
"itertools 0.12.1",
@@ -8645,7 +8649,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"toml_edit",
"tower 0.5.2",
"tower",
"tracing",
"tracing-core",
"tracing-log",

View File

@@ -8,6 +8,7 @@ members = [
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/client_grpc",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
@@ -254,6 +255,7 @@ metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }

View File

@@ -136,6 +136,10 @@ struct Cli {
requires = "compute-id"
)]
pub control_plane_uri: Option<String>,
/// Interval in seconds for collecting installed extensions statistics
#[arg(long, default_value = "3600")]
pub installed_extensions_collection_interval: u64,
}
fn main() -> Result<()> {
@@ -179,6 +183,7 @@ fn main() -> Result<()> {
cgroup: cli.cgroup,
#[cfg(target_os = "linux")]
vm_monitor_addr: cli.vm_monitor_addr,
installed_extensions_collection_interval: cli.installed_extensions_collection_interval,
},
config,
)?;

View File

@@ -97,6 +97,9 @@ pub struct ComputeNodeParams {
/// the address of extension storage proxy gateway
pub remote_ext_base_url: Option<String>,
/// Interval for installed extensions collection
pub installed_extensions_collection_interval: u64,
}
/// Compute node info shared across several `compute_ctl` threads.
@@ -742,17 +745,7 @@ impl ComputeNode {
let conf = self.get_tokio_conn_conf(None);
tokio::task::spawn(async {
let res = get_installed_extensions(conf).await;
match res {
Ok(extensions) => {
info!(
"[NEON_EXT_STAT] {}",
serde_json::to_string(&extensions)
.expect("failed to serialize extensions list")
);
}
Err(err) => error!("could not get installed extensions: {err:?}"),
}
let _ = installed_extensions(conf).await;
});
}
@@ -782,6 +775,9 @@ impl ComputeNode {
// Log metrics so that we can search for slow operations in logs
info!(?metrics, postmaster_pid = %postmaster_pid, "compute start finished");
// Spawn the extension stats background task
self.spawn_extension_stats_task();
if pspec.spec.prewarm_lfc_on_startup {
self.prewarm_lfc();
}
@@ -2192,6 +2188,41 @@ LIMIT 100",
info!("Pageserver config changed");
}
}
pub fn spawn_extension_stats_task(&self) {
let conf = self.tokio_conn_conf.clone();
let installed_extensions_collection_interval =
self.params.installed_extensions_collection_interval;
tokio::spawn(async move {
// An initial sleep is added to ensure that two collections don't happen at the same time.
// The first collection happens during compute startup.
tokio::time::sleep(tokio::time::Duration::from_secs(
installed_extensions_collection_interval,
))
.await;
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
installed_extensions_collection_interval,
));
loop {
interval.tick().await;
let _ = installed_extensions(conf.clone()).await;
}
});
}
}
pub async fn installed_extensions(conf: tokio_postgres::Config) -> Result<()> {
let res = get_installed_extensions(conf).await;
match res {
Ok(extensions) => {
info!(
"[NEON_EXT_STAT] {}",
serde_json::to_string(&extensions).expect("failed to serialize extensions list")
);
}
Err(err) => error!("could not get installed extensions: {err:?}"),
}
Ok(())
}
pub fn forward_termination_signal() {

View File

@@ -20,7 +20,7 @@ first_path="$(ldconfig --verbose 2>/dev/null \
| grep --invert-match ^$'\t' \
| cut --delimiter=: --fields=1 \
| head --lines=1)"
test "$first_path" == '/usr/local/lib' || true # Remove the || true in a follow-up PR. Needed for backwards compat.
test "$first_path" == '/usr/local/lib'
echo "Waiting pageserver become ready."
while ! nc -z pageserver 6400; do

View File

@@ -0,0 +1,6 @@
#!/bin/sh
set -ex
cd "$(dirname "${0}")"
if [ -f Makefile ]; then
make installcheck
fi

View File

@@ -0,0 +1,9 @@
#!/bin/sh
set -ex
cd "$(dirname ${0})"
[ -f Makefile ] || exit 0
dropdb --if-exist contrib_regression
createdb contrib_regression
PG_REGRESS=$(dirname "$(pg_config --pgxs)")/../test/regress/pg_regress
TESTS=$(echo sql/* | sed 's|sql/||g; s|\.sql||g')
${PG_REGRESS} --use-existing --inputdir=./ --bindir='/usr/local/pgsql/bin' --dbname=contrib_regression ${TESTS}

View File

@@ -354,6 +354,9 @@ pub struct ShardImportProgressV1 {
pub completed: usize,
/// Hash of the plan
pub import_plan_hash: u64,
/// Soft limit for the job size
/// This needs to remain constant throughout the import
pub job_soft_size_limit: usize,
}
impl ShardImportStatus {

View File

@@ -37,7 +37,7 @@ pub struct LocalEvaluationFlag {
#[derive(Deserialize)]
pub struct LocalEvaluationFlagFilters {
groups: Vec<LocalEvaluationFlagFilterGroup>,
multivariate: LocalEvaluationFlagMultivariate,
multivariate: Option<LocalEvaluationFlagMultivariate>,
}
#[derive(Deserialize)]
@@ -254,7 +254,7 @@ impl FeatureStore {
}
}
/// Evaluate a multivariate feature flag. Returns `None` if the flag is not available or if there are errors
/// Evaluate a multivariate feature flag. Returns an error if the flag is not available or if there are errors
/// during the evaluation.
///
/// The parsing logic is as follows:
@@ -272,6 +272,10 @@ impl FeatureStore {
/// Example: we have a multivariate flag with 3 groups of the configured global rollout percentage: A (10%), B (20%), C (70%).
/// There is a single group with a condition that has a rollout percentage of 10% and it does not have a variant override.
/// Then, we will have 1% of the users evaluated to A, 2% to B, and 7% to C.
///
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
/// propagated beyond where the feature flag gets resolved.
pub fn evaluate_multivariate(
&self,
flag_key: &str,
@@ -290,6 +294,35 @@ impl FeatureStore {
)
}
/// Evaluate a boolean feature flag. Returns an error if the flag is not available or if there are errors
/// during the evaluation.
///
/// The parsing logic is as follows:
///
/// * Generate a consistent hash for the tenant-feature.
/// * Match each filter group.
/// - If a group is matched, it will first determine whether the user is in the range of the rollout
/// percentage.
/// - If the hash falls within the group's rollout percentage, return true.
/// * Otherwise, continue with the next group until all groups are evaluated and no group is within the
/// rollout percentage.
/// * If there are no matching groups, return an error.
///
/// Returns `Ok(())` if the feature flag evaluates to true. In the future, it will return a payload.
///
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
/// propagated beyond where the feature flag gets resolved.
pub fn evaluate_boolean(
&self,
flag_key: &str,
user_id: &str,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<(), PostHogEvaluationError> {
let hash_on_global_rollout_percentage = Self::consistent_hash(user_id, flag_key, "boolean");
self.evaluate_boolean_inner(flag_key, hash_on_global_rollout_percentage, properties)
}
/// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID
/// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests
/// and avoid duplicate computations.
@@ -316,6 +349,11 @@ impl FeatureStore {
flag_key
)));
}
let Some(ref multivariate) = flag_config.filters.multivariate else {
return Err(PostHogEvaluationError::Internal(format!(
"No multivariate available, should use evaluate_boolean?: {flag_key}"
)));
};
// TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog
// Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it
// does not matter.
@@ -324,7 +362,7 @@ impl FeatureStore {
GroupEvaluationResult::MatchedAndOverride(variant) => return Ok(variant),
GroupEvaluationResult::MatchedAndEvaluate => {
let mut percentage = 0;
for variant in &flag_config.filters.multivariate.variants {
for variant in &multivariate.variants {
percentage += variant.rollout_percentage;
if self
.evaluate_percentage(hash_on_global_rollout_percentage, percentage)
@@ -352,6 +390,64 @@ impl FeatureStore {
)))
}
}
/// Evaluate a multivariate feature flag. Note that we directly take the mapped user ID
/// (a consistent hash ranging from 0 to 1) so that it is easier to use it in the tests
/// and avoid duplicate computations.
///
/// Use a different consistent hash for evaluating the group rollout percentage.
/// The behavior: if the condition is set to rolling out to 10% of the users, and
/// we set the variant A to 20% in the global config, then 2% of the total users will
/// be evaluated to variant A.
///
/// Note that the hash to determine group rollout percentage is shared across all groups. So if we have two
/// exactly-the-same conditions with 10% and 20% rollout percentage respectively, a total of 20% of the users
/// will be evaluated (versus 30% if group evaluation is done independently).
pub(crate) fn evaluate_boolean_inner(
&self,
flag_key: &str,
hash_on_global_rollout_percentage: f64,
properties: &HashMap<String, PostHogFlagFilterPropertyValue>,
) -> Result<(), PostHogEvaluationError> {
if let Some(flag_config) = self.flags.get(flag_key) {
if !flag_config.active {
return Err(PostHogEvaluationError::NotAvailable(format!(
"The feature flag is not active: {}",
flag_key
)));
}
if flag_config.filters.multivariate.is_some() {
return Err(PostHogEvaluationError::Internal(format!(
"This looks like a multivariate flag, should use evaluate_multivariate?: {flag_key}"
)));
};
// TODO: sort the groups so that variant overrides always get evaluated first and it follows the PostHog
// Python SDK behavior; for now we do not configure conditions without variant overrides in Neon so it
// does not matter.
for group in &flag_config.filters.groups {
match self.evaluate_group(group, hash_on_global_rollout_percentage, properties)? {
GroupEvaluationResult::MatchedAndOverride(_) => {
return Err(PostHogEvaluationError::Internal(format!(
"Boolean flag cannot have overrides: {}",
flag_key
)));
}
GroupEvaluationResult::MatchedAndEvaluate => {
return Ok(());
}
GroupEvaluationResult::Unmatched => continue,
}
}
// If no group is matched, the feature is not available, and up to the caller to decide what to do.
Err(PostHogEvaluationError::NoConditionGroupMatched)
} else {
// The feature flag is not available yet
Err(PostHogEvaluationError::NotAvailable(format!(
"Not found in the local evaluation spec: {}",
flag_key
)))
}
}
}
pub struct PostHogClientConfig {
@@ -469,95 +565,162 @@ mod tests {
fn data() -> &'static str {
r#"{
"flags": [
{
"id": 132794,
"team_id": 152860,
"name": "",
"key": "gc-compaction",
"filters": {
"groups": [
{
"variant": "enabled-stage-2",
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 50
},
{
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 80
}
],
"payloads": {},
"multivariate": {
"variants": [
{
"key": "disabled",
"name": "",
"rollout_percentage": 90
},
{
"key": "enabled-stage-1",
"name": "",
"rollout_percentage": 10
},
{
"key": "enabled-stage-2",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled-stage-3",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled",
"name": "",
"rollout_percentage": 0
}
]
}
},
"deleted": false,
"active": true,
"ensure_experience_continuity": false,
"has_encrypted_payloads": false,
"version": 6
}
"flags": [
{
"id": 141807,
"team_id": 152860,
"name": "",
"key": "image-compaction-boundary",
"filters": {
"groups": [
{
"variant": null,
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
}
],
"group_type_mapping": {},
"cohorts": {}
}"#
"rollout_percentage": 40
},
{
"variant": null,
"properties": [],
"rollout_percentage": 10
}
],
"payloads": {},
"multivariate": null
},
"deleted": false,
"active": true,
"ensure_experience_continuity": false,
"has_encrypted_payloads": false,
"version": 1
},
{
"id": 135586,
"team_id": 152860,
"name": "",
"key": "boolean-flag",
"filters": {
"groups": [
{
"variant": null,
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
}
],
"rollout_percentage": 47
}
],
"payloads": {},
"multivariate": null
},
"deleted": false,
"active": true,
"ensure_experience_continuity": false,
"has_encrypted_payloads": false,
"version": 1
},
{
"id": 132794,
"team_id": 152860,
"name": "",
"key": "gc-compaction",
"filters": {
"groups": [
{
"variant": "enabled-stage-2",
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 50
},
{
"properties": [
{
"key": "plan_type",
"type": "person",
"value": [
"free"
],
"operator": "exact"
},
{
"key": "pageserver_remote_size",
"type": "person",
"value": "10000000",
"operator": "lt"
}
],
"rollout_percentage": 80
}
],
"payloads": {},
"multivariate": {
"variants": [
{
"key": "disabled",
"name": "",
"rollout_percentage": 90
},
{
"key": "enabled-stage-1",
"name": "",
"rollout_percentage": 10
},
{
"key": "enabled-stage-2",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled-stage-3",
"name": "",
"rollout_percentage": 0
},
{
"key": "enabled",
"name": "",
"rollout_percentage": 0
}
]
}
},
"deleted": false,
"active": true,
"ensure_experience_continuity": false,
"has_encrypted_payloads": false,
"version": 7
}
],
"group_type_mapping": {},
"cohorts": {}
}"#
}
#[test]
@@ -633,4 +796,125 @@ mod tests {
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
}
#[test]
fn evaluate_boolean_1() {
// The `boolean-flag` feature flag only has one group that matches on the free user.
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &HashMap::new());
assert!(matches!(
variant,
Err(PostHogEvaluationError::NotAvailable(_))
),);
let properties_unmatched = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("paid".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// This does not match any group so there will be an error.
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties_unmatched);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
let properties = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("free".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// It matches the first group as 0.10 <= 0.50 and the properties are matched. Then it gets evaluated to the variant override.
let variant = store.evaluate_boolean_inner("boolean-flag", 0.10, &properties);
assert!(variant.is_ok());
// It matches the group conditions but not the group rollout percentage.
let variant = store.evaluate_boolean_inner("boolean-flag", 1.00, &properties);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
}
#[test]
fn evaluate_boolean_2() {
// The `image-compaction-boundary` feature flag has one group that matches on the free user and a group that matches on all users.
let mut store = FeatureStore::new();
let response: LocalEvaluationResponse = serde_json::from_str(data()).unwrap();
store.set_flags(response.flags);
// This lacks the required properties and cannot be evaluated.
let variant =
store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &HashMap::new());
assert!(matches!(
variant,
Err(PostHogEvaluationError::NotAvailable(_))
),);
let properties_unmatched = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("paid".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// This does not match the filtered group but the all user group.
let variant =
store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties_unmatched);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
let variant =
store.evaluate_boolean_inner("image-compaction-boundary", 0.05, &properties_unmatched);
assert!(variant.is_ok());
let properties = HashMap::from([
(
"plan_type".to_string(),
PostHogFlagFilterPropertyValue::String("free".to_string()),
),
(
"pageserver_remote_size".to_string(),
PostHogFlagFilterPropertyValue::Number(1000.0),
),
]);
// It matches the first group as 0.30 <= 0.40 and the properties are matched. Then it gets evaluated to the variant override.
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.30, &properties);
assert!(variant.is_ok());
// It matches the group conditions but not the group rollout percentage.
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 1.00, &properties);
assert!(matches!(
variant,
Err(PostHogEvaluationError::NoConditionGroupMatched)
),);
// It matches the second "all" group conditions.
let variant = store.evaluate_boolean_inner("image-compaction-boundary", 0.09, &properties);
assert!(variant.is_ok());
}
}

View File

@@ -0,0 +1,16 @@
[package]
name = "pageserver_client_grpc"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
bytes.workspace = true
futures.workspace = true
http.workspace = true
thiserror.workspace = true
tonic.workspace = true
tracing.workspace = true
pageserver_page_api.workspace = true
utils.workspace = true
tokio.workspace = true

View File

@@ -0,0 +1,192 @@
//!
//! Pageserver gRPC client library
//!
//! This library provides a gRPC client for the pageserver for the
//! communicator project.
//!
//! This library is a work in progress.
//!
//!
use std::collections::HashMap;
use bytes::Bytes;
use futures::{StreamExt};
use thiserror::Error;
use tonic::metadata::AsciiMetadataValue;
use pageserver_page_api::proto;
use pageserver_page_api::proto::PageServiceClient;
use utils::shard::ShardIndex;
use std::fmt::Debug;
use tracing::error;
use tokio::sync::RwLock;
use tonic::transport::{Channel, Endpoint};
#[derive(Error, Debug)]
pub enum PageserverClientError {
#[error("could not connect to service: {0}")]
ConnectError(#[from] tonic::transport::Error),
#[error("could not perform request: {0}`")]
RequestError(#[from] tonic::Status),
#[error("protocol error: {0}")]
ProtocolError(#[from] pageserver_page_api::ProtocolError),
#[error("could not perform request: {0}`")]
InvalidUri(#[from] http::uri::InvalidUri),
#[error("could not perform request: {0}`")]
Other(String),
}
pub struct PageserverClient {
endpoint_map: HashMap<ShardIndex, Endpoint>,
channels: tokio::sync::RwLock<HashMap<ShardIndex, Channel>>,
auth_interceptor: AuthInterceptor,
}
impl PageserverClient {
/// TODO: this doesn't currently react to changes in the shard map.
pub fn new(
tenant_id: AsciiMetadataValue,
timeline_id: AsciiMetadataValue,
auth_token: Option<String>,
shard_map: HashMap<ShardIndex, String>,
) -> Result<Self, PageserverClientError> {
let endpoint_map: HashMap<ShardIndex, Endpoint> = shard_map
.into_iter()
.map(|(shard, url)| {
let endpoint = Endpoint::from_shared(url)
.map_err(|_e| PageserverClientError::Other("Unable to parse endpoint {url}".to_string()))?;
Ok::<(ShardIndex, Endpoint), PageserverClientError>((shard, endpoint))
})
.collect::<Result<_, _>>()?;
Ok(Self {
endpoint_map,
channels: RwLock::new(HashMap::new()),
auth_interceptor: AuthInterceptor::new(
tenant_id,
timeline_id,
auth_token,
),
})
}
//
// TODO: This opens a new gRPC stream for every request, which is extremely inefficient
pub async fn get_page(
&self,
shard: ShardIndex,
request: pageserver_page_api::GetPageRequest,
) -> Result<Vec<Bytes>, PageserverClientError> {
// FIXME: calculate the shard number correctly
let chan = self.get_client(shard).await?;
let mut client =
PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
let request = proto::GetPageRequest::try_from(request)?;
let request_stream = futures::stream::once(std::future::ready(request));
let mut response_stream = client
.get_pages(tonic::Request::new(request_stream))
.await?
.into_inner();
let Some(response) = response_stream.next().await else {
return Err(PageserverClientError::Other(
"no response received for getpage request".to_string(),
));
};
match response {
Err(status) => {
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
let response: pageserver_page_api::GetPageResponse = resp.try_into().unwrap();
return Ok(response.page_images.to_vec());
}
}
}
//
// TODO: this should use a connection pool with concurrency limits,
// not a single connection to the shard.
//
async fn get_client(&self, shard: ShardIndex) -> Result<Channel, PageserverClientError> {
// Get channel from the hashmap
let mut channels = self.channels.write();
if let Some(channel) = channels.await.get(&shard) {
return Ok(channel.clone());
}
// Create a new channel if it doesn't exist
let shard_endpoint = self
.endpoint_map
.get(&shard);
let endpoint = match shard_endpoint{
Some(_endpoint) => _endpoint,
None => {
error!("Shard {shard} not found in shard map");
return Err(PageserverClientError::Other(format!(
"Shard {shard} not found in shard map"
)));
}
};
let channel = endpoint.connect().await?;
channels = self.channels.write();
channels.await.insert(shard, channel.clone());
Ok(channel.clone())
}
}
/// Inject tenant_id, timeline_id and authentication token to all pageserver requests.
#[derive(Clone)]
struct AuthInterceptor {
tenant_id: AsciiMetadataValue,
shard_id: Option<AsciiMetadataValue>,
timeline_id: AsciiMetadataValue,
auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
}
impl AuthInterceptor {
fn new(tenant_id: AsciiMetadataValue,
timeline_id: AsciiMetadataValue,
auth_token: Option<String>) -> Self {
Self {
tenant_id: tenant_id,
shard_id: None,
timeline_id: timeline_id,
auth_header: auth_token
.map(|t| format!("Bearer {t}"))
.map(|t| t.parse().expect("could not parse auth token")),
}
}
fn for_shard(&self, shard_id: ShardIndex) -> Self {
let mut with_shard = self.clone();
with_shard.shard_id = Some(
shard_id
.to_string()
.parse()
.expect("could not parse shard id"),
);
with_shard
}
}
impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
req.metadata_mut()
.insert("neon-tenant-id", self.tenant_id.clone());
if let Some(shard_id) = &self.shard_id {
req.metadata_mut().insert("neon-shard-id", shard_id.clone());
}
req.metadata_mut()
.insert("neon-timeline-id", self.timeline_id.clone());
if let Some(auth_header) = &self.auth_header {
req.metadata_mut()
.insert("authorization", auth_header.clone());
}
Ok(req)
}
}

View File

@@ -54,9 +54,9 @@ service PageService {
// RPCs use regular unary requests, since they are not as frequent and
// performance-critical, and this simplifies implementation.
//
// NB: a status response (e.g. errors) will terminate the stream. The stream
// may be shared by e.g. multiple Postgres backends, so we should avoid this.
// Most errors are therefore sent as GetPageResponse.status instead.
// NB: a gRPC status response (e.g. errors) will terminate the stream. The
// stream may be shared by multiple Postgres backends, so we avoid this by
// sending them as GetPageResponse.status_code instead.
rpc GetPages (stream GetPageRequest) returns (stream GetPageResponse);
// Returns the size of a relation, as # of blocks.
@@ -159,8 +159,8 @@ message GetPageRequest {
// A GetPageRequest class. Primarily intended for observability, but may also be
// used for prioritization in the future.
enum GetPageClass {
// Unknown class. For forwards compatibility: used when the client sends a
// class that the server doesn't know about.
// Unknown class. For backwards compatibility: used when an older client version sends a class
// that a newer server version has removed.
GET_PAGE_CLASS_UNKNOWN = 0;
// A normal request. This is the default.
GET_PAGE_CLASS_NORMAL = 1;
@@ -180,31 +180,37 @@ message GetPageResponse {
// The original request's ID.
uint64 request_id = 1;
// The response status code.
GetPageStatus status = 2;
GetPageStatusCode status_code = 2;
// A string describing the status, if any.
string reason = 3;
// The 8KB page images, in the same order as the request. Empty if status != OK.
// The 8KB page images, in the same order as the request. Empty if status_code != OK.
repeated bytes page_image = 4;
}
// A GetPageResponse status code. Since we use a bidirectional stream, we don't
// want to send errors as gRPC statuses, since this would terminate the stream.
enum GetPageStatus {
// Unknown status. For forwards compatibility: used when the server sends a
// status code that the client doesn't know about.
GET_PAGE_STATUS_UNKNOWN = 0;
// A GetPageResponse status code.
//
// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
// (potentially shared by many backends), and a gRPC status response would terminate the stream so
// we send GetPageResponse messages with these codes instead.
enum GetPageStatusCode {
// Unknown status. For forwards compatibility: used when an older client version receives a new
// status code from a newer server version.
GET_PAGE_STATUS_CODE_UNKNOWN = 0;
// The request was successful.
GET_PAGE_STATUS_OK = 1;
GET_PAGE_STATUS_CODE_OK = 1;
// The page did not exist. The tenant/timeline/shard has already been
// validated during stream setup.
GET_PAGE_STATUS_NOT_FOUND = 2;
GET_PAGE_STATUS_CODE_NOT_FOUND = 2;
// The request was invalid.
GET_PAGE_STATUS_INVALID = 3;
GET_PAGE_STATUS_CODE_INVALID_REQUEST = 3;
// The request failed due to an internal server error.
GET_PAGE_STATUS_CODE_INTERNAL_ERROR = 4;
// The tenant is rate limited. Slow down and retry later.
GET_PAGE_STATUS_SLOW_DOWN = 4;
// TODO: consider adding a GET_PAGE_STATUS_LAYER_DOWNLOAD in the case of a
// layer download. This could free up the server task to process other
// requests while the layer download is in progress.
GET_PAGE_STATUS_CODE_SLOW_DOWN = 5;
// NB: shutdown errors are emitted as a gRPC Unavailable status.
//
// TODO: consider adding a GET_PAGE_STATUS_CODE_LAYER_DOWNLOAD in the case of a layer download.
// This could free up the server task to process other requests while the download is in progress.
}
// Fetches the size of a relation at a given LSN, as # of blocks. Only valid on

View File

@@ -35,6 +35,12 @@ impl ProtocolError {
}
}
impl From<ProtocolError> for tonic::Status {
fn from(err: ProtocolError) -> Self {
tonic::Status::invalid_argument(format!("{err}"))
}
}
/// The LSN a request should read at.
#[derive(Clone, Copy, Debug)]
pub struct ReadLsn {
@@ -328,7 +334,7 @@ pub type RequestID = u64;
/// A GetPage request class.
#[derive(Clone, Copy, Debug)]
pub enum GetPageClass {
/// Unknown status. For backwards compatibility: used when an older client version sends a class
/// Unknown class. For backwards compatibility: used when an older client version sends a class
/// that a newer server version has removed.
Unknown,
/// A normal request. This is the default.
@@ -386,7 +392,7 @@ pub struct GetPageResponse {
/// The original request's ID.
pub request_id: RequestID,
/// The response status code.
pub status: GetPageStatus,
pub status_code: GetPageStatusCode,
/// A string describing the status, if any.
pub reason: Option<String>,
/// The 8KB page images, in the same order as the request. Empty if status != OK.
@@ -397,7 +403,7 @@ impl From<proto::GetPageResponse> for GetPageResponse {
fn from(pb: proto::GetPageResponse) -> Self {
Self {
request_id: pb.request_id,
status: pb.status.into(),
status_code: pb.status_code.into(),
reason: Some(pb.reason).filter(|r| !r.is_empty()),
page_images: pb.page_image.into(),
}
@@ -408,16 +414,20 @@ impl From<GetPageResponse> for proto::GetPageResponse {
fn from(response: GetPageResponse) -> Self {
Self {
request_id: response.request_id,
status: response.status.into(),
status_code: response.status_code.into(),
reason: response.reason.unwrap_or_default(),
page_image: response.page_images.into_vec(),
}
}
}
/// A GetPage response status.
/// A GetPage response status code.
///
/// These are effectively equivalent to gRPC statuses. However, we use a bidirectional stream
/// (potentially shared by many backends), and a gRPC status response would terminate the stream so
/// we send GetPageResponse messages with these codes instead.
#[derive(Clone, Copy, Debug)]
pub enum GetPageStatus {
pub enum GetPageStatusCode {
/// Unknown status. For forwards compatibility: used when an older client version receives a new
/// status code from a newer server version.
Unknown,
@@ -427,46 +437,50 @@ pub enum GetPageStatus {
/// setup.
NotFound,
/// The request was invalid.
Invalid,
InvalidRequest,
/// The request failed due to an internal server error.
InternalError,
/// The tenant is rate limited. Slow down and retry later.
SlowDown,
}
impl From<proto::GetPageStatus> for GetPageStatus {
fn from(pb: proto::GetPageStatus) -> Self {
impl From<proto::GetPageStatusCode> for GetPageStatusCode {
fn from(pb: proto::GetPageStatusCode) -> Self {
match pb {
proto::GetPageStatus::Unknown => Self::Unknown,
proto::GetPageStatus::Ok => Self::Ok,
proto::GetPageStatus::NotFound => Self::NotFound,
proto::GetPageStatus::Invalid => Self::Invalid,
proto::GetPageStatus::SlowDown => Self::SlowDown,
proto::GetPageStatusCode::Unknown => Self::Unknown,
proto::GetPageStatusCode::Ok => Self::Ok,
proto::GetPageStatusCode::NotFound => Self::NotFound,
proto::GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
proto::GetPageStatusCode::InternalError => Self::InternalError,
proto::GetPageStatusCode::SlowDown => Self::SlowDown,
}
}
}
impl From<i32> for GetPageStatus {
fn from(status: i32) -> Self {
proto::GetPageStatus::try_from(status)
.unwrap_or(proto::GetPageStatus::Unknown)
impl From<i32> for GetPageStatusCode {
fn from(status_code: i32) -> Self {
proto::GetPageStatusCode::try_from(status_code)
.unwrap_or(proto::GetPageStatusCode::Unknown)
.into()
}
}
impl From<GetPageStatus> for proto::GetPageStatus {
fn from(status: GetPageStatus) -> Self {
match status {
GetPageStatus::Unknown => Self::Unknown,
GetPageStatus::Ok => Self::Ok,
GetPageStatus::NotFound => Self::NotFound,
GetPageStatus::Invalid => Self::Invalid,
GetPageStatus::SlowDown => Self::SlowDown,
impl From<GetPageStatusCode> for proto::GetPageStatusCode {
fn from(status_code: GetPageStatusCode) -> Self {
match status_code {
GetPageStatusCode::Unknown => Self::Unknown,
GetPageStatusCode::Ok => Self::Ok,
GetPageStatusCode::NotFound => Self::NotFound,
GetPageStatusCode::InvalidRequest => Self::InvalidRequest,
GetPageStatusCode::InternalError => Self::InternalError,
GetPageStatusCode::SlowDown => Self::SlowDown,
}
}
}
impl From<GetPageStatus> for i32 {
fn from(status: GetPageStatus) -> Self {
proto::GetPageStatus::from(status).into()
impl From<GetPageStatusCode> for i32 {
fn from(status_code: GetPageStatusCode) -> Self {
proto::GetPageStatusCode::from(status_code).into()
}
}

View File

@@ -20,9 +20,13 @@ serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tokio.workspace = true
tonic.workspace = true
tokio-util.workspace = true
async-trait = "0.1"
pageserver_client.workspace = true
pageserver_api.workspace = true
pageserver_client_grpc.workspace = true
pageserver_page_api.workspace = true
utils = { path = "../../libs/utils/" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -6,25 +6,40 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tonic::metadata::AsciiMetadataValue;
use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::shard::TenantShardId;
use pageserver_client::page_service::PagestreamClient;
use rand::prelude::*;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::id::TenantTimelineId;
use utils::id::TenantId;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use futures::{
future::BoxFuture,
stream::FuturesOrdered,
FutureExt, StreamExt,
};
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};
use async_trait::async_trait;
use rand::distributions::weighted::WeightedIndex;
use utils::shard::ShardIndex;
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "false")]
grpc: bool,
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
@@ -303,7 +318,19 @@ async fn main_impl(
.unwrap();
Box::pin(async move {
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
if args.grpc {
let grpc = GrpcProtocol::new(
args.page_service_connstring.clone(),
worker_id.timeline.tenant_id,
worker_id.timeline.timeline_id).await;
client_proto(args, grpc, worker_id, ss, cancel, rps_period, ranges, weights).await
} else {
let pg = PgProtocol::new(
args.page_service_connstring.clone(),
worker_id.timeline.tenant_id,
worker_id.timeline.timeline_id).await;
client_proto(args, pg, worker_id, ss, cancel, rps_period, ranges, weights).await
}
})
};
@@ -354,9 +381,208 @@ async fn main_impl(
anyhow::Ok(())
}
/// Common interface for both Pg and Grpc versions.
#[async_trait]
trait Protocol: Send {
/// Constructor/factory.
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self
where
Self: Sized;
async fn client_libpq(
/// Fire off a “get page” request and store the start time.
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
);
/// Wait for the next response and return its start time.
async fn get_start_time(&mut self) -> Instant;
/// How many in-flight requests do we have?
fn len(&self) -> usize;
}
///////////////////////////////////////////////////////////////////////////////
// PgProtocol
///////////////////////////////////////////////////////////////////////////////
struct PgProtocol {
libpq_pagestream: PagestreamClient,
libpq_vector: VecDeque<Instant>,
}
#[async_trait]
impl Protocol for PgProtocol {
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self {
let client = pageserver_client::page_service::Client::new(conn_string)
.await
.unwrap()
.pagestream(tenant_id, timeline_id)
.await
.unwrap();
Self {
libpq_pagestream: client,
libpq_vector: VecDeque::new(),
}
}
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
) {
// build your PagestreamGetPageRequest exactly as before…
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key.to_rel_block().unwrap();
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
let _ = self.libpq_pagestream.getpage_send(req).await;
self.libpq_vector.push_back(start);
}
async fn get_start_time(&mut self) -> Instant {
let start = self.libpq_vector.pop_front().unwrap();
let _ = self.libpq_pagestream.getpage_recv().await;
start
}
fn len(&self) -> usize {
self.libpq_vector.len()
}
}
///////////////////////////////////////////////////////////////////////////////
// GrpcProtocol
///////////////////////////////////////////////////////////////////////////////
type GetPageFut = BoxFuture<'static, (Instant, Option<pageserver_client_grpc::PageserverClientError>)>;
struct GrpcProtocol {
grpc_page_client: Arc<pageserver_client_grpc::PageserverClient>,
grpc_vector: FuturesOrdered<GetPageFut>,
}
#[async_trait]
impl Protocol for GrpcProtocol {
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self {
let shard_map = std::collections::HashMap::from([(
ShardIndex::unsharded(),
conn_string.clone(),
)]);
let tenant_ascii : AsciiMetadataValue = tenant_id.to_string().parse().unwrap();
let timeline_ascii : AsciiMetadataValue = timeline_id.to_string().parse().unwrap();
let client = pageserver_client_grpc::PageserverClient::new(
tenant_ascii,
timeline_ascii,
None,
shard_map,
).unwrap();
Self {
grpc_page_client: Arc::new(client),
grpc_vector: FuturesOrdered::new(),
}
}
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
) {
// build your GetPageRequest exactly as before…
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key.to_rel_block().unwrap();
pageserver_page_api::GetPageRequest {
request_id: 0,
request_class: pageserver_page_api::GetPageClass::Normal,
read_lsn: pageserver_page_api::ReadLsn {
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since_lsn: Some(r.timeline_lsn),
},
rel: pageserver_page_api::RelTag {
spcnode: rel_tag.spcnode,
dbnode: rel_tag.dbnode,
relnode: rel_tag.relnode,
forknum: rel_tag.forknum,
},
block_numbers: vec![block_no].into(),
}
};
let client_clone = self.grpc_page_client.clone();
let getpage_fut : GetPageFut = async move {
let result = client_clone.get_page(ShardIndex::unsharded(), req).await;
match result {
Ok(_) => {
(start, None)
}
Err(e) => {
(start, Some(e))
}
}
}.boxed();
self.grpc_vector.push_back(getpage_fut);
}
async fn get_start_time(&mut self) -> Instant {
let (start, err) = self.grpc_vector.next().await.unwrap();
if let Some(e) = err {
tracing::error!("getpage request failed: {e}");
}
start
}
fn len(&self) -> usize {
self.grpc_vector.len()
}
}
async fn client_proto(
args: &Args,
mut protocol: impl Protocol,
worker_id: WorkerId,
shared_state: Arc<SharedState>,
cancel: CancellationToken,
@@ -364,18 +590,11 @@ async fn client_libpq(
ranges: Vec<KeyRange>,
weights: rand::distributions::weighted::WeightedIndex<i128>,
) {
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
.await
.unwrap();
shared_state.start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
let mut inflight = VecDeque::new();
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
@@ -390,37 +609,12 @@ async fn client_libpq(
ticks_processed = periods_passed_until_now;
}
while inflight.len() < args.queue_depth.get() {
while protocol.len() < args.queue_depth.get() {
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
client.getpage_send(req).await.unwrap();
inflight.push_back(start);
protocol.add_to_inflight(start, args, ranges.clone(), weights.clone()).await;
}
let start = inflight.pop_front().unwrap();
client.getpage_recv().await.unwrap();
let start = protocol.get_start_time().await;
let end = Instant::now();
shared_state.live_stats.request_done();
ticks_processed += 1;
@@ -436,9 +630,11 @@ async fn client_libpq(
if let Some(period) = &rps_period {
let next_at = client_start
+ Duration::from_micros(
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
tokio::time::sleep_until(next_at.into()).await;
}
}
}

View File

@@ -45,6 +45,10 @@ impl FeatureResolver {
}
/// Evaluate a multivariate feature flag. Currently, we do not support any properties.
///
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
/// propagated beyond where the feature flag gets resolved.
pub fn evaluate_multivariate(
&self,
flag_key: &str,
@@ -62,4 +66,29 @@ impl FeatureResolver {
))
}
}
/// Evaluate a boolean feature flag. Currently, we do not support any properties.
///
/// Returns `Ok(())` if the flag is evaluated to true, otherwise returns an error.
///
/// Error handling: the caller should inspect the error and decide the behavior when a feature flag
/// cannot be evaluated (i.e., default to false if it cannot be resolved). The error should *not* be
/// propagated beyond where the feature flag gets resolved.
pub fn evaluate_boolean(
&self,
flag_key: &str,
tenant_id: TenantId,
) -> Result<(), PostHogEvaluationError> {
if let Some(inner) = &self.inner {
inner.feature_store().evaluate_boolean(
flag_key,
&tenant_id.to_string(),
&HashMap::new(),
)
} else {
Err(PostHogEvaluationError::NotAvailable(
"PostHog integration is not enabled".to_string(),
))
}
}
}

View File

@@ -353,6 +353,33 @@ paths:
"200":
description: OK
/v1/tenant/{tenant_shard_id}/timeline/{timeline_id}/mark_invisible:
parameters:
- name: tenant_shard_id
in: path
required: true
schema:
type: string
- name: timeline_id
in: path
required: true
schema:
type: string
format: hex
put:
requestBody:
content:
application/json:
schema:
type: object
properties:
is_visible:
type: boolean
default: false
responses:
"200":
description: OK
/v1/tenant/{tenant_shard_id}/location_config:
parameters:
- name: tenant_shard_id

View File

@@ -370,6 +370,18 @@ impl From<crate::tenant::secondary::SecondaryTenantError> for ApiError {
}
}
impl From<crate::tenant::FinalizeTimelineImportError> for ApiError {
fn from(err: crate::tenant::FinalizeTimelineImportError) -> ApiError {
use crate::tenant::FinalizeTimelineImportError::*;
match err {
ImportTaskStillRunning => {
ApiError::ResourceUnavailable("Import task still running".into())
}
ShuttingDown => ApiError::ShuttingDown,
}
}
}
// Helper function to construct a TimelineInfo struct for a timeline
async fn build_timeline_info(
timeline: &Arc<Timeline>,
@@ -3533,10 +3545,7 @@ async fn activate_post_import_handler(
tenant.wait_to_become_active(ACTIVE_TENANT_TIMEOUT).await?;
tenant
.finalize_importing_timeline(timeline_id)
.await
.map_err(ApiError::InternalServerError)?;
tenant.finalize_importing_timeline(timeline_id).await?;
match tenant.get_timeline(timeline_id, false) {
Ok(_timeline) => {

View File

@@ -769,6 +769,9 @@ struct BatchedGetPageRequest {
timer: SmgrOpTimer,
lsn_range: LsnRange,
ctx: RequestContext,
// If the request is perf enabled, this contains a context
// with a perf span tracking the time spent waiting for the executor.
batch_wait_ctx: Option<RequestContext>,
}
#[cfg(feature = "testing")]
@@ -781,6 +784,7 @@ struct BatchedTestRequest {
/// so that we don't keep the [`Timeline::gate`] open while the batch
/// is being built up inside the [`spsc_fold`] (pagestream pipelining).
#[derive(IntoStaticStr)]
#[allow(clippy::large_enum_variant)]
enum BatchedFeMessage {
Exists {
span: Span,
@@ -1298,6 +1302,22 @@ impl PageServerHandler {
}
};
let batch_wait_ctx = if ctx.has_perf_span() {
Some(
RequestContextBuilder::from(&ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"WAIT_EXECUTOR",
)
})
.attached_child(),
)
} else {
None
};
BatchedFeMessage::GetPage {
span,
shard: shard.downgrade(),
@@ -1309,6 +1329,7 @@ impl PageServerHandler {
request_lsn: req.hdr.request_lsn
},
ctx,
batch_wait_ctx,
}],
// The executor grabs the batch when it becomes idle.
// Hence, [`GetPageBatchBreakReason::ExecutorSteal`] is the
@@ -1464,7 +1485,7 @@ impl PageServerHandler {
let mut flush_timers = Vec::with_capacity(handler_results.len());
for handler_result in &mut handler_results {
let flush_timer = match handler_result {
Ok((_, timer)) => Some(
Ok((_response, timer, _ctx)) => Some(
timer
.observe_execution_end(flushing_start_time)
.expect("we are the first caller"),
@@ -1484,7 +1505,7 @@ impl PageServerHandler {
// Some handler errors cause exit from pagestream protocol.
// Other handler errors are sent back as an error message and we stay in pagestream protocol.
for (handler_result, flushing_timer) in handler_results.into_iter().zip(flush_timers) {
let response_msg = match handler_result {
let (response_msg, ctx) = match handler_result {
Err(e) => match &e.err {
PageStreamError::Shutdown => {
// If we fail to fulfil a request during shutdown, which may be _because_ of
@@ -1509,15 +1530,30 @@ impl PageServerHandler {
error!("error reading relation or page version: {full:#}")
});
PagestreamBeMessage::Error(PagestreamErrorResponse {
req: e.req,
message: e.err.to_string(),
})
(
PagestreamBeMessage::Error(PagestreamErrorResponse {
req: e.req,
message: e.err.to_string(),
}),
None,
)
}
},
Ok((response_msg, _op_timer_already_observed)) => response_msg,
Ok((response_msg, _op_timer_already_observed, ctx)) => (response_msg, Some(ctx)),
};
let ctx = ctx.map(|req_ctx| {
RequestContextBuilder::from(&req_ctx)
.perf_span(|crnt_perf_span| {
info_span!(
target: PERF_TRACE_TARGET,
parent: crnt_perf_span,
"FLUSH_RESPONSE",
)
})
.attached_child()
});
//
// marshal & transmit response message
//
@@ -1540,6 +1576,17 @@ impl PageServerHandler {
)),
None => futures::future::Either::Right(flush_fut),
};
let flush_fut = if let Some(req_ctx) = ctx.as_ref() {
futures::future::Either::Left(
flush_fut.maybe_perf_instrument(req_ctx, |current_perf_span| {
current_perf_span.clone()
}),
)
} else {
futures::future::Either::Right(flush_fut)
};
// do it while respecting cancellation
let _: () = async move {
tokio::select! {
@@ -1569,7 +1616,7 @@ impl PageServerHandler {
ctx: &RequestContext,
) -> Result<
(
Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>>,
Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>,
Span,
),
QueryError,
@@ -1596,7 +1643,7 @@ impl PageServerHandler {
self.handle_get_rel_exists_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1615,7 +1662,7 @@ impl PageServerHandler {
self.handle_get_nblocks_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1662,7 +1709,7 @@ impl PageServerHandler {
self.handle_db_size_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -1681,7 +1728,7 @@ impl PageServerHandler {
self.handle_get_slru_segment_request(&shard, &req, &ctx)
.instrument(span.clone())
.await
.map(|msg| (msg, timer))
.map(|msg| (msg, timer, ctx))
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
],
span,
@@ -2033,12 +2080,25 @@ impl PageServerHandler {
return Ok(());
}
};
let batch = match batch {
let mut batch = match batch {
Ok(batch) => batch,
Err(e) => {
return Err(e);
}
};
if let BatchedFeMessage::GetPage {
pages,
span: _,
shard: _,
batch_break_reason: _,
} = &mut batch
{
for req in pages {
req.batch_wait_ctx.take();
}
}
self.pagestream_handle_batched_message(
pgb_writer,
batch,
@@ -2351,7 +2411,8 @@ impl PageServerHandler {
io_concurrency: IoConcurrency,
batch_break_reason: GetPageBatchBreakReason,
ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
{
debug_assert_current_span_has_tenant_and_timeline_id();
timeline
@@ -2458,6 +2519,7 @@ impl PageServerHandler {
page,
}),
req.timer,
req.ctx,
)
})
.map_err(|e| BatchedPageStreamError {
@@ -2502,7 +2564,8 @@ impl PageServerHandler {
timeline: &Timeline,
requests: Vec<BatchedTestRequest>,
_ctx: &RequestContext,
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer), BatchedPageStreamError>> {
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
{
// real requests would do something with the timeline
let mut results = Vec::with_capacity(requests.len());
for _req in requests.iter() {
@@ -2529,6 +2592,10 @@ impl PageServerHandler {
req: req.req.clone(),
}),
req.timer,
RequestContext::new(
TaskKind::PageRequestHandler,
DownloadBehavior::Warn,
),
)
})
.map_err(|e| BatchedPageStreamError {

View File

@@ -864,6 +864,14 @@ impl Debug for SetStoppingError {
}
}
#[derive(thiserror::Error, Debug)]
pub(crate) enum FinalizeTimelineImportError {
#[error("Import task not done yet")]
ImportTaskStillRunning,
#[error("Shutting down")]
ShuttingDown,
}
/// Arguments to [`TenantShard::create_timeline`].
///
/// Not usable as an idempotency key for timeline creation because if [`CreateTimelineParamsBranch::ancestor_start_lsn`]
@@ -1150,10 +1158,20 @@ impl TenantShard {
ctx,
)?;
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
anyhow::ensure!(
disk_consistent_lsn.is_valid(),
"Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn"
);
if !disk_consistent_lsn.is_valid() {
// As opposed to normal timelines which get initialised with a disk consitent LSN
// via initdb, imported timelines start from 0. If the import task stops before
// it advances disk consitent LSN, allow it to resume.
let in_progress_import = import_pgdata
.as_ref()
.map(|import| !import.is_done())
.unwrap_or(false);
if !in_progress_import {
anyhow::bail!("Timeline {tenant_id}/{timeline_id} has invalid disk_consistent_lsn");
}
}
assert_eq!(
disk_consistent_lsn,
metadata.disk_consistent_lsn(),
@@ -1247,20 +1265,25 @@ impl TenantShard {
}
}
// Sanity check: a timeline should have some content.
anyhow::ensure!(
ancestor.is_some()
|| timeline
.layers
.read()
.await
.layer_map()
.expect("currently loading, layer manager cannot be shutdown already")
.iter_historic_layers()
.next()
.is_some(),
"Timeline has no ancestor and no layer files"
);
if disk_consistent_lsn.is_valid() {
// Sanity check: a timeline should have some content.
// Exception: importing timelines might not yet have any
anyhow::ensure!(
ancestor.is_some()
|| timeline
.layers
.read()
.await
.layer_map()
.expect(
"currently loading, layer manager cannot be shutdown already"
)
.iter_historic_layers()
.next()
.is_some(),
"Timeline has no ancestor and no layer files"
);
}
Ok(TimelineInitAndSyncResult::ReadyToActivate)
}
@@ -2860,13 +2883,13 @@ impl TenantShard {
pub(crate) async fn finalize_importing_timeline(
&self,
timeline_id: TimelineId,
) -> anyhow::Result<()> {
) -> Result<(), FinalizeTimelineImportError> {
let timeline = {
let locked = self.timelines_importing.lock().unwrap();
match locked.get(&timeline_id) {
Some(importing_timeline) => {
if !importing_timeline.import_task_handle.is_finished() {
return Err(anyhow::anyhow!("Import task not done yet"));
return Err(FinalizeTimelineImportError::ImportTaskStillRunning);
}
importing_timeline.timeline.clone()
@@ -2879,8 +2902,13 @@ impl TenantShard {
timeline
.remote_client
.schedule_index_upload_for_import_pgdata_finalize()?;
timeline.remote_client.wait_completion().await?;
.schedule_index_upload_for_import_pgdata_finalize()
.map_err(|_err| FinalizeTimelineImportError::ShuttingDown)?;
timeline
.remote_client
.wait_completion()
.await
.map_err(|_err| FinalizeTimelineImportError::ShuttingDown)?;
self.timelines_importing
.lock()
@@ -3484,8 +3512,9 @@ impl TenantShard {
let mut timelines_importing = self.timelines_importing.lock().unwrap();
timelines_importing
.drain()
.for_each(|(_timeline_id, importing_timeline)| {
importing_timeline.shutdown();
.for_each(|(timeline_id, importing_timeline)| {
let span = tracing::info_span!("importing_timeline_shutdown", %timeline_id);
js.spawn(async move { importing_timeline.shutdown().instrument(span).await });
});
}
// test_long_timeline_create_then_tenant_delete is leaning on this message
@@ -5286,6 +5315,7 @@ impl TenantShard {
l0_compaction_trigger: self.l0_compaction_trigger.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
basebackup_prepare_sender: self.basebackup_prepare_sender.clone(),
feature_resolver: self.feature_resolver.clone(),
}
}
@@ -8330,10 +8360,24 @@ mod tests {
}
tline.freeze_and_flush().await?;
// Force layers to L1
tline
.compact(
&cancel,
{
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceL0Compaction);
flags
},
&ctx,
)
.await?;
if iter % 5 == 0 {
let scan_lsn = Lsn(lsn.0 + 1);
info!("scanning at {}", scan_lsn);
let (_, before_delta_file_accessed) =
scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone())
scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone())
.await?;
tline
.compact(
@@ -8342,13 +8386,14 @@ mod tests {
let mut flags = EnumSet::new();
flags.insert(CompactFlags::ForceImageLayerCreation);
flags.insert(CompactFlags::ForceRepartition);
flags.insert(CompactFlags::ForceL0Compaction);
flags
},
&ctx,
)
.await?;
let (_, after_delta_file_accessed) =
scan_with_statistics(&tline, &keyspace, lsn, &ctx, io_concurrency.clone())
scan_with_statistics(&tline, &keyspace, scan_lsn, &ctx, io_concurrency.clone())
.await?;
assert!(
after_delta_file_accessed < before_delta_file_accessed,
@@ -8789,6 +8834,8 @@ mod tests {
let cancel = CancellationToken::new();
// Image layer creation happens on the disk_consistent_lsn so we need to force set it now.
tline.force_set_disk_consistent_lsn(Lsn(0x40));
tline
.compact(
&cancel,
@@ -8802,8 +8849,7 @@ mod tests {
)
.await
.unwrap();
// Image layers are created at last_record_lsn
// Image layers are created at repartition LSN
let images = tline
.inspect_image_layers(Lsn(0x40), &ctx, io_concurrency.clone())
.await

View File

@@ -103,6 +103,7 @@ use crate::context::{
DownloadBehavior, PerfInstrumentFutureExt, RequestContext, RequestContextBuilder,
};
use crate::disk_usage_eviction_task::{DiskUsageEvictionInfo, EvictionCandidate, finite_f32};
use crate::feature_resolver::FeatureResolver;
use crate::keyspace::{KeyPartitioning, KeySpace};
use crate::l0_flush::{self, L0FlushGlobalState};
use crate::metrics::{
@@ -198,6 +199,7 @@ pub struct TimelineResources {
pub l0_compaction_trigger: Arc<Notify>,
pub l0_flush_global_state: l0_flush::L0FlushGlobalState,
pub basebackup_prepare_sender: BasebackupPrepareSender,
pub feature_resolver: FeatureResolver,
}
pub struct Timeline {
@@ -444,6 +446,8 @@ pub struct Timeline {
/// A channel to send async requests to prepare a basebackup for the basebackup cache.
basebackup_prepare_sender: BasebackupPrepareSender,
feature_resolver: FeatureResolver,
}
pub(crate) enum PreviousHeatmap {
@@ -3072,6 +3076,8 @@ impl Timeline {
wait_lsn_log_slow: tokio::sync::Semaphore::new(1),
basebackup_prepare_sender: resources.basebackup_prepare_sender,
feature_resolver: resources.feature_resolver,
};
result.repartition_threshold =
@@ -4906,6 +4912,7 @@ impl Timeline {
LastImageLayerCreationStatus::Initial,
false, // don't yield for L0, we're flushing L0
)
.instrument(info_span!("create_image_layers", mode = %ImageLayerCreationMode::Initial, partition_mode = "initial", lsn = %self.initdb_lsn))
.await?;
debug_assert!(
matches!(is_complete, LastImageLayerCreationStatus::Complete),
@@ -5462,7 +5469,8 @@ impl Timeline {
/// Returns the image layers generated and an enum indicating whether the process is fully completed.
/// true = we have generate all image layers, false = we preempt the process for L0 compaction.
#[tracing::instrument(skip_all, fields(%lsn, %mode))]
///
/// `partition_mode` is only for logging purpose and is not used anywhere in this function.
async fn create_image_layers(
self: &Arc<Timeline>,
partitioning: &KeyPartitioning,

View File

@@ -1278,11 +1278,55 @@ impl Timeline {
}
let gc_cutoff = *self.applied_gc_cutoff_lsn.read();
let l0_l1_boundary_lsn = {
// We do the repartition on the L0-L1 boundary. All data below the boundary
// are compacted by L0 with low read amplification, thus making the `repartition`
// function run fast.
let guard = self.layers.read().await;
guard
.all_persistent_layers()
.iter()
.map(|x| {
// Use the end LSN of delta layers OR the start LSN of image layers.
if x.is_delta {
x.lsn_range.end
} else {
x.lsn_range.start
}
})
.max()
};
let (partition_mode, partition_lsn) = if cfg!(test)
|| cfg!(feature = "testing")
|| self
.feature_resolver
.evaluate_boolean("image-compaction-boundary", self.tenant_shard_id.tenant_id)
.is_ok()
{
let last_repartition_lsn = self.partitioning.read().1;
let lsn = match l0_l1_boundary_lsn {
Some(boundary) => gc_cutoff
.max(boundary)
.max(last_repartition_lsn)
.max(self.initdb_lsn)
.max(self.ancestor_lsn),
None => self.get_last_record_lsn(),
};
if lsn <= self.initdb_lsn || lsn <= self.ancestor_lsn {
// Do not attempt to create image layers below the initdb or ancestor LSN -- no data below it
("l0_l1_boundary", self.get_last_record_lsn())
} else {
("l0_l1_boundary", lsn)
}
} else {
("latest_record", self.get_last_record_lsn())
};
// 2. Repartition and create image layers if necessary
match self
.repartition(
self.get_last_record_lsn(),
partition_lsn,
self.get_compaction_target_size(),
options.flags,
ctx,
@@ -1301,18 +1345,19 @@ impl Timeline {
.extend(sparse_partitioning.into_dense().parts);
// 3. Create new image layers for partitions that have been modified "enough".
let mode = if options
.flags
.contains(CompactFlags::ForceImageLayerCreation)
{
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
};
let (image_layers, outcome) = self
.create_image_layers(
&partitioning,
lsn,
if options
.flags
.contains(CompactFlags::ForceImageLayerCreation)
{
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
mode,
&image_ctx,
self.last_image_layer_creation_status
.load()
@@ -1320,6 +1365,7 @@ impl Timeline {
.clone(),
options.flags.contains(CompactFlags::YieldForL0),
)
.instrument(info_span!("create_image_layers", mode = %mode, partition_mode = %partition_mode, lsn = %lsn))
.await
.inspect_err(|err| {
if let CreateImageLayersError::GetVectoredError(
@@ -1344,7 +1390,8 @@ impl Timeline {
}
Ok(_) => {
info!("skipping repartitioning due to image compaction LSN being below GC cutoff");
// This happens very frequently so we don't want to log it.
debug!("skipping repartitioning due to image compaction LSN being below GC cutoff");
}
// Suppress errors when cancelled.

View File

@@ -25,8 +25,11 @@ pub(crate) struct ImportingTimeline {
}
impl ImportingTimeline {
pub(crate) fn shutdown(self) {
pub(crate) async fn shutdown(self) {
self.import_task_handle.abort();
let _ = self.import_task_handle.await;
self.timeline.remote_client.shutdown().await;
}
}
@@ -93,6 +96,11 @@ pub async fn doit(
);
}
timeline
.remote_client
.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.

View File

@@ -30,6 +30,7 @@
use std::collections::HashSet;
use std::hash::{Hash, Hasher};
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::Arc;
@@ -100,8 +101,24 @@ async fn run_v1(
tasks: Vec::default(),
};
let import_config = &timeline.conf.timeline_import_config;
let plan = planner.plan(import_config).await?;
// Use the job size limit encoded in the progress if we are resuming an import.
// This ensures that imports have stable plans even if the pageserver config changes.
let import_config = {
match &import_progress {
Some(progress) => {
let base = &timeline.conf.timeline_import_config;
TimelineImportConfig {
import_job_soft_size_limit: NonZeroUsize::new(progress.job_soft_size_limit)
.unwrap(),
import_job_concurrency: base.import_job_concurrency,
import_job_checkpoint_threshold: base.import_job_checkpoint_threshold,
}
}
None => timeline.conf.timeline_import_config.clone(),
}
};
let plan = planner.plan(&import_config).await?;
// Hash the plan and compare with the hash of the plan we got back from the storage controller.
// If the two match, it means that the planning stage had the same output.
@@ -113,20 +130,20 @@ async fn run_v1(
let plan_hash = hasher.finish();
if let Some(progress) = &import_progress {
if plan_hash != progress.import_plan_hash {
anyhow::bail!("Import plan does not match storcon metadata");
}
// Handle collisions on jobs of unequal length
if progress.jobs != plan.jobs.len() {
anyhow::bail!("Import plan job length does not match storcon metadata")
}
if plan_hash != progress.import_plan_hash {
anyhow::bail!("Import plan does not match storcon metadata");
}
}
pausable_failpoint!("import-timeline-pre-execute-pausable");
let start_from_job_idx = import_progress.map(|progress| progress.completed);
plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx)
plan.execute(timeline, start_from_job_idx, plan_hash, &import_config, ctx)
.await
}
@@ -218,6 +235,19 @@ impl Planner {
checkpoint_buf,
)));
// Sort the tasks by the key ranges they handle.
// The plan being generated here needs to be stable across invocations
// of this method.
self.tasks.sort_by_key(|task| match task {
AnyImportTask::SingleKey(key) => (key.key, key.key.next()),
AnyImportTask::RelBlocks(rel_blocks) => {
(rel_blocks.key_range.start, rel_blocks.key_range.end)
}
AnyImportTask::SlruBlocks(slru_blocks) => {
(slru_blocks.key_range.start, slru_blocks.key_range.end)
}
});
// Assigns parts of key space to later parallel jobs
let mut last_end_key = Key::MIN;
let mut current_chunk = Vec::new();
@@ -426,6 +456,8 @@ impl Plan {
}));
},
maybe_complete_job_idx = work.next() => {
pausable_failpoint!("import-task-complete-pausable");
match maybe_complete_job_idx {
Some(Ok((job_idx, res))) => {
assert!(last_completed_job_idx.checked_add(1).unwrap() == job_idx);
@@ -438,8 +470,12 @@ impl Plan {
jobs: jobs_in_plan,
completed: last_completed_job_idx,
import_plan_hash,
job_soft_size_limit: import_config.import_job_soft_size_limit.into(),
};
timeline.remote_client.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
storcon_client.put_timeline_import_status(
timeline.tenant_shard_id,
timeline.timeline_id,
@@ -640,7 +676,11 @@ impl Hash for ImportSingleKeyTask {
let ImportSingleKeyTask { key, buf } = self;
key.hash(state);
buf.hash(state);
// The key value might not have a stable binary representation.
// For instance, the db directory uses an unstable hash-map.
// To work around this we are a bit lax here and only hash the
// size of the buffer which must be consistent.
buf.len().hash(state);
}
}
@@ -915,7 +955,7 @@ impl ChunkProcessingJob {
let guard = timeline.layers.read().await;
let existing_layer = guard.try_get_from_key(&desc.key());
if let Some(layer) = existing_layer {
if layer.metadata().generation != timeline.generation {
if layer.metadata().generation == timeline.generation {
return Err(anyhow::anyhow!(
"Import attempted to rewrite layer file in the same generation: {}",
layer.local_path()

View File

@@ -3922,6 +3922,11 @@ impl Service {
})
}
#[instrument(skip_all, fields(
tenant_id=%req.tenant_shard_id.tenant_id,
shard_id=%req.tenant_shard_id.shard_slug(),
timeline_id=%req.timeline_id,
))]
pub(crate) async fn handle_timeline_shard_import_progress(
self: &Arc<Self>,
req: TimelineImportStatusRequest,
@@ -3971,6 +3976,11 @@ impl Service {
})
}
#[instrument(skip_all, fields(
tenant_id=%req.tenant_shard_id.tenant_id,
shard_id=%req.tenant_shard_id.shard_slug(),
timeline_id=%req.timeline_id,
))]
pub(crate) async fn handle_timeline_shard_import_progress_upcall(
self: &Arc<Self>,
req: PutTimelineImportStatusRequest,

View File

@@ -404,6 +404,29 @@ class PageserverTracingConfig:
return ("tracing", value)
@dataclass
class PageserverImportConfig:
import_job_concurrency: int
import_job_soft_size_limit: int
import_job_checkpoint_threshold: int
@staticmethod
def default() -> PageserverImportConfig:
return PageserverImportConfig(
import_job_concurrency=4,
import_job_soft_size_limit=512 * 1024,
import_job_checkpoint_threshold=4,
)
def to_config_key_value(self) -> tuple[str, dict[str, Any]]:
value = {
"import_job_concurrency": self.import_job_concurrency,
"import_job_soft_size_limit": self.import_job_soft_size_limit,
"import_job_checkpoint_threshold": self.import_job_checkpoint_threshold,
}
return ("timeline_import_config", value)
class NeonEnvBuilder:
"""
Builder object to create a Neon runtime environment
@@ -454,6 +477,7 @@ class NeonEnvBuilder:
pageserver_wal_receiver_protocol: PageserverWalReceiverProtocol | None = None,
pageserver_get_vectored_concurrent_io: str | None = None,
pageserver_tracing_config: PageserverTracingConfig | None = None,
pageserver_import_config: PageserverImportConfig | None = None,
):
self.repo_dir = repo_dir
self.rust_log_override = rust_log_override
@@ -511,6 +535,7 @@ class NeonEnvBuilder:
)
self.pageserver_tracing_config = pageserver_tracing_config
self.pageserver_import_config = pageserver_import_config
self.pageserver_default_tenant_config_compaction_algorithm: dict[str, Any] | None = (
pageserver_default_tenant_config_compaction_algorithm
@@ -1179,6 +1204,10 @@ class NeonEnv:
self.pageserver_wal_receiver_protocol = config.pageserver_wal_receiver_protocol
self.pageserver_get_vectored_concurrent_io = config.pageserver_get_vectored_concurrent_io
self.pageserver_tracing_config = config.pageserver_tracing_config
if config.pageserver_import_config is None:
self.pageserver_import_config = PageserverImportConfig.default()
else:
self.pageserver_import_config = config.pageserver_import_config
# Create the neon_local's `NeonLocalInitConf`
cfg: dict[str, Any] = {
@@ -1258,12 +1287,6 @@ class NeonEnv:
"no_sync": True,
# Look for gaps in WAL received from safekeepeers
"validate_wal_contiguity": True,
# TODO(vlad): make these configurable through the builder
"timeline_import_config": {
"import_job_concurrency": 4,
"import_job_soft_size_limit": 512 * 1024,
"import_job_checkpoint_threshold": 4,
},
}
# Batching (https://github.com/neondatabase/neon/issues/9377):
@@ -1325,6 +1348,12 @@ class NeonEnv:
ps_cfg[key] = value
if self.pageserver_import_config is not None:
key, value = self.pageserver_import_config.to_config_key_value()
if key not in ps_cfg:
ps_cfg[key] = value
# Create a corresponding NeonPageserver object
ps = NeonPageserver(
self, ps_id, port=pageserver_port, az_id=ps_cfg["availability_zone"]

View File

@@ -1,7 +1,10 @@
import base64
import concurrent.futures
import json
import random
import threading
import time
from enum import Enum
from enum import Enum, StrEnum
from pathlib import Path
from threading import Event
@@ -11,7 +14,14 @@ import pytest
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.fast_import import FastImport
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PgProtocol, VanillaPostgres
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PageserverImportConfig,
PgBin,
PgProtocol,
StorageControllerMigrationConfig,
VanillaPostgres,
)
from fixtures.pageserver.http import (
ImportPgdataIdemptencyKey,
)
@@ -494,6 +504,259 @@ def test_import_respects_tenant_shutdown(
wait_until(cplane_notified)
@skip_in_debug_build("Validation query takes too long in debug builds")
def test_import_chaos(
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
):
"""
Perform a timeline import while injecting chaos in the environment.
We expect that the import completes eventually, that it passes validation and
the resulting timeline can be written to.
"""
TARGET_RELBOCK_SIZE = 512 * 1024 * 1024 # 512 MiB
ALLOWED_IMPORT_RUNTIME = 90 # seconds
SHARD_COUNT = 4
neon_env_builder.num_pageservers = SHARD_COUNT
neon_env_builder.pageserver_import_config = PageserverImportConfig(
import_job_concurrency=1,
import_job_soft_size_limit=64 * 1024,
import_job_checkpoint_threshold=4,
)
# Set up mock control plane HTTP server to listen for import completions
import_completion_signaled = Event()
# There's some Python magic at play here. A list can be updated from the
# handler thread, but an optional cannot. Hence, use a list with one element.
import_error = []
def handler(request: Request) -> Response:
assert request.json is not None
body = request.json
if "error" in body:
if body["error"]:
import_error.append(body["error"])
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
# Plug the cplane mock in
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
# The import will specifiy a local filesystem path mocking remote storage
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
nrows = 0
while True:
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
log.info(
f"relblock size: {relblock_size / 8192} pages (target: {TARGET_RELBOCK_SIZE // 8192}) pages"
)
if relblock_size >= TARGET_RELBOCK_SIZE:
break
addrows = int((TARGET_RELBOCK_SIZE - relblock_size) // 8192)
assert addrows >= 1, "forward progress"
vanilla_pg.safe_psql(
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
)
nrows += addrows
vanilla_pg.stop()
env = neon_env_builder.init_configs()
env.start()
# Pause after every import task to extend the test runtime and allow
# for more chaos injection.
for ps in env.pageservers:
ps.add_persistent_failpoint("import-task-complete-pausable", "sleep(5)")
env.storage_controller.allowed_errors.extend(
[
# The shard might have moved or the pageserver hosting the shard restarted
".*Call to node.*management API.*failed.*",
# Migrations have their time outs set to 0
".*Timed out after.*downloading layers.*",
".*Failed to prepare by downloading layers.*",
# The test may kill the storage controller or pageservers
".*request was dropped before completing.*",
]
)
for ps in env.pageservers:
ps.allowed_errors.extend(
[
# We might re-write a layer in a different generation if the import
# needs to redo some of the progress since not each job is checkpointed.
".*was unlinked but was not dangling.*",
# The test may kill the storage controller or pageservers
".*request was dropped before completing.*",
# Test can SIGTERM pageserver while it is downloading
".*removing local file.*temp_download.*",
".*Failed to flush heatmap.*",
# Test can SIGTERM the storage controller while pageserver
# is attempting to upcall.
".*storage controller upcall failed.*timeline_import_status.*",
# TODO(vlad): TenantManager::reset_tenant returns a blanked anyhow error.
# It should return ResourceUnavailable or something that doesn't error log.
".*activate_post_import.*InternalServerError.*tenant map is shutting down.*",
# TODO(vlad): How can this happen?
".*Failed to download a remote file: deserialize index part file.*",
".*Cancelled request finished with an error.*",
]
)
importbucket_path = neon_env_builder.repo_dir / "test_import_chaos_bucket"
mock_import_bucket(vanilla_pg, importbucket_path)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
idempotency = ImportPgdataIdemptencyKey.random()
env.storage_controller.tenant_create(
tenant_id, shard_count=SHARD_COUNT, placement_policy={"Attached": 1}
)
env.storage_controller.reconcile_until_idle()
env.storage_controller.timeline_create(
tenant_id,
{
"new_timeline_id": str(timeline_id),
"import_pgdata": {
"idempotency_key": str(idempotency),
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
},
},
)
def chaos(stop_chaos: threading.Event):
class ChaosType(StrEnum):
MIGRATE_SHARD = "migrate_shard"
RESTART_IMMEDIATE = "restart_immediate"
RESTART = "restart"
STORCON_RESTART_IMMEDIATE = "storcon_restart_immediate"
while not stop_chaos.is_set():
chaos_type = random.choices(
population=[
ChaosType.MIGRATE_SHARD,
ChaosType.RESTART,
ChaosType.RESTART_IMMEDIATE,
ChaosType.STORCON_RESTART_IMMEDIATE,
],
weights=[0.25, 0.25, 0.25, 0.25],
k=1,
)[0]
try:
if chaos_type == ChaosType.MIGRATE_SHARD:
target_shard_number = random.randint(0, SHARD_COUNT - 1)
target_shard = TenantShardId(tenant_id, target_shard_number, SHARD_COUNT)
placements = env.storage_controller.get_tenants_placement()
log.info(f"{placements=}")
target_ps = placements[str(target_shard)]["intent"]["attached"]
if len(placements[str(target_shard)]["intent"]["secondary"]) == 0:
dest_ps = None
else:
dest_ps = placements[str(target_shard)]["intent"]["secondary"][0]
if target_ps is None or dest_ps is None:
continue
config = StorageControllerMigrationConfig(
secondary_warmup_timeout="0s",
secondary_download_request_timeout="0s",
prewarm=False,
)
env.storage_controller.tenant_shard_migrate(target_shard, dest_ps, config)
log.info(
f"CHAOS: Migrating shard {target_shard} from pageserver {target_ps} to {dest_ps}"
)
elif chaos_type == ChaosType.RESTART_IMMEDIATE:
target_ps = random.choice(env.pageservers)
log.info(f"CHAOS: Immediate restart of pageserver {target_ps.id}")
target_ps.stop(immediate=True)
target_ps.start()
elif chaos_type == ChaosType.RESTART:
target_ps = random.choice(env.pageservers)
log.info(f"CHAOS: Normal restart of pageserver {target_ps.id}")
target_ps.stop(immediate=False)
target_ps.start()
elif chaos_type == ChaosType.STORCON_RESTART_IMMEDIATE:
log.info("CHAOS: Immediate restart of storage controller")
env.storage_controller.stop(immediate=True)
env.storage_controller.start()
except Exception as e:
log.warning(f"CHAOS: Error during chaos operation {chaos_type}: {e}")
# Sleep before next chaos event
time.sleep(1)
log.info("Chaos injector stopped")
def wait_for_import_completion():
start = time.time()
done = import_completion_signaled.wait(ALLOWED_IMPORT_RUNTIME)
if not done:
raise TimeoutError(f"Import did not signal completion within {ALLOWED_IMPORT_RUNTIME}")
end = time.time()
log.info(f"Import completion signalled after {end - start}s {import_error=}")
if import_error:
raise RuntimeError(f"Import error: {import_error}")
with concurrent.futures.ThreadPoolExecutor() as executor:
stop_chaos = threading.Event()
wait_for_import_completion_fut = executor.submit(wait_for_import_completion)
chaos_fut = executor.submit(chaos, stop_chaos)
try:
wait_for_import_completion_fut.result()
except Exception as e:
raise e
finally:
stop_chaos.set()
chaos_fut.result()
import_branch_name = "imported"
env.neon_cli.mappings_map_branch(import_branch_name, tenant_id, timeline_id)
endpoint = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)
# Validate the imported data is legit
assert endpoint.safe_psql_many(
[
"set effective_io_concurrency=32;",
"SET statement_timeout='300s';",
"select count(*), sum(data::bigint)::bigint from t",
]
) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
endpoint.stop()
# Validate writes
workload = Workload(env, tenant_id, timeline_id, branch_name=import_branch_name)
workload.init()
workload.write_rows(64)
workload.validate()
def test_fast_import_with_pageserver_ingest(
test_output_dir,
vanilla_pg: VanillaPostgres,

View File

@@ -20,6 +20,9 @@ from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind
from fixtures.utils import query_scalar, wait_until
@pytest.mark.skip(
reason="We won't create future layers any more after https://github.com/neondatabase/neon/pull/10548"
)
@pytest.mark.parametrize(
"attach_mode",
["default_generation", "same_generation"],

View File

@@ -4158,17 +4158,12 @@ def test_storcon_create_delete_sk_down(
env.storage_controller.stop()
env.storage_controller.start()
config_lines = [
"neon.safekeeper_proto_version = 3",
]
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
with env.endpoints.create("main", tenant_id=tenant_id) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
with env.endpoints.create(
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
) as ep:
with env.endpoints.create("child_of_main", tenant_id=tenant_id) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
@@ -4249,17 +4244,12 @@ def test_storcon_few_sk(
env.safekeepers[0].assert_log_contains(f"creating new timeline {tenant_id}/{timeline_id}")
config_lines = [
"neon.safekeeper_proto_version = 3",
]
with env.endpoints.create("main", tenant_id=tenant_id, config_lines=config_lines) as ep:
with env.endpoints.create("main", tenant_id=tenant_id) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")
with env.endpoints.create(
"child_of_main", tenant_id=tenant_id, config_lines=config_lines
) as ep:
with env.endpoints.create("child_of_main", tenant_id=tenant_id) as ep:
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=safekeeper_list)
ep.safe_psql("CREATE TABLE IF NOT EXISTS t(key int, value text)")

View File

@@ -2012,10 +2012,7 @@ def test_explicit_timeline_creation(neon_env_builder: NeonEnvBuilder):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
config_lines = [
"neon.safekeeper_proto_version = 3",
]
ep = env.endpoints.create("main", config_lines=config_lines)
ep = env.endpoints.create("main")
# expected to fail because timeline is not created on safekeepers
with pytest.raises(Exception, match=r".*timed out.*"):
@@ -2043,10 +2040,7 @@ def test_explicit_timeline_creation_storcon(neon_env_builder: NeonEnvBuilder):
}
env = neon_env_builder.init_start()
config_lines = [
"neon.safekeeper_proto_version = 3",
]
ep = env.endpoints.create("main", config_lines=config_lines)
ep = env.endpoints.create("main")
# endpoint should start.
ep.start(safekeeper_generation=1, safekeepers=[1, 2, 3])

View File

@@ -637,10 +637,7 @@ async def quorum_sanity_single(
# create timeline on `members_sks`
Safekeeper.create_timeline(tenant_id, timeline_id, env.pageservers[0], mconf, members_sks)
config_lines = [
"neon.safekeeper_proto_version = 3",
]
ep = env.endpoints.create(branch_name, config_lines=config_lines)
ep = env.endpoints.create(branch_name)
ep.start(safekeeper_generation=1, safekeepers=compute_sks_ids)
ep.safe_psql("create table t(key int, value text)")