mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-19 14:10:37 +00:00
Compare commits
4 Commits
tristan957
...
erik/grpc-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2d2d29bf38 | ||
|
|
e8ebb8e433 | ||
|
|
232591e457 | ||
|
|
8daf272561 |
4
Cargo.lock
generated
4
Cargo.lock
generated
@@ -4236,6 +4236,7 @@ name = "pagebench"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
"camino",
|
||||
"clap",
|
||||
"futures",
|
||||
@@ -4244,12 +4245,15 @@ dependencies = [
|
||||
"humantime-serde",
|
||||
"pageserver_api",
|
||||
"pageserver_client",
|
||||
"pageserver_page_api",
|
||||
"rand 0.8.5",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util",
|
||||
"tonic 0.13.1",
|
||||
"tracing",
|
||||
"utils",
|
||||
"workspace_hack",
|
||||
|
||||
@@ -57,6 +57,21 @@ use tracing::{error, info};
|
||||
use url::Url;
|
||||
use utils::failpoint_support;
|
||||
|
||||
// Compatibility hack: if the control plane specified any remote-ext-config
|
||||
// use the default value for extension storage proxy gateway.
|
||||
// Remove this once the control plane is updated to pass the gateway URL
|
||||
fn parse_remote_ext_base_url(arg: &str) -> Result<String> {
|
||||
const FALLBACK_PG_EXT_GATEWAY_BASE_URL: &str =
|
||||
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local";
|
||||
|
||||
Ok(if arg.starts_with("http") {
|
||||
arg
|
||||
} else {
|
||||
FALLBACK_PG_EXT_GATEWAY_BASE_URL
|
||||
}
|
||||
.to_owned())
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(rename_all = "kebab-case")]
|
||||
struct Cli {
|
||||
@@ -65,7 +80,7 @@ struct Cli {
|
||||
|
||||
/// The base URL for the remote extension storage proxy gateway.
|
||||
/// Should be in the form of `http(s)://<gateway-hostname>[:<port>]`.
|
||||
#[arg(short = 'r', long, alias = "remote-ext-config")]
|
||||
#[arg(short = 'r', long, value_parser = parse_remote_ext_base_url, alias = "remote-ext-config")]
|
||||
pub remote_ext_base_url: Option<String>,
|
||||
|
||||
/// The port to bind the external listening HTTP server to. Clients running
|
||||
@@ -261,4 +276,18 @@ mod test {
|
||||
fn verify_cli() {
|
||||
Cli::command().debug_assert()
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_pg_ext_gateway_base_url() {
|
||||
let arg = "http://pg-ext-s3-gateway2";
|
||||
let result = super::parse_remote_ext_base_url(arg).unwrap();
|
||||
assert_eq!(result, arg);
|
||||
|
||||
let arg = "pg-ext-s3-gateway";
|
||||
let result = super::parse_remote_ext_base_url(arg).unwrap();
|
||||
assert_eq!(
|
||||
result,
|
||||
"http://pg-ext-s3-gateway.pg-ext-s3-gateway.svc.cluster.local"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -339,8 +339,6 @@ async fn run_dump_restore(
|
||||
destination_connstring: String,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let dumpdir = workdir.join("dumpdir");
|
||||
let num_jobs = num_cpus::get().to_string();
|
||||
info!("using {num_jobs} jobs for dump/restore");
|
||||
|
||||
let common_args = [
|
||||
// schema mapping (prob suffices to specify them on one side)
|
||||
@@ -356,7 +354,7 @@ async fn run_dump_restore(
|
||||
"directory".to_string(),
|
||||
// concurrency
|
||||
"--jobs".to_string(),
|
||||
num_jobs,
|
||||
num_cpus::get().to_string(),
|
||||
// progress updates
|
||||
"--verbose".to_string(),
|
||||
];
|
||||
|
||||
@@ -71,7 +71,7 @@ impl AsyncAuthorizeRequest<Body> for Authorize {
|
||||
// Unauthorized because when we eventually do use
|
||||
// [`Validation`], we will hit the above `Err` match arm which
|
||||
// returns 401 Unauthorized.
|
||||
Some(ref scope) if scope.contains(&ComputeClaimsScope::Admin) => {
|
||||
Some(ComputeClaimsScope::Admin) => {
|
||||
let Some(ref audience) = data.claims.audience else {
|
||||
return Err(JsonResponse::error(
|
||||
StatusCode::UNAUTHORIZED,
|
||||
|
||||
@@ -709,7 +709,7 @@ struct EndpointGenerateJwtCmdArgs {
|
||||
endpoint_id: String,
|
||||
|
||||
#[clap(short = 's', long, help = "Scope to generate the JWT with", value_parser = ComputeClaimsScope::from_str)]
|
||||
scope: Vec<ComputeClaimsScope>,
|
||||
scope: Option<ComputeClaimsScope>,
|
||||
}
|
||||
|
||||
#[derive(clap::Subcommand)]
|
||||
@@ -1580,7 +1580,7 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
|
||||
.with_context(|| format!("postgres endpoint {endpoint_id} is not found"))?
|
||||
};
|
||||
|
||||
let jwt = endpoint.generate_jwt(Some(args.scope.clone()))?;
|
||||
let jwt = endpoint.generate_jwt(args.scope)?;
|
||||
|
||||
print!("{jwt}");
|
||||
}
|
||||
|
||||
@@ -632,16 +632,14 @@ impl Endpoint {
|
||||
}
|
||||
|
||||
/// Generate a JWT with the correct claims.
|
||||
pub fn generate_jwt(&self, scope: Option<Vec<ComputeClaimsScope>>) -> Result<String> {
|
||||
pub fn generate_jwt(&self, scope: Option<ComputeClaimsScope>) -> Result<String> {
|
||||
self.env.generate_auth_token(&ComputeClaims {
|
||||
audience: match scope {
|
||||
Some(ref scope) if scope.contains(&ComputeClaimsScope::Admin) => {
|
||||
Some(vec![COMPUTE_AUDIENCE.to_owned()])
|
||||
}
|
||||
Some(ComputeClaimsScope::Admin) => Some(vec![COMPUTE_AUDIENCE.to_owned()]),
|
||||
_ => None,
|
||||
},
|
||||
compute_id: match scope {
|
||||
Some(ref scope) if scope.contains(&ComputeClaimsScope::Admin) => None,
|
||||
Some(ComputeClaimsScope::Admin) => None,
|
||||
_ => Some(self.endpoint_id.clone()),
|
||||
},
|
||||
scope,
|
||||
@@ -920,7 +918,7 @@ impl Endpoint {
|
||||
self.external_http_address.port()
|
||||
),
|
||||
)
|
||||
.bearer_auth(self.generate_jwt(None::<Vec<ComputeClaimsScope>>)?)
|
||||
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
@@ -997,7 +995,7 @@ impl Endpoint {
|
||||
self.external_http_address.port()
|
||||
))
|
||||
.header(CONTENT_TYPE.as_str(), "application/json")
|
||||
.bearer_auth(self.generate_jwt(None::<Vec<ComputeClaimsScope>>)?)
|
||||
.bearer_auth(self.generate_jwt(None::<ComputeClaimsScope>)?)
|
||||
.body(
|
||||
serde_json::to_string(&ConfigurationRequest {
|
||||
spec,
|
||||
|
||||
@@ -41,8 +41,8 @@ pub struct ComputeClaims {
|
||||
/// [`ComputeClaimsScope::Admin`].
|
||||
pub compute_id: Option<String>,
|
||||
|
||||
/// The scopes of what the token is authorized for.
|
||||
pub scope: Option<Vec<ComputeClaimsScope>>,
|
||||
/// The scope of what the token authorizes.
|
||||
pub scope: Option<ComputeClaimsScope>,
|
||||
|
||||
/// The recipient the token is intended for.
|
||||
///
|
||||
|
||||
@@ -27,7 +27,6 @@ pub use prometheus::{
|
||||
|
||||
pub mod launch_timestamp;
|
||||
mod wrappers;
|
||||
pub use prometheus;
|
||||
pub use wrappers::{CountedReader, CountedWriter};
|
||||
mod hll;
|
||||
pub use hll::{HyperLogLog, HyperLogLogState, HyperLogLogVec};
|
||||
|
||||
@@ -354,9 +354,6 @@ pub struct ShardImportProgressV1 {
|
||||
pub completed: usize,
|
||||
/// Hash of the plan
|
||||
pub import_plan_hash: u64,
|
||||
/// Soft limit for the job size
|
||||
/// This needs to remain constant throughout the import
|
||||
pub job_soft_size_limit: usize,
|
||||
}
|
||||
|
||||
impl ShardImportStatus {
|
||||
@@ -1934,7 +1931,7 @@ pub enum PagestreamFeMessage {
|
||||
}
|
||||
|
||||
// Wrapped in libpq CopyData
|
||||
#[derive(strum_macros::EnumProperty)]
|
||||
#[derive(Debug, strum_macros::EnumProperty)]
|
||||
pub enum PagestreamBeMessage {
|
||||
Exists(PagestreamExistsResponse),
|
||||
Nblocks(PagestreamNblocksResponse),
|
||||
@@ -2045,7 +2042,7 @@ pub enum PagestreamProtocolVersion {
|
||||
|
||||
pub type RequestId = u64;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamRequest {
|
||||
pub reqid: RequestId,
|
||||
pub request_lsn: Lsn,
|
||||
@@ -2064,7 +2061,7 @@ pub struct PagestreamNblocksRequest {
|
||||
pub rel: RelTag,
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
|
||||
pub struct PagestreamGetPageRequest {
|
||||
pub hdr: PagestreamRequest,
|
||||
pub rel: RelTag,
|
||||
|
||||
@@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize};
|
||||
// FIXME: should move 'forknum' as last field to keep this consistent with Postgres.
|
||||
// Then we could replace the custom Ord and PartialOrd implementations below with
|
||||
// deriving them. This will require changes in walredoproc.c.
|
||||
#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
|
||||
#[derive(Debug, Default, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)]
|
||||
pub struct RelTag {
|
||||
pub forknum: u8,
|
||||
pub spcnode: Oid,
|
||||
|
||||
@@ -4,7 +4,6 @@ use std::{sync::Arc, time::Duration};
|
||||
|
||||
use arc_swap::ArcSwap;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::{Instrument, info_span};
|
||||
|
||||
use crate::{FeatureStore, PostHogClient, PostHogClientConfig};
|
||||
|
||||
@@ -27,35 +26,31 @@ impl FeatureResolverBackgroundLoop {
|
||||
pub fn spawn(self: Arc<Self>, handle: &tokio::runtime::Handle, refresh_period: Duration) {
|
||||
let this = self.clone();
|
||||
let cancel = self.cancel.clone();
|
||||
handle.spawn(
|
||||
async move {
|
||||
tracing::info!("Starting PostHog feature resolver");
|
||||
let mut ticker = tokio::time::interval(refresh_period);
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = ticker.tick() => {}
|
||||
_ = cancel.cancelled() => break
|
||||
}
|
||||
let resp = match this
|
||||
.posthog_client
|
||||
.get_feature_flags_local_evaluation()
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
tracing::warn!("Cannot get feature flags: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let feature_store = FeatureStore::new_with_flags(resp.flags);
|
||||
this.feature_store.store(Arc::new(feature_store));
|
||||
tracing::info!("Feature flag updated");
|
||||
handle.spawn(async move {
|
||||
tracing::info!("Starting PostHog feature resolver");
|
||||
let mut ticker = tokio::time::interval(refresh_period);
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = ticker.tick() => {}
|
||||
_ = cancel.cancelled() => break
|
||||
}
|
||||
tracing::info!("PostHog feature resolver stopped");
|
||||
let resp = match this
|
||||
.posthog_client
|
||||
.get_feature_flags_local_evaluation()
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
tracing::warn!("Cannot get feature flags: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let feature_store = FeatureStore::new_with_flags(resp.flags);
|
||||
this.feature_store.store(Arc::new(feature_store));
|
||||
}
|
||||
.instrument(info_span!("posthog_feature_resolver")),
|
||||
);
|
||||
tracing::info!("PostHog feature resolver stopped");
|
||||
});
|
||||
}
|
||||
|
||||
pub fn feature_store(&self) -> Arc<FeatureStore> {
|
||||
|
||||
@@ -448,18 +448,6 @@ impl FeatureStore {
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Infer whether a feature flag is a boolean flag by checking if it has a multivariate filter.
|
||||
pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
|
||||
if let Some(flag_config) = self.flags.get(flag_key) {
|
||||
Ok(flag_config.filters.multivariate.is_none())
|
||||
} else {
|
||||
Err(PostHogEvaluationError::NotAvailable(format!(
|
||||
"Not found in the local evaluation spec: {}",
|
||||
flag_key
|
||||
)))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PostHogClientConfig {
|
||||
@@ -540,15 +528,7 @@ impl PostHogClient {
|
||||
.bearer_auth(&self.config.server_api_key)
|
||||
.send()
|
||||
.await?;
|
||||
let status = response.status();
|
||||
let body = response.text().await?;
|
||||
if !status.is_success() {
|
||||
return Err(anyhow::anyhow!(
|
||||
"Failed to get feature flags: {}, {}",
|
||||
status,
|
||||
body
|
||||
));
|
||||
}
|
||||
Ok(serde_json::from_str(&body)?)
|
||||
}
|
||||
|
||||
|
||||
@@ -264,56 +264,10 @@ mod propagation_of_cached_label_value {
|
||||
}
|
||||
}
|
||||
|
||||
criterion_group!(histograms, histograms::bench_bucket_scalability);
|
||||
mod histograms {
|
||||
use std::time::Instant;
|
||||
|
||||
use criterion::{BenchmarkId, Criterion};
|
||||
use metrics::core::Collector;
|
||||
|
||||
pub fn bench_bucket_scalability(c: &mut Criterion) {
|
||||
let mut g = c.benchmark_group("bucket_scalability");
|
||||
|
||||
for n in [1, 4, 8, 16, 32, 64, 128, 256] {
|
||||
g.bench_with_input(BenchmarkId::new("nbuckets", n), &n, |b, n| {
|
||||
b.iter_custom(|iters| {
|
||||
let buckets: Vec<f64> = (0..*n).map(|i| i as f64 * 100.0).collect();
|
||||
let histo = metrics::Histogram::with_opts(
|
||||
metrics::prometheus::HistogramOpts::new("name", "help")
|
||||
.buckets(buckets.clone()),
|
||||
)
|
||||
.unwrap();
|
||||
let start = Instant::now();
|
||||
for i in 0..usize::try_from(iters).unwrap() {
|
||||
histo.observe(buckets[i % buckets.len()]);
|
||||
}
|
||||
let elapsed = start.elapsed();
|
||||
// self-test
|
||||
let mfs = histo.collect();
|
||||
assert_eq!(mfs.len(), 1);
|
||||
let metrics = mfs[0].get_metric();
|
||||
assert_eq!(metrics.len(), 1);
|
||||
let histo = metrics[0].get_histogram();
|
||||
let buckets = histo.get_bucket();
|
||||
assert!(
|
||||
buckets
|
||||
.iter()
|
||||
.enumerate()
|
||||
.all(|(i, b)| b.get_cumulative_count()
|
||||
>= i as u64 * (iters / buckets.len() as u64))
|
||||
);
|
||||
elapsed
|
||||
})
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
criterion_main!(
|
||||
label_values,
|
||||
single_metric_multicore_scalability,
|
||||
propagation_of_cached_label_value,
|
||||
histograms,
|
||||
propagation_of_cached_label_value
|
||||
);
|
||||
|
||||
/*
|
||||
@@ -336,14 +290,6 @@ propagation_of_cached_label_value__naive/nthreads/8 time: [211.50 ns 214.44 ns
|
||||
propagation_of_cached_label_value__long_lived_reference_per_thread/nthreads/1 time: [14.135 ns 14.147 ns 14.160 ns]
|
||||
propagation_of_cached_label_value__long_lived_reference_per_thread/nthreads/4 time: [14.243 ns 14.255 ns 14.268 ns]
|
||||
propagation_of_cached_label_value__long_lived_reference_per_thread/nthreads/8 time: [14.470 ns 14.682 ns 14.895 ns]
|
||||
bucket_scalability/nbuckets/1 time: [30.352 ns 30.353 ns 30.354 ns]
|
||||
bucket_scalability/nbuckets/4 time: [30.464 ns 30.465 ns 30.467 ns]
|
||||
bucket_scalability/nbuckets/8 time: [30.569 ns 30.575 ns 30.584 ns]
|
||||
bucket_scalability/nbuckets/16 time: [30.961 ns 30.965 ns 30.969 ns]
|
||||
bucket_scalability/nbuckets/32 time: [35.691 ns 35.707 ns 35.722 ns]
|
||||
bucket_scalability/nbuckets/64 time: [47.829 ns 47.898 ns 47.974 ns]
|
||||
bucket_scalability/nbuckets/128 time: [73.479 ns 73.512 ns 73.545 ns]
|
||||
bucket_scalability/nbuckets/256 time: [127.92 ns 127.94 ns 127.96 ns]
|
||||
|
||||
Results on an i3en.3xlarge instance
|
||||
|
||||
@@ -398,14 +344,6 @@ propagation_of_cached_label_value__naive/nthreads/8 time: [434.87 ns 456.4
|
||||
propagation_of_cached_label_value__long_lived_reference_per_thread/nthreads/1 time: [3.3767 ns 3.3974 ns 3.4220 ns]
|
||||
propagation_of_cached_label_value__long_lived_reference_per_thread/nthreads/4 time: [3.6105 ns 4.2355 ns 5.1463 ns]
|
||||
propagation_of_cached_label_value__long_lived_reference_per_thread/nthreads/8 time: [4.0889 ns 4.9714 ns 6.0779 ns]
|
||||
bucket_scalability/nbuckets/1 time: [4.8455 ns 4.8542 ns 4.8646 ns]
|
||||
bucket_scalability/nbuckets/4 time: [4.5663 ns 4.5722 ns 4.5787 ns]
|
||||
bucket_scalability/nbuckets/8 time: [4.5531 ns 4.5670 ns 4.5842 ns]
|
||||
bucket_scalability/nbuckets/16 time: [4.6392 ns 4.6524 ns 4.6685 ns]
|
||||
bucket_scalability/nbuckets/32 time: [6.0302 ns 6.0439 ns 6.0589 ns]
|
||||
bucket_scalability/nbuckets/64 time: [10.608 ns 10.644 ns 10.691 ns]
|
||||
bucket_scalability/nbuckets/128 time: [22.178 ns 22.316 ns 22.483 ns]
|
||||
bucket_scalability/nbuckets/256 time: [42.190 ns 42.328 ns 42.492 ns]
|
||||
|
||||
Results on a Hetzner AX102 AMD Ryzen 9 7950X3D 16-Core Processor
|
||||
|
||||
@@ -424,13 +362,5 @@ propagation_of_cached_label_value__naive/nthreads/8 time: [164.24 ns 170.1
|
||||
propagation_of_cached_label_value__long_lived_reference_per_thread/nthreads/1 time: [2.2915 ns 2.2960 ns 2.3012 ns]
|
||||
propagation_of_cached_label_value__long_lived_reference_per_thread/nthreads/4 time: [2.5726 ns 2.6158 ns 2.6624 ns]
|
||||
propagation_of_cached_label_value__long_lived_reference_per_thread/nthreads/8 time: [2.7068 ns 2.8243 ns 2.9824 ns]
|
||||
bucket_scalability/nbuckets/1 time: [6.3998 ns 6.4288 ns 6.4684 ns]
|
||||
bucket_scalability/nbuckets/4 time: [6.3603 ns 6.3620 ns 6.3637 ns]
|
||||
bucket_scalability/nbuckets/8 time: [6.1646 ns 6.1654 ns 6.1667 ns]
|
||||
bucket_scalability/nbuckets/16 time: [6.1341 ns 6.1391 ns 6.1454 ns]
|
||||
bucket_scalability/nbuckets/32 time: [8.2206 ns 8.2254 ns 8.2301 ns]
|
||||
bucket_scalability/nbuckets/64 time: [13.988 ns 13.994 ns 14.000 ns]
|
||||
bucket_scalability/nbuckets/128 time: [28.180 ns 28.216 ns 28.251 ns]
|
||||
bucket_scalability/nbuckets/256 time: [54.914 ns 54.931 ns 54.951 ns]
|
||||
|
||||
*/
|
||||
|
||||
@@ -584,6 +584,7 @@ impl TryFrom<GetSlruSegmentResponse> for proto::GetSlruSegmentResponse {
|
||||
type Error = ProtocolError;
|
||||
|
||||
fn try_from(segment: GetSlruSegmentResponse) -> Result<Self, Self::Error> {
|
||||
// TODO: can a segment legitimately be empty?
|
||||
if segment.is_empty() {
|
||||
return Err(ProtocolError::Missing("segment"));
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
camino.workspace = true
|
||||
clap.workspace = true
|
||||
futures.workspace = true
|
||||
@@ -15,14 +16,17 @@ hdrhistogram.workspace = true
|
||||
humantime.workspace = true
|
||||
humantime-serde.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace=true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tracing.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
tokio-util.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
pageserver_client.workspace = true
|
||||
pageserver_api.workspace = true
|
||||
pageserver_page_api.workspace = true
|
||||
utils = { path = "../../libs/utils/" }
|
||||
workspace_hack = { version = "0.1", path = "../../workspace_hack" }
|
||||
|
||||
@@ -7,11 +7,15 @@ use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use camino::Utf8PathBuf;
|
||||
use pageserver_api::key::Key;
|
||||
use pageserver_api::keyspace::KeySpaceAccum;
|
||||
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
|
||||
use pageserver_api::models::{
|
||||
PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamRequest,
|
||||
};
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_page_api::proto;
|
||||
use rand::prelude::*;
|
||||
use tokio::task::JoinSet;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -22,6 +26,12 @@ use utils::lsn::Lsn;
|
||||
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
|
||||
use crate::util::{request_stats, tokio_thread_local_stats};
|
||||
|
||||
#[derive(clap::ValueEnum, Clone, Debug)]
|
||||
enum Protocol {
|
||||
Libpq,
|
||||
Grpc,
|
||||
}
|
||||
|
||||
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
|
||||
#[derive(clap::Parser)]
|
||||
pub(crate) struct Args {
|
||||
@@ -35,6 +45,8 @@ pub(crate) struct Args {
|
||||
num_clients: NonZeroUsize,
|
||||
#[clap(long)]
|
||||
runtime: Option<humantime::Duration>,
|
||||
#[clap(long, value_enum, default_value = "libpq")]
|
||||
protocol: Protocol,
|
||||
/// Each client sends requests at the given rate.
|
||||
///
|
||||
/// If a request takes too long and we should be issuing a new request already,
|
||||
@@ -303,7 +315,20 @@ async fn main_impl(
|
||||
.unwrap();
|
||||
|
||||
Box::pin(async move {
|
||||
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
|
||||
let client: Box<dyn Client> = match args.protocol {
|
||||
Protocol::Libpq => Box::new(
|
||||
LibpqClient::new(args.page_service_connstring.clone(), worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
|
||||
Protocol::Grpc => Box::new(
|
||||
GrpcClient::new(args.page_service_connstring.clone(), worker_id.timeline)
|
||||
.await
|
||||
.unwrap(),
|
||||
),
|
||||
};
|
||||
run_worker(args, client, ss, cancel, rps_period, ranges, weights).await
|
||||
})
|
||||
};
|
||||
|
||||
@@ -355,23 +380,15 @@ async fn main_impl(
|
||||
anyhow::Ok(())
|
||||
}
|
||||
|
||||
async fn client_libpq(
|
||||
async fn run_worker(
|
||||
args: &Args,
|
||||
worker_id: WorkerId,
|
||||
mut client: Box<dyn Client>,
|
||||
shared_state: Arc<SharedState>,
|
||||
cancel: CancellationToken,
|
||||
rps_period: Option<Duration>,
|
||||
ranges: Vec<KeyRange>,
|
||||
weights: rand::distributions::weighted::WeightedIndex<i128>,
|
||||
) {
|
||||
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let mut client = client
|
||||
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
shared_state.start_work_barrier.wait().await;
|
||||
let client_start = Instant::now();
|
||||
let mut ticks_processed = 0;
|
||||
@@ -415,12 +432,12 @@ async fn client_libpq(
|
||||
blkno: block_no,
|
||||
}
|
||||
};
|
||||
client.getpage_send(req).await.unwrap();
|
||||
client.send_get_page(req).await.unwrap();
|
||||
inflight.push_back(start);
|
||||
}
|
||||
|
||||
let start = inflight.pop_front().unwrap();
|
||||
client.getpage_recv().await.unwrap();
|
||||
client.recv_get_page().await.unwrap();
|
||||
let end = Instant::now();
|
||||
shared_state.live_stats.request_done();
|
||||
ticks_processed += 1;
|
||||
@@ -442,3 +459,101 @@ async fn client_libpq(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A benchmark client, to allow switching out the transport protocol.
|
||||
///
|
||||
/// For simplicity, this just uses separate asynchronous send/recv methods. The send method could
|
||||
/// return a future that resolves when the response is received, but we don't really need it.
|
||||
#[async_trait]
|
||||
trait Client: Send {
|
||||
/// Sends an asynchronous GetPage request to the pageserver.
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()>;
|
||||
|
||||
/// Receives the next GetPage response from the pageserver.
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse>;
|
||||
}
|
||||
|
||||
/// A libpq-based Pageserver client.
|
||||
struct LibpqClient {
|
||||
inner: pageserver_client::page_service::PagestreamClient,
|
||||
}
|
||||
|
||||
impl LibpqClient {
|
||||
async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let inner = pageserver_client::page_service::Client::new(connstring)
|
||||
.await?
|
||||
.pagestream(ttid.tenant_id, ttid.timeline_id)
|
||||
.await?;
|
||||
Ok(Self { inner })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for LibpqClient {
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
|
||||
self.inner.getpage_send(req).await
|
||||
}
|
||||
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
self.inner.getpage_recv().await
|
||||
}
|
||||
}
|
||||
|
||||
/// A gRPC client using the raw, no-frills gRPC client.
|
||||
struct GrpcClient {
|
||||
req_tx: tokio::sync::mpsc::Sender<proto::GetPageRequest>,
|
||||
resp_rx: tonic::Streaming<proto::GetPageResponse>,
|
||||
}
|
||||
|
||||
impl GrpcClient {
|
||||
async fn new(connstring: String, ttid: TenantTimelineId) -> anyhow::Result<Self> {
|
||||
let mut client = pageserver_page_api::proto::PageServiceClient::connect(connstring).await?;
|
||||
|
||||
let (req_tx, req_rx) = tokio::sync::mpsc::channel(1);
|
||||
let req_stream = tokio_stream::wrappers::ReceiverStream::new(req_rx);
|
||||
let mut req = tonic::Request::new(req_stream);
|
||||
let metadata = req.metadata_mut();
|
||||
metadata.insert("neon-tenant-id", ttid.tenant_id.to_string().try_into()?);
|
||||
metadata.insert("neon-timeline-id", ttid.timeline_id.to_string().try_into()?);
|
||||
metadata.insert("neon-shard-id", "0000".try_into()?);
|
||||
|
||||
let resp = client.get_pages(req).await?;
|
||||
let resp_stream = resp.into_inner();
|
||||
|
||||
Ok(Self {
|
||||
req_tx,
|
||||
resp_rx: resp_stream,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Client for GrpcClient {
|
||||
async fn send_get_page(&mut self, req: PagestreamGetPageRequest) -> anyhow::Result<()> {
|
||||
let req = proto::GetPageRequest {
|
||||
request_id: 0,
|
||||
request_class: proto::GetPageClass::Normal as i32,
|
||||
read_lsn: Some(proto::ReadLsn {
|
||||
request_lsn: req.hdr.request_lsn.0,
|
||||
not_modified_since_lsn: req.hdr.not_modified_since.0,
|
||||
}),
|
||||
rel: Some(req.rel.into()),
|
||||
block_number: vec![req.blkno],
|
||||
};
|
||||
self.req_tx.send(req).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn recv_get_page(&mut self) -> anyhow::Result<PagestreamGetPageResponse> {
|
||||
let resp = self.resp_rx.message().await?.unwrap();
|
||||
anyhow::ensure!(
|
||||
resp.status_code == proto::GetPageStatusCode::Ok as i32,
|
||||
"unexpected status code: {}",
|
||||
resp.status_code
|
||||
);
|
||||
Ok(PagestreamGetPageResponse {
|
||||
page: resp.page_image[0].clone(),
|
||||
req: PagestreamGetPageRequest::default(), // dummy
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -804,7 +804,7 @@ fn start_pageserver(
|
||||
} else {
|
||||
None
|
||||
},
|
||||
basebackup_cache.clone(),
|
||||
basebackup_cache,
|
||||
);
|
||||
|
||||
// Spawn a Pageserver gRPC server task. It will spawn separate tasks for
|
||||
@@ -816,12 +816,10 @@ fn start_pageserver(
|
||||
let mut page_service_grpc = None;
|
||||
if let Some(grpc_listener) = grpc_listener {
|
||||
page_service_grpc = Some(page_service::spawn_grpc(
|
||||
conf,
|
||||
tenant_manager.clone(),
|
||||
grpc_auth,
|
||||
otel_guard.as_ref().map(|g| g.dispatch.clone()),
|
||||
grpc_listener,
|
||||
basebackup_cache,
|
||||
)?);
|
||||
}
|
||||
|
||||
|
||||
@@ -837,30 +837,7 @@ async fn collect_eviction_candidates(
|
||||
continue;
|
||||
}
|
||||
let info = tl.get_local_layers_for_disk_usage_eviction().await;
|
||||
debug!(
|
||||
tenant_id=%tl.tenant_shard_id.tenant_id,
|
||||
shard_id=%tl.tenant_shard_id.shard_slug(),
|
||||
timeline_id=%tl.timeline_id,
|
||||
"timeline resident layers count: {}", info.resident_layers.len()
|
||||
);
|
||||
|
||||
tenant_candidates.extend(info.resident_layers.into_iter());
|
||||
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
|
||||
|
||||
if cancel.is_cancelled() {
|
||||
return Ok(EvictionCandidates::Cancelled);
|
||||
}
|
||||
}
|
||||
|
||||
// Also consider layers of timelines being imported for eviction
|
||||
for tl in tenant.list_importing_timelines() {
|
||||
let info = tl.timeline.get_local_layers_for_disk_usage_eviction().await;
|
||||
debug!(
|
||||
tenant_id=%tl.timeline.tenant_shard_id.tenant_id,
|
||||
shard_id=%tl.timeline.tenant_shard_id.shard_slug(),
|
||||
timeline_id=%tl.timeline.timeline_id,
|
||||
"timeline resident layers count: {}", info.resident_layers.len()
|
||||
);
|
||||
debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
|
||||
|
||||
tenant_candidates.extend(info.resident_layers.into_iter());
|
||||
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
|
||||
|
||||
@@ -91,14 +91,4 @@ impl FeatureResolver {
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_feature_flag_boolean(&self, flag_key: &str) -> Result<bool, PostHogEvaluationError> {
|
||||
if let Some(inner) = &self.inner {
|
||||
inner.feature_store().is_feature_flag_boolean(flag_key)
|
||||
} else {
|
||||
Err(PostHogEvaluationError::NotAvailable(
|
||||
"PostHog integration is not enabled".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3663,46 +3663,6 @@ async fn read_tar_eof(mut reader: (impl tokio::io::AsyncRead + Unpin)) -> anyhow
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn tenant_evaluate_feature_flag(
|
||||
request: Request<Body>,
|
||||
_cancel: CancellationToken,
|
||||
) -> Result<Response<Body>, ApiError> {
|
||||
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
|
||||
check_permission(&request, Some(tenant_shard_id.tenant_id))?;
|
||||
|
||||
let flag: String = must_parse_query_param(&request, "flag")?;
|
||||
let as_type: String = must_parse_query_param(&request, "as")?;
|
||||
|
||||
let state = get_state(&request);
|
||||
|
||||
async {
|
||||
let tenant = state
|
||||
.tenant_manager
|
||||
.get_attached_tenant_shard(tenant_shard_id)?;
|
||||
if as_type == "boolean" {
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
|
||||
let result = result.map(|_| true).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, result)
|
||||
} else if as_type == "multivariate" {
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, result)
|
||||
} else {
|
||||
// Auto infer the type of the feature flag.
|
||||
let is_boolean = tenant.feature_resolver.is_feature_flag_boolean(&flag).map_err(|e| ApiError::InternalServerError(anyhow::anyhow!("{e}")))?;
|
||||
if is_boolean {
|
||||
let result = tenant.feature_resolver.evaluate_boolean(&flag, tenant_shard_id.tenant_id);
|
||||
let result = result.map(|_| true).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, result)
|
||||
} else {
|
||||
let result = tenant.feature_resolver.evaluate_multivariate(&flag, tenant_shard_id.tenant_id).map_err(|e| e.to_string());
|
||||
json_response(StatusCode::OK, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
.instrument(info_span!("tenant_evaluate_feature_flag", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug()))
|
||||
.await
|
||||
}
|
||||
|
||||
/// Common functionality of all the HTTP API handlers.
|
||||
///
|
||||
/// - Adds a tracing span to each request (by `request_span`)
|
||||
@@ -4079,8 +4039,5 @@ pub fn make_router(
|
||||
"/v1/tenant/:tenant_shard_id/timeline/:timeline_id/activate_post_import",
|
||||
|r| api_handler(r, activate_post_import_handler),
|
||||
)
|
||||
.get("/v1/tenant/:tenant_shard_id/feature_flag", |r| {
|
||||
api_handler(r, tenant_evaluate_feature_flag)
|
||||
})
|
||||
.any(handler_404))
|
||||
}
|
||||
|
||||
@@ -1312,44 +1312,11 @@ impl EvictionsWithLowResidenceDuration {
|
||||
//
|
||||
// Roughly logarithmic scale.
|
||||
const STORAGE_IO_TIME_BUCKETS: &[f64] = &[
|
||||
0.00005, // 50us
|
||||
0.00006, // 60us
|
||||
0.00007, // 70us
|
||||
0.00008, // 80us
|
||||
0.00009, // 90us
|
||||
0.0001, // 100us
|
||||
0.000110, // 110us
|
||||
0.000120, // 120us
|
||||
0.000130, // 130us
|
||||
0.000140, // 140us
|
||||
0.000150, // 150us
|
||||
0.000160, // 160us
|
||||
0.000170, // 170us
|
||||
0.000180, // 180us
|
||||
0.000190, // 190us
|
||||
0.000200, // 200us
|
||||
0.000210, // 210us
|
||||
0.000220, // 220us
|
||||
0.000230, // 230us
|
||||
0.000240, // 240us
|
||||
0.000250, // 250us
|
||||
0.000300, // 300us
|
||||
0.000350, // 350us
|
||||
0.000400, // 400us
|
||||
0.000450, // 450us
|
||||
0.000500, // 500us
|
||||
0.000600, // 600us
|
||||
0.000700, // 700us
|
||||
0.000800, // 800us
|
||||
0.000900, // 900us
|
||||
0.001000, // 1ms
|
||||
0.002000, // 2ms
|
||||
0.003000, // 3ms
|
||||
0.004000, // 4ms
|
||||
0.005000, // 5ms
|
||||
0.01000, // 10ms
|
||||
0.02000, // 20ms
|
||||
0.05000, // 50ms
|
||||
0.000030, // 30 usec
|
||||
0.001000, // 1000 usec
|
||||
0.030, // 30 ms
|
||||
1.000, // 1000 ms
|
||||
30.000, // 30000 ms
|
||||
];
|
||||
|
||||
/// VirtualFile fs operation variants.
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
//! The Page Service listens for client connections and serves their GetPage@LSN
|
||||
//! requests.
|
||||
|
||||
use std::any::Any;
|
||||
use std::borrow::Cow;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::os::fd::AsRawFd;
|
||||
@@ -31,6 +32,7 @@ use pageserver_api::models::{
|
||||
};
|
||||
use pageserver_api::reltag::SlruKind;
|
||||
use pageserver_api::shard::TenantShardId;
|
||||
use pageserver_page_api as page_api;
|
||||
use pageserver_page_api::proto;
|
||||
use postgres_backend::{
|
||||
AuthType, PostgresBackend, PostgresBackendReader, QueryError, is_expected_io_error,
|
||||
@@ -39,6 +41,7 @@ use postgres_ffi::BLCKSZ;
|
||||
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
|
||||
use pq_proto::framed::ConnectionError;
|
||||
use pq_proto::{BeMessage, FeMessage, FeStartupPacket, RowDescriptor};
|
||||
use smallvec::{SmallVec, smallvec};
|
||||
use strum_macros::IntoStaticStr;
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufWriter};
|
||||
use tokio::task::JoinHandle;
|
||||
@@ -76,6 +79,7 @@ use crate::tenant::mgr::{
|
||||
GetActiveTenantError, GetTenantError, ShardResolveResult, ShardSelector, TenantManager,
|
||||
};
|
||||
use crate::tenant::storage_layer::IoConcurrency;
|
||||
use crate::tenant::timeline::handle::{HandleUpgradeError, WeakHandle};
|
||||
use crate::tenant::timeline::{self, WaitLsnError};
|
||||
use crate::tenant::{GetTimelineError, PageReconstructError, Timeline};
|
||||
use crate::{CancellableTask, PERF_TRACE_TARGET, timed_after_cancellation};
|
||||
@@ -165,15 +169,14 @@ pub fn spawn(
|
||||
|
||||
/// Spawns a gRPC server for the page service.
|
||||
///
|
||||
/// TODO: move this onto GrpcPageServiceHandler::spawn().
|
||||
/// TODO: this doesn't support TLS. We need TLS reloading via ReloadingCertificateResolver, so we
|
||||
/// need to reimplement the TCP+TLS accept loop ourselves.
|
||||
pub fn spawn_grpc(
|
||||
conf: &'static PageServerConf,
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
auth: Option<Arc<SwappableJwtAuth>>,
|
||||
perf_trace_dispatch: Option<Dispatch>,
|
||||
listener: std::net::TcpListener,
|
||||
basebackup_cache: Arc<BasebackupCache>,
|
||||
) -> anyhow::Result<CancellableTask> {
|
||||
let cancel = CancellationToken::new();
|
||||
let ctx = RequestContextBuilder::new(TaskKind::PageRequestHandler)
|
||||
@@ -202,21 +205,16 @@ pub fn spawn_grpc(
|
||||
.max_concurrent_streams(Some(GRPC_MAX_CONCURRENT_STREAMS));
|
||||
|
||||
// Main page service.
|
||||
let page_service_handler = PageServerHandler::new(
|
||||
let page_service_handler = GrpcPageServiceHandler {
|
||||
tenant_manager,
|
||||
auth.clone(),
|
||||
PageServicePipeliningConfig::Serial, // TODO: unused with gRPC
|
||||
conf.get_vectored_concurrent_io,
|
||||
ConnectionPerfSpanFields::default(),
|
||||
basebackup_cache,
|
||||
ctx,
|
||||
cancel.clone(),
|
||||
gate.enter().expect("just created"),
|
||||
);
|
||||
};
|
||||
|
||||
let mut received_at_interceptor = ReceivedAtInterceptor;
|
||||
let mut tenant_interceptor = TenantMetadataInterceptor;
|
||||
let mut auth_interceptor = TenantAuthInterceptor::new(auth);
|
||||
let interceptors = move |mut req: tonic::Request<()>| {
|
||||
req = received_at_interceptor.call(req)?;
|
||||
req = tenant_interceptor.call(req)?;
|
||||
req = auth_interceptor.call(req)?;
|
||||
Ok(req)
|
||||
@@ -709,6 +707,89 @@ enum PageStreamError {
|
||||
BadRequest(Cow<'static, str>),
|
||||
}
|
||||
|
||||
impl PageStreamError {
|
||||
/// Converts a PageStreamError into a proto::GetPageResponse with the appropriate status
|
||||
/// code, or a gRPC status if it should terminate the stream (e.g. shutdown). This is a
|
||||
/// convenience method for use from a get_pages gRPC stream.
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn into_get_page_response(
|
||||
self,
|
||||
request_id: page_api::RequestID,
|
||||
) -> Result<proto::GetPageResponse, tonic::Status> {
|
||||
use page_api::GetPageStatusCode;
|
||||
use tonic::Code;
|
||||
|
||||
// We dispatch to Into<tonic::Status> first, and then map it to a GetPageResponse.
|
||||
let status: tonic::Status = self.into();
|
||||
let status_code = match status.code() {
|
||||
// We shouldn't see an OK status here, because we're emitting an error.
|
||||
Code::Ok => {
|
||||
debug_assert_ne!(status.code(), Code::Ok);
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"unexpected OK status: {status:?}",
|
||||
)));
|
||||
}
|
||||
|
||||
// These are per-request errors, returned as GetPageResponses.
|
||||
Code::AlreadyExists => GetPageStatusCode::InvalidRequest,
|
||||
Code::DataLoss => GetPageStatusCode::InternalError,
|
||||
Code::FailedPrecondition => GetPageStatusCode::InvalidRequest,
|
||||
Code::InvalidArgument => GetPageStatusCode::InvalidRequest,
|
||||
Code::Internal => GetPageStatusCode::InternalError,
|
||||
Code::NotFound => GetPageStatusCode::NotFound,
|
||||
Code::OutOfRange => GetPageStatusCode::InvalidRequest,
|
||||
Code::ResourceExhausted => GetPageStatusCode::SlowDown,
|
||||
|
||||
// These should terminate the stream.
|
||||
Code::Aborted => return Err(status),
|
||||
Code::Cancelled => return Err(status),
|
||||
Code::DeadlineExceeded => return Err(status),
|
||||
Code::PermissionDenied => return Err(status),
|
||||
Code::Unauthenticated => return Err(status),
|
||||
Code::Unavailable => return Err(status),
|
||||
Code::Unimplemented => return Err(status),
|
||||
Code::Unknown => return Err(status),
|
||||
};
|
||||
|
||||
Ok(page_api::GetPageResponse {
|
||||
request_id,
|
||||
status_code,
|
||||
reason: Some(status.message().to_string()),
|
||||
page_images: SmallVec::new(),
|
||||
}
|
||||
.into())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PageStreamError> for tonic::Status {
|
||||
fn from(err: PageStreamError) -> Self {
|
||||
use tonic::Code;
|
||||
let code = match &err {
|
||||
PageStreamError::Reconnect(_) => Code::Unavailable,
|
||||
PageStreamError::Shutdown => Code::Unavailable,
|
||||
PageStreamError::Read(err) => match err {
|
||||
PageReconstructError::Cancelled => Code::Unavailable,
|
||||
PageReconstructError::MissingKey(_) => Code::NotFound,
|
||||
PageReconstructError::AncestorLsnTimeout(err) => match err {
|
||||
WaitLsnError::Timeout(_) => Code::Internal,
|
||||
WaitLsnError::BadState(_) => Code::Internal,
|
||||
WaitLsnError::Shutdown => Code::Unavailable,
|
||||
},
|
||||
PageReconstructError::Other(_) => Code::Internal,
|
||||
PageReconstructError::WalRedo(_) => Code::Internal,
|
||||
},
|
||||
PageStreamError::LsnTimeout(err) => match err {
|
||||
WaitLsnError::Timeout(_) => Code::Internal,
|
||||
WaitLsnError::BadState(_) => Code::Internal,
|
||||
WaitLsnError::Shutdown => Code::Unavailable,
|
||||
},
|
||||
PageStreamError::NotFound(_) => Code::NotFound,
|
||||
PageStreamError::BadRequest(_) => Code::InvalidArgument,
|
||||
};
|
||||
tonic::Status::new(code, format!("{err}"))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for PageStreamError {
|
||||
fn from(value: PageReconstructError) -> Self {
|
||||
match value {
|
||||
@@ -789,37 +870,37 @@ enum BatchedFeMessage {
|
||||
Exists {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
shard: WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamExistsRequest,
|
||||
},
|
||||
Nblocks {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
shard: WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamNblocksRequest,
|
||||
},
|
||||
GetPage {
|
||||
span: Span,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
shard: WeakHandle<TenantManagerTypes>,
|
||||
pages: SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
batch_break_reason: GetPageBatchBreakReason,
|
||||
},
|
||||
DbSize {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
shard: WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamDbSizeRequest,
|
||||
},
|
||||
GetSlruSegment {
|
||||
span: Span,
|
||||
timer: SmgrOpTimer,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
shard: WeakHandle<TenantManagerTypes>,
|
||||
req: models::PagestreamGetSlruSegmentRequest,
|
||||
},
|
||||
#[cfg(feature = "testing")]
|
||||
Test {
|
||||
span: Span,
|
||||
shard: timeline::handle::WeakHandle<TenantManagerTypes>,
|
||||
shard: WeakHandle<TenantManagerTypes>,
|
||||
requests: Vec<BatchedTestRequest>,
|
||||
},
|
||||
RespondError {
|
||||
@@ -1068,26 +1149,6 @@ impl PageServerHandler {
|
||||
let neon_fe_msg =
|
||||
PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?;
|
||||
|
||||
// TODO: turn in to async closure once available to avoid repeating received_at
|
||||
async fn record_op_start_and_throttle(
|
||||
shard: &timeline::handle::Handle<TenantManagerTypes>,
|
||||
op: metrics::SmgrQueryType,
|
||||
received_at: Instant,
|
||||
) -> Result<SmgrOpTimer, QueryError> {
|
||||
// It's important to start the smgr op metric recorder as early as possible
|
||||
// so that the _started counters are incremented before we do
|
||||
// any serious waiting, e.g., for throttle, batching, or actual request handling.
|
||||
let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
|
||||
let now = Instant::now();
|
||||
timer.observe_throttle_start(now);
|
||||
let throttled = tokio::select! {
|
||||
res = shard.pagestream_throttle.throttle(1, now) => res,
|
||||
_ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
|
||||
};
|
||||
timer.observe_throttle_done(throttled);
|
||||
Ok(timer)
|
||||
}
|
||||
|
||||
let batched_msg = match neon_fe_msg {
|
||||
PagestreamFeMessage::Exists(req) => {
|
||||
let shard = timeline_handles
|
||||
@@ -1095,7 +1156,7 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_get_rel_exists_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
let timer = Self::record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetRelExists,
|
||||
received_at,
|
||||
@@ -1113,7 +1174,7 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_get_nblocks_request", rel = %req.rel, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
let timer = Self::record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetRelSize,
|
||||
received_at,
|
||||
@@ -1131,7 +1192,7 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_db_size_request", dbnode = %req.dbnode, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
let timer = Self::record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetDbSize,
|
||||
received_at,
|
||||
@@ -1149,7 +1210,7 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_get_slru_segment_request", kind = %req.kind, segno = %req.segno, req_lsn = %req.hdr.request_lsn, shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer = record_op_start_and_throttle(
|
||||
let timer = Self::record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetSlruSegment,
|
||||
received_at,
|
||||
@@ -1274,7 +1335,7 @@ impl PageServerHandler {
|
||||
// request handler log messages contain the request-specific fields.
|
||||
let span = mkspan!(shard.tenant_shard_id.shard_slug());
|
||||
|
||||
let timer = record_op_start_and_throttle(
|
||||
let timer = Self::record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
received_at,
|
||||
@@ -1321,7 +1382,7 @@ impl PageServerHandler {
|
||||
BatchedFeMessage::GetPage {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
pages: smallvec::smallvec![BatchedGetPageRequest {
|
||||
pages: smallvec![BatchedGetPageRequest {
|
||||
req,
|
||||
timer,
|
||||
lsn_range: LsnRange {
|
||||
@@ -1343,9 +1404,12 @@ impl PageServerHandler {
|
||||
.get(tenant_id, timeline_id, ShardSelector::Zero)
|
||||
.await?;
|
||||
let span = tracing::info_span!(parent: &parent_span, "handle_test_request", shard_id = %shard.tenant_shard_id.shard_slug());
|
||||
let timer =
|
||||
record_op_start_and_throttle(&shard, metrics::SmgrQueryType::Test, received_at)
|
||||
.await?;
|
||||
let timer = Self::record_op_start_and_throttle(
|
||||
&shard,
|
||||
metrics::SmgrQueryType::Test,
|
||||
received_at,
|
||||
)
|
||||
.await?;
|
||||
BatchedFeMessage::Test {
|
||||
span,
|
||||
shard: shard.downgrade(),
|
||||
@@ -1356,6 +1420,26 @@ impl PageServerHandler {
|
||||
Ok(Some(batched_msg))
|
||||
}
|
||||
|
||||
/// Starts a SmgrOpTimer at received_at and throttles the request.
|
||||
async fn record_op_start_and_throttle(
|
||||
shard: &timeline::handle::Handle<TenantManagerTypes>,
|
||||
op: metrics::SmgrQueryType,
|
||||
received_at: Instant,
|
||||
) -> Result<SmgrOpTimer, QueryError> {
|
||||
// It's important to start the smgr op metric recorder as early as possible
|
||||
// so that the _started counters are incremented before we do
|
||||
// any serious waiting, e.g., for throttle, batching, or actual request handling.
|
||||
let mut timer = shard.query_metrics.start_smgr_op(op, received_at);
|
||||
let now = Instant::now();
|
||||
timer.observe_throttle_start(now);
|
||||
let throttled = tokio::select! {
|
||||
res = shard.pagestream_throttle.throttle(1, now) => res,
|
||||
_ = shard.cancel.cancelled() => return Err(QueryError::Shutdown),
|
||||
};
|
||||
timer.observe_throttle_done(throttled);
|
||||
Ok(timer)
|
||||
}
|
||||
|
||||
/// Post-condition: `batch` is Some()
|
||||
#[instrument(skip_all, level = tracing::Level::TRACE)]
|
||||
#[allow(clippy::boxed_local)]
|
||||
@@ -1453,8 +1537,11 @@ impl PageServerHandler {
|
||||
let (mut handler_results, span) = {
|
||||
// TODO: we unfortunately have to pin the future on the heap, since GetPage futures are huge and
|
||||
// won't fit on the stack.
|
||||
let mut boxpinned =
|
||||
Box::pin(self.pagestream_dispatch_batched_message(batch, io_concurrency, ctx));
|
||||
let mut boxpinned = Box::pin(Self::pagestream_dispatch_batched_message(
|
||||
batch,
|
||||
io_concurrency,
|
||||
ctx,
|
||||
));
|
||||
log_slow(
|
||||
log_slow_name,
|
||||
LOG_SLOW_GETPAGE_THRESHOLD,
|
||||
@@ -1610,7 +1697,6 @@ impl PageServerHandler {
|
||||
/// Helper which dispatches a batched message to the appropriate handler.
|
||||
/// Returns a vec of results, along with the extracted trace span.
|
||||
async fn pagestream_dispatch_batched_message(
|
||||
&mut self,
|
||||
batch: BatchedFeMessage,
|
||||
io_concurrency: IoConcurrency,
|
||||
ctx: &RequestContext,
|
||||
@@ -1640,10 +1726,10 @@ impl PageServerHandler {
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![
|
||||
self.handle_get_rel_exists_request(&shard, &req, &ctx)
|
||||
Self::handle_get_rel_exists_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map(|msg| (PagestreamBeMessage::Exists(msg), timer, ctx))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1659,10 +1745,10 @@ impl PageServerHandler {
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![
|
||||
self.handle_get_nblocks_request(&shard, &req, &ctx)
|
||||
Self::handle_get_nblocks_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map(|msg| (PagestreamBeMessage::Nblocks(msg), timer, ctx))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1680,16 +1766,15 @@ impl PageServerHandler {
|
||||
{
|
||||
let npages = pages.len();
|
||||
trace!(npages, "handling getpage request");
|
||||
let res = self
|
||||
.handle_get_page_at_lsn_request_batched(
|
||||
&shard,
|
||||
pages,
|
||||
io_concurrency,
|
||||
batch_break_reason,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(span.clone())
|
||||
.await;
|
||||
let res = Self::handle_get_page_at_lsn_request_batched(
|
||||
&shard,
|
||||
pages,
|
||||
io_concurrency,
|
||||
batch_break_reason,
|
||||
&ctx,
|
||||
)
|
||||
.instrument(span.clone())
|
||||
.await;
|
||||
assert_eq!(res.len(), npages);
|
||||
res
|
||||
},
|
||||
@@ -1706,10 +1791,10 @@ impl PageServerHandler {
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![
|
||||
self.handle_db_size_request(&shard, &req, &ctx)
|
||||
Self::handle_db_size_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map(|msg| (PagestreamBeMessage::DbSize(msg), timer, ctx))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1725,10 +1810,10 @@ impl PageServerHandler {
|
||||
let (shard, ctx) = upgrade_handle_and_set_context!(shard);
|
||||
(
|
||||
vec![
|
||||
self.handle_get_slru_segment_request(&shard, &req, &ctx)
|
||||
Self::handle_get_slru_segment_request(&shard, &req, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await
|
||||
.map(|msg| (msg, timer, ctx))
|
||||
.map(|msg| (PagestreamBeMessage::GetSlruSegment(msg), timer, ctx))
|
||||
.map_err(|err| BatchedPageStreamError { err, req: req.hdr }),
|
||||
],
|
||||
span,
|
||||
@@ -1746,8 +1831,7 @@ impl PageServerHandler {
|
||||
{
|
||||
let npages = requests.len();
|
||||
trace!(npages, "handling getpage request");
|
||||
let res = self
|
||||
.handle_test_request_batch(&shard, requests, &ctx)
|
||||
let res = Self::handle_test_request_batch(&shard, requests, &ctx)
|
||||
.instrument(span.clone())
|
||||
.await;
|
||||
assert_eq!(res.len(), npages);
|
||||
@@ -2301,11 +2385,10 @@ impl PageServerHandler {
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_rel_exists_request(
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamExistsRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
) -> Result<PagestreamExistsResponse, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
@@ -2327,19 +2410,15 @@ impl PageServerHandler {
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse {
|
||||
req: *req,
|
||||
exists,
|
||||
}))
|
||||
Ok(PagestreamExistsResponse { req: *req, exists })
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_nblocks_request(
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamNblocksRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
) -> Result<PagestreamNblocksResponse, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
@@ -2361,19 +2440,18 @@ impl PageServerHandler {
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse {
|
||||
Ok(PagestreamNblocksResponse {
|
||||
req: *req,
|
||||
n_blocks,
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_db_size_request(
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamDbSizeRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
) -> Result<PagestreamDbSizeResponse, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
@@ -2397,23 +2475,19 @@ impl PageServerHandler {
|
||||
.await?;
|
||||
let db_size = total_blocks as i64 * BLCKSZ as i64;
|
||||
|
||||
Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse {
|
||||
req: *req,
|
||||
db_size,
|
||||
}))
|
||||
Ok(PagestreamDbSizeResponse { req: *req, db_size })
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
async fn handle_get_page_at_lsn_request_batched(
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
requests: SmallVec<[BatchedGetPageRequest; 1]>,
|
||||
io_concurrency: IoConcurrency,
|
||||
batch_break_reason: GetPageBatchBreakReason,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<(PagestreamBeMessage, SmgrOpTimer, RequestContext), BatchedPageStreamError>>
|
||||
{
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
//debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
timeline
|
||||
.query_metrics
|
||||
@@ -2532,11 +2606,10 @@ impl PageServerHandler {
|
||||
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_get_slru_segment_request(
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
req: &PagestreamGetSlruSegmentRequest,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<PagestreamBeMessage, PageStreamError> {
|
||||
) -> Result<PagestreamGetSlruSegmentResponse, PageStreamError> {
|
||||
let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn();
|
||||
let lsn = Self::wait_or_get_last_lsn(
|
||||
timeline,
|
||||
@@ -2551,16 +2624,13 @@ impl PageServerHandler {
|
||||
.ok_or(PageStreamError::BadRequest("invalid SLRU kind".into()))?;
|
||||
let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?;
|
||||
|
||||
Ok(PagestreamBeMessage::GetSlruSegment(
|
||||
PagestreamGetSlruSegmentResponse { req: *req, segment },
|
||||
))
|
||||
Ok(PagestreamGetSlruSegmentResponse { req: *req, segment })
|
||||
}
|
||||
|
||||
// NB: this impl mimics what we do for batched getpage requests.
|
||||
#[cfg(feature = "testing")]
|
||||
#[instrument(skip_all, fields(shard_id))]
|
||||
async fn handle_test_request_batch(
|
||||
&mut self,
|
||||
timeline: &Timeline,
|
||||
requests: Vec<BatchedTestRequest>,
|
||||
_ctx: &RequestContext,
|
||||
@@ -3300,57 +3370,342 @@ where
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements the page service over gRPC.
|
||||
/// Serves the page service over gRPC. Dispatches to PageServerHandler for request processing.
|
||||
///
|
||||
/// TODO: not yet implemented, all methods return unimplemented.
|
||||
/// TODO: add trace spans, interceptors, and sampling.
|
||||
/// TODO: rename to PageServiceHandler when libpq impl is removed.
|
||||
pub struct GrpcPageServiceHandler {
|
||||
tenant_manager: Arc<TenantManager>,
|
||||
ctx: RequestContext,
|
||||
}
|
||||
|
||||
impl GrpcPageServiceHandler {
|
||||
/// Errors if the request is executed on a non-zero shard. Only shard 0 has a complete view of
|
||||
/// relations and their sizes, as well as SLRU segments and other data.
|
||||
#[allow(clippy::result_large_err)]
|
||||
fn ensure_shard_zero(req: &tonic::Request<impl Any>) -> Result<(), tonic::Status> {
|
||||
match Self::extract::<ShardIndex>(req).shard_number.0 {
|
||||
0 => Ok(()),
|
||||
shard => Err(tonic::Status::invalid_argument(format!(
|
||||
"request must execute on shard zero (is shard {shard})",
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Extracts the given type from the request extensions. It must have been set by an
|
||||
/// interceptor.
|
||||
fn extract<T: Send + Sync + 'static>(req: &tonic::Request<impl Any>) -> &T {
|
||||
req.extensions()
|
||||
.get::<T>()
|
||||
.expect("extension should be set by interceptor")
|
||||
}
|
||||
|
||||
/// Generates a PagestreamRequest header from a ReadLsn and request ID.
|
||||
fn make_hdr(read_lsn: page_api::ReadLsn, req_id: u64) -> PagestreamRequest {
|
||||
PagestreamRequest {
|
||||
reqid: req_id,
|
||||
request_lsn: read_lsn.request_lsn,
|
||||
not_modified_since: read_lsn.not_modified_since_lsn.unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Acquires a timeline handle for the given request. The request must have been decorated by
|
||||
/// TenantMetadataInterceptor first.
|
||||
async fn get_request_timeline(
|
||||
&self,
|
||||
req: &tonic::Request<impl Any>,
|
||||
) -> Result<timeline::handle::Handle<TenantManagerTypes>, GetActiveTimelineError> {
|
||||
let ttid = *Self::extract::<TenantTimelineId>(req);
|
||||
let shard_index = *Self::extract::<ShardIndex>(req);
|
||||
let shard_selector = ShardSelector::Known(shard_index);
|
||||
|
||||
// TODO: untangle this from TenantManagerWrapper::resolve() and Cache::get(), to avoid the
|
||||
// unnecessary overhead.
|
||||
TimelineHandles::new(self.tenant_manager.clone())
|
||||
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Starts a SmgrOpTimer at received_at, throttles the request, and records execution start.
|
||||
/// Only errors if the timeline is shutting down.
|
||||
///
|
||||
/// TODO: revamp request timers -- in particular,
|
||||
/// TODO: consider moving throttling out and returning SlowDown errors.
|
||||
async fn record_op_start_and_throttle(
|
||||
timeline: &timeline::handle::Handle<TenantManagerTypes>,
|
||||
op: metrics::SmgrQueryType,
|
||||
received_at: Instant,
|
||||
) -> Result<SmgrOpTimer, tonic::Status> {
|
||||
let mut timer = PageServerHandler::record_op_start_and_throttle(timeline, op, received_at)
|
||||
.await
|
||||
.map_err(|err| match err {
|
||||
// record_op_start_and_throttle() only returns Shutdown.
|
||||
QueryError::Shutdown => tonic::Status::unavailable(format!("{err}")),
|
||||
err => tonic::Status::internal(format!("unexpected error: {err}")),
|
||||
})?;
|
||||
timer.observe_execution_start(Instant::now());
|
||||
Ok(timer)
|
||||
}
|
||||
|
||||
/// Processes a GetPage batch request, via the GetPages bidirectional streaming RPC.
|
||||
///
|
||||
/// NB: errors will terminate the stream. Per-request errors should return a GetPageResponse
|
||||
/// with an appropriate status code instead.
|
||||
async fn handle_get_page_request(
|
||||
ctx: &RequestContext,
|
||||
timeline: &WeakHandle<TenantManagerTypes>,
|
||||
req: proto::GetPageRequest,
|
||||
io_concurrency: IoConcurrency,
|
||||
) -> Result<proto::GetPageResponse, tonic::Status> {
|
||||
let received_at = Instant::now();
|
||||
let timeline = timeline.upgrade().map_err(|err| match err {
|
||||
HandleUpgradeError::ShutDown => tonic::Status::unavailable("timeline is shutting down"),
|
||||
})?;
|
||||
let ctx = ctx.with_scope_page_service_pagestream(&timeline);
|
||||
|
||||
// Validate the request and convert it to a Pagestream request.
|
||||
let req: page_api::GetPageRequest = req.try_into()?;
|
||||
|
||||
let effective_lsn = match PageServerHandler::effective_request_lsn(
|
||||
&timeline,
|
||||
timeline.get_last_record_lsn(),
|
||||
req.read_lsn.request_lsn,
|
||||
req.read_lsn.not_modified_since_lsn.unwrap_or_default(),
|
||||
&timeline.get_applied_gc_cutoff_lsn(),
|
||||
) {
|
||||
Ok(lsn) => lsn,
|
||||
Err(err) => return err.into_get_page_response(req.request_id),
|
||||
};
|
||||
|
||||
let mut batch = SmallVec::with_capacity(req.block_numbers.len());
|
||||
for blkno in req.block_numbers {
|
||||
// TODO: this creates one timer per page and throttles it. We should have a timer for
|
||||
// the entire batch, and throttle only the batch, but this is equivalent to what
|
||||
// PageServerHandler does already so we keep it for now.
|
||||
let timer = Self::record_op_start_and_throttle(
|
||||
&timeline,
|
||||
metrics::SmgrQueryType::GetPageAtLsn,
|
||||
received_at,
|
||||
)
|
||||
.await?;
|
||||
|
||||
batch.push(BatchedGetPageRequest {
|
||||
req: PagestreamGetPageRequest {
|
||||
hdr: Self::make_hdr(req.read_lsn, req.request_id),
|
||||
rel: req.rel,
|
||||
blkno,
|
||||
},
|
||||
lsn_range: LsnRange {
|
||||
effective_lsn,
|
||||
request_lsn: req.read_lsn.request_lsn,
|
||||
},
|
||||
timer,
|
||||
ctx: ctx.attached_child(),
|
||||
batch_wait_ctx: None, // TODO: add tracing
|
||||
});
|
||||
}
|
||||
|
||||
let results = PageServerHandler::handle_get_page_at_lsn_request_batched(
|
||||
&timeline,
|
||||
batch,
|
||||
io_concurrency,
|
||||
GetPageBatchBreakReason::BatchFull, // TODO: not relevant for gRPC batches
|
||||
&ctx,
|
||||
)
|
||||
.await;
|
||||
|
||||
let mut resp = page_api::GetPageResponse {
|
||||
request_id: req.request_id,
|
||||
status_code: page_api::GetPageStatusCode::Ok,
|
||||
reason: None,
|
||||
page_images: SmallVec::with_capacity(results.len()),
|
||||
};
|
||||
|
||||
for result in results {
|
||||
match result {
|
||||
Ok((PagestreamBeMessage::GetPage(r), _, _)) => resp.page_images.push(r.page),
|
||||
Ok((resp, _, _)) => {
|
||||
return Err(tonic::Status::internal(format!(
|
||||
"unexpected response: {resp:?}"
|
||||
)));
|
||||
}
|
||||
Err(err) => return err.err.into_get_page_response(req.request_id),
|
||||
};
|
||||
}
|
||||
|
||||
Ok(resp.into())
|
||||
}
|
||||
}
|
||||
|
||||
/// Implements the gRPC page service.
|
||||
///
|
||||
/// TODO: when the libpq impl is removed, simplify this:
|
||||
/// * Add Tower middleware for timeline handle, rate limiting, and timing.
|
||||
/// * Remove the intermediate Pagestream types.
|
||||
/// * Inline the handler code.
|
||||
#[tonic::async_trait]
|
||||
impl proto::PageService for PageServerHandler {
|
||||
impl proto::PageService for GrpcPageServiceHandler {
|
||||
type GetBaseBackupStream = Pin<
|
||||
Box<dyn Stream<Item = Result<proto::GetBaseBackupResponseChunk, tonic::Status>> + Send>,
|
||||
>;
|
||||
|
||||
type GetPagesStream =
|
||||
Pin<Box<dyn Stream<Item = Result<proto::GetPageResponse, tonic::Status>> + Send>>;
|
||||
|
||||
async fn check_rel_exists(
|
||||
&self,
|
||||
_: tonic::Request<proto::CheckRelExistsRequest>,
|
||||
req: tonic::Request<proto::CheckRelExistsRequest>,
|
||||
) -> Result<tonic::Response<proto::CheckRelExistsResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
let received_at = Self::extract::<ReceivedAt>(&req).0;
|
||||
let timeline = self.get_request_timeline(&req).await?;
|
||||
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
|
||||
|
||||
// Validate the request and convert it to a Pagestream request.
|
||||
Self::ensure_shard_zero(&req)?;
|
||||
let req: page_api::CheckRelExistsRequest = req.into_inner().try_into()?;
|
||||
let req = PagestreamExistsRequest {
|
||||
hdr: Self::make_hdr(req.read_lsn, 0),
|
||||
rel: req.rel,
|
||||
};
|
||||
|
||||
// Execute the request and convert the response.
|
||||
let _timer = Self::record_op_start_and_throttle(
|
||||
&timeline,
|
||||
metrics::SmgrQueryType::GetRelExists,
|
||||
received_at,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let resp = PageServerHandler::handle_get_rel_exists_request(&timeline, &req, &ctx).await?;
|
||||
let resp: page_api::CheckRelExistsResponse = resp.exists;
|
||||
Ok(tonic::Response::new(resp.into()))
|
||||
}
|
||||
|
||||
async fn get_base_backup(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetBaseBackupRequest>,
|
||||
) -> Result<tonic::Response<Self::GetBaseBackupStream>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
Err(tonic::Status::unimplemented("not implemented")) // TODO
|
||||
}
|
||||
|
||||
async fn get_db_size(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetDbSizeRequest>,
|
||||
req: tonic::Request<proto::GetDbSizeRequest>,
|
||||
) -> Result<tonic::Response<proto::GetDbSizeResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
let received_at = Self::extract::<ReceivedAt>(&req).0;
|
||||
let timeline = self.get_request_timeline(&req).await?;
|
||||
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
|
||||
|
||||
// Validate the request and convert it to a Pagestream request.
|
||||
Self::ensure_shard_zero(&req)?;
|
||||
let req: page_api::GetDbSizeRequest = req.into_inner().try_into()?;
|
||||
let req = PagestreamDbSizeRequest {
|
||||
hdr: Self::make_hdr(req.read_lsn, 0),
|
||||
dbnode: req.db_oid,
|
||||
};
|
||||
|
||||
// Execute the request and convert the response.
|
||||
let _timer = Self::record_op_start_and_throttle(
|
||||
&timeline,
|
||||
metrics::SmgrQueryType::GetDbSize,
|
||||
received_at,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let resp = PageServerHandler::handle_db_size_request(&timeline, &req, &ctx).await?;
|
||||
let resp = resp.db_size as page_api::GetDbSizeResponse;
|
||||
Ok(tonic::Response::new(resp.into()))
|
||||
}
|
||||
|
||||
async fn get_pages(
|
||||
&self,
|
||||
_: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
|
||||
req: tonic::Request<tonic::Streaming<proto::GetPageRequest>>,
|
||||
) -> Result<tonic::Response<Self::GetPagesStream>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
// Extract the timeline from the request and check that it exists.
|
||||
let ttid = *Self::extract::<TenantTimelineId>(&req);
|
||||
let shard_index = *Self::extract::<ShardIndex>(&req);
|
||||
let shard_selector = ShardSelector::Known(shard_index);
|
||||
|
||||
let mut handles = TimelineHandles::new(self.tenant_manager.clone());
|
||||
handles
|
||||
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
|
||||
.await?;
|
||||
|
||||
let ctx = self.ctx.attached_child();
|
||||
let mut reqs = req.into_inner();
|
||||
|
||||
let resps = async_stream::try_stream! {
|
||||
let timeline = handles
|
||||
.get(ttid.tenant_id, ttid.timeline_id, shard_selector)
|
||||
.await?
|
||||
.downgrade();
|
||||
while let Some(req) = reqs.message().await? {
|
||||
// TODO: implement IoConcurrency sidecar.
|
||||
yield Self::handle_get_page_request(&ctx, &timeline, req, IoConcurrency::Sequential).await?
|
||||
}
|
||||
};
|
||||
|
||||
Ok(tonic::Response::new(Box::pin(resps)))
|
||||
}
|
||||
|
||||
async fn get_rel_size(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetRelSizeRequest>,
|
||||
req: tonic::Request<proto::GetRelSizeRequest>,
|
||||
) -> Result<tonic::Response<proto::GetRelSizeResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
let received_at = Self::extract::<ReceivedAt>(&req).0;
|
||||
let timeline = self.get_request_timeline(&req).await?;
|
||||
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
|
||||
|
||||
// Validate the request and convert it to a Pagestream request.
|
||||
Self::ensure_shard_zero(&req)?;
|
||||
let req: page_api::GetRelSizeRequest = req.into_inner().try_into()?;
|
||||
let req = PagestreamNblocksRequest {
|
||||
hdr: Self::make_hdr(req.read_lsn, 0),
|
||||
rel: req.rel,
|
||||
};
|
||||
|
||||
// Execute the request and convert the response.
|
||||
let _timer = Self::record_op_start_and_throttle(
|
||||
&timeline,
|
||||
metrics::SmgrQueryType::GetRelSize,
|
||||
received_at,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let resp = PageServerHandler::handle_get_nblocks_request(&timeline, &req, &ctx).await?;
|
||||
let resp: page_api::GetRelSizeResponse = resp.n_blocks;
|
||||
Ok(tonic::Response::new(resp.into()))
|
||||
}
|
||||
|
||||
async fn get_slru_segment(
|
||||
&self,
|
||||
_: tonic::Request<proto::GetSlruSegmentRequest>,
|
||||
req: tonic::Request<proto::GetSlruSegmentRequest>,
|
||||
) -> Result<tonic::Response<proto::GetSlruSegmentResponse>, tonic::Status> {
|
||||
Err(tonic::Status::unimplemented("not implemented"))
|
||||
let received_at = Self::extract::<ReceivedAt>(&req).0;
|
||||
let timeline = self.get_request_timeline(&req).await?;
|
||||
let ctx = self.ctx.with_scope_page_service_pagestream(&timeline);
|
||||
|
||||
// Validate the request and convert it to a Pagestream request.
|
||||
Self::ensure_shard_zero(&req)?;
|
||||
let req: page_api::GetSlruSegmentRequest = req.into_inner().try_into()?;
|
||||
let req = PagestreamGetSlruSegmentRequest {
|
||||
hdr: Self::make_hdr(req.read_lsn, 0),
|
||||
kind: req.kind as u8,
|
||||
segno: req.segno,
|
||||
};
|
||||
|
||||
// Execute the request and convert the response.
|
||||
let _timer = Self::record_op_start_and_throttle(
|
||||
&timeline,
|
||||
metrics::SmgrQueryType::GetSlruSegment,
|
||||
received_at,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let resp =
|
||||
PageServerHandler::handle_get_slru_segment_request(&timeline, &req, &ctx).await?;
|
||||
let resp: page_api::GetSlruSegmentResponse = resp.segment;
|
||||
Ok(tonic::Response::new(resp.try_into()?))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3370,10 +3725,24 @@ impl From<GetActiveTenantError> for QueryError {
|
||||
}
|
||||
}
|
||||
|
||||
/// gRPC interceptor that records the start time of request processing as a ReceivedAt extension.
|
||||
///
|
||||
/// TODO: generalize this for other observability information.
|
||||
#[derive(Clone)]
|
||||
struct ReceivedAtInterceptor;
|
||||
|
||||
#[derive(Clone)]
|
||||
struct ReceivedAt(Instant);
|
||||
|
||||
impl tonic::service::Interceptor for ReceivedAtInterceptor {
|
||||
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
|
||||
req.extensions_mut().insert(ReceivedAt(Instant::now()));
|
||||
Ok(req)
|
||||
}
|
||||
}
|
||||
|
||||
/// gRPC interceptor that decodes tenant metadata and stores it as request extensions of type
|
||||
/// TenantTimelineId and ShardIndex.
|
||||
///
|
||||
/// TODO: consider looking up the timeline handle here and storing it.
|
||||
#[derive(Clone)]
|
||||
struct TenantMetadataInterceptor;
|
||||
|
||||
@@ -3486,14 +3855,36 @@ impl From<GetActiveTimelineError> for QueryError {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<crate::tenant::timeline::handle::HandleUpgradeError> for QueryError {
|
||||
fn from(e: crate::tenant::timeline::handle::HandleUpgradeError) -> Self {
|
||||
impl From<HandleUpgradeError> for QueryError {
|
||||
fn from(e: HandleUpgradeError) -> Self {
|
||||
match e {
|
||||
crate::tenant::timeline::handle::HandleUpgradeError::ShutDown => QueryError::Shutdown,
|
||||
HandleUpgradeError::ShutDown => QueryError::Shutdown,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GetActiveTimelineError> for tonic::Status {
|
||||
fn from(err: GetActiveTimelineError) -> Self {
|
||||
use tonic::Code;
|
||||
let code = match &err {
|
||||
GetActiveTimelineError::Tenant(err) => match err {
|
||||
GetActiveTenantError::Broken(_) => Code::Internal,
|
||||
GetActiveTenantError::Cancelled => Code::Unavailable,
|
||||
GetActiveTenantError::NotFound(_) => Code::NotFound,
|
||||
GetActiveTenantError::SwitchedTenant => Code::Unavailable,
|
||||
GetActiveTenantError::WaitForActiveTimeout { .. } => Code::Unavailable,
|
||||
GetActiveTenantError::WillNotBecomeActive(_) => Code::Unavailable,
|
||||
},
|
||||
GetActiveTimelineError::Timeline(err) => match err {
|
||||
GetTimelineError::NotFound { .. } => Code::NotFound,
|
||||
GetTimelineError::NotActive { .. } => Code::Unavailable,
|
||||
GetTimelineError::ShuttingDown => Code::Unavailable,
|
||||
},
|
||||
};
|
||||
tonic::Status::new(code, format!("{err}"))
|
||||
}
|
||||
}
|
||||
|
||||
fn set_tracing_field_shard_id(timeline: &Timeline) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();
|
||||
tracing::Span::current().record(
|
||||
|
||||
@@ -274,7 +274,7 @@ impl Timeline {
|
||||
io_concurrency: IoConcurrency,
|
||||
ctx: &RequestContext,
|
||||
) -> Vec<Result<Bytes, PageReconstructError>> {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
//debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
|
||||
let mut slots_filled = 0;
|
||||
let page_count = pages.len();
|
||||
|
||||
@@ -300,7 +300,7 @@ pub struct TenantShard {
|
||||
/// as in progress.
|
||||
/// * Imported timelines are removed when the storage controller calls the post timeline
|
||||
/// import activation endpoint.
|
||||
timelines_importing: std::sync::Mutex<HashMap<TimelineId, Arc<ImportingTimeline>>>,
|
||||
timelines_importing: std::sync::Mutex<HashMap<TimelineId, ImportingTimeline>>,
|
||||
|
||||
/// The last tenant manifest known to be in remote storage. None if the manifest has not yet
|
||||
/// been either downloaded or uploaded. Always Some after tenant attach.
|
||||
@@ -383,7 +383,7 @@ pub struct TenantShard {
|
||||
|
||||
l0_flush_global_state: L0FlushGlobalState,
|
||||
|
||||
pub(crate) feature_resolver: FeatureResolver,
|
||||
feature_resolver: FeatureResolver,
|
||||
}
|
||||
impl std::fmt::Debug for TenantShard {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
@@ -672,7 +672,6 @@ pub enum MaybeOffloaded {
|
||||
pub enum TimelineOrOffloaded {
|
||||
Timeline(Arc<Timeline>),
|
||||
Offloaded(Arc<OffloadedTimeline>),
|
||||
Importing(Arc<ImportingTimeline>),
|
||||
}
|
||||
|
||||
impl TimelineOrOffloaded {
|
||||
@@ -684,9 +683,6 @@ impl TimelineOrOffloaded {
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => {
|
||||
TimelineOrOffloadedArcRef::Offloaded(offloaded)
|
||||
}
|
||||
TimelineOrOffloaded::Importing(importing) => {
|
||||
TimelineOrOffloadedArcRef::Importing(importing)
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn tenant_shard_id(&self) -> TenantShardId {
|
||||
@@ -699,16 +695,12 @@ impl TimelineOrOffloaded {
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => &timeline.delete_progress,
|
||||
TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
|
||||
TimelineOrOffloaded::Importing(importing) => &importing.delete_progress,
|
||||
}
|
||||
}
|
||||
fn maybe_remote_client(&self) -> Option<Arc<RemoteTimelineClient>> {
|
||||
match self {
|
||||
TimelineOrOffloaded::Timeline(timeline) => Some(timeline.remote_client.clone()),
|
||||
TimelineOrOffloaded::Offloaded(_offloaded) => None,
|
||||
TimelineOrOffloaded::Importing(importing) => {
|
||||
Some(importing.timeline.remote_client.clone())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -716,7 +708,6 @@ impl TimelineOrOffloaded {
|
||||
pub enum TimelineOrOffloadedArcRef<'a> {
|
||||
Timeline(&'a Arc<Timeline>),
|
||||
Offloaded(&'a Arc<OffloadedTimeline>),
|
||||
Importing(&'a Arc<ImportingTimeline>),
|
||||
}
|
||||
|
||||
impl TimelineOrOffloadedArcRef<'_> {
|
||||
@@ -724,14 +715,12 @@ impl TimelineOrOffloadedArcRef<'_> {
|
||||
match self {
|
||||
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.tenant_shard_id,
|
||||
TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.tenant_shard_id,
|
||||
TimelineOrOffloadedArcRef::Importing(importing) => importing.timeline.tenant_shard_id,
|
||||
}
|
||||
}
|
||||
pub fn timeline_id(&self) -> TimelineId {
|
||||
match self {
|
||||
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.timeline_id,
|
||||
TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.timeline_id,
|
||||
TimelineOrOffloadedArcRef::Importing(importing) => importing.timeline.timeline_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -748,12 +737,6 @@ impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> From<&'a Arc<ImportingTimeline>> for TimelineOrOffloadedArcRef<'a> {
|
||||
fn from(timeline: &'a Arc<ImportingTimeline>) -> Self {
|
||||
Self::Importing(timeline)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
|
||||
pub enum GetTimelineError {
|
||||
#[error("Timeline is shutting down")]
|
||||
@@ -1806,25 +1789,20 @@ impl TenantShard {
|
||||
},
|
||||
) => {
|
||||
let timeline_id = timeline.timeline_id;
|
||||
let import_task_gate = Gate::default();
|
||||
let import_task_guard = import_task_gate.enter().unwrap();
|
||||
let import_task_handle =
|
||||
tokio::task::spawn(self.clone().create_timeline_import_pgdata_task(
|
||||
timeline.clone(),
|
||||
import_pgdata,
|
||||
guard,
|
||||
import_task_guard,
|
||||
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
|
||||
));
|
||||
|
||||
let prev = self.timelines_importing.lock().unwrap().insert(
|
||||
timeline_id,
|
||||
Arc::new(ImportingTimeline {
|
||||
ImportingTimeline {
|
||||
timeline: timeline.clone(),
|
||||
import_task_handle,
|
||||
import_task_gate,
|
||||
delete_progress: TimelineDeleteProgress::default(),
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
assert!(prev.is_none());
|
||||
@@ -2442,17 +2420,6 @@ impl TenantShard {
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Lists timelines the tenant contains.
|
||||
/// It's up to callers to omit certain timelines that are not considered ready for use.
|
||||
pub fn list_importing_timelines(&self) -> Vec<Arc<ImportingTimeline>> {
|
||||
self.timelines_importing
|
||||
.lock()
|
||||
.unwrap()
|
||||
.values()
|
||||
.map(Arc::clone)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Lists timelines the tenant manages, including offloaded ones.
|
||||
///
|
||||
/// It's up to callers to omit certain timelines that are not considered ready for use.
|
||||
@@ -2886,25 +2853,19 @@ impl TenantShard {
|
||||
|
||||
let (timeline, timeline_create_guard) = uninit_timeline.finish_creation_myself();
|
||||
|
||||
let import_task_gate = Gate::default();
|
||||
let import_task_guard = import_task_gate.enter().unwrap();
|
||||
|
||||
let import_task_handle = tokio::spawn(self.clone().create_timeline_import_pgdata_task(
|
||||
timeline.clone(),
|
||||
index_part,
|
||||
timeline_create_guard,
|
||||
import_task_guard,
|
||||
timeline_ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
|
||||
));
|
||||
|
||||
let prev = self.timelines_importing.lock().unwrap().insert(
|
||||
timeline.timeline_id,
|
||||
Arc::new(ImportingTimeline {
|
||||
ImportingTimeline {
|
||||
timeline: timeline.clone(),
|
||||
import_task_handle,
|
||||
import_task_gate,
|
||||
delete_progress: TimelineDeleteProgress::default(),
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
// Idempotency is enforced higher up the stack
|
||||
@@ -2963,7 +2924,6 @@ impl TenantShard {
|
||||
timeline: Arc<Timeline>,
|
||||
index_part: import_pgdata::index_part_format::Root,
|
||||
timeline_create_guard: TimelineCreateGuard,
|
||||
_import_task_guard: GateGuard,
|
||||
ctx: RequestContext,
|
||||
) {
|
||||
debug_assert_current_span_has_tenant_and_timeline_id();
|
||||
@@ -3875,9 +3835,6 @@ impl TenantShard {
|
||||
.build_timeline_client(offloaded.timeline_id, self.remote_storage.clone());
|
||||
Arc::new(remote_client)
|
||||
}
|
||||
TimelineOrOffloadedArcRef::Importing(_) => {
|
||||
unreachable!("Importing timelines are not included in the iterator")
|
||||
}
|
||||
};
|
||||
|
||||
// Shut down the timeline's remote client: this means that the indices we write
|
||||
@@ -5087,14 +5044,6 @@ impl TenantShard {
|
||||
info!("timeline already exists but is offloaded");
|
||||
Err(CreateTimelineError::Conflict)
|
||||
}
|
||||
Err(TimelineExclusionError::AlreadyExists {
|
||||
existing: TimelineOrOffloaded::Importing(_existing),
|
||||
..
|
||||
}) => {
|
||||
// If there's a timeline already importing, then we would hit
|
||||
// the [`TimelineExclusionError::AlreadyCreating`] branch above.
|
||||
unreachable!("Importing timelines hold the creation guard")
|
||||
}
|
||||
Err(TimelineExclusionError::AlreadyExists {
|
||||
existing: TimelineOrOffloaded::Timeline(existing),
|
||||
arg,
|
||||
@@ -5832,7 +5781,6 @@ pub(crate) mod harness {
|
||||
pub conf: &'static PageServerConf,
|
||||
pub tenant_conf: pageserver_api::models::TenantConfig,
|
||||
pub tenant_shard_id: TenantShardId,
|
||||
pub shard_identity: ShardIdentity,
|
||||
pub generation: Generation,
|
||||
pub shard: ShardIndex,
|
||||
pub remote_storage: GenericRemoteStorage,
|
||||
@@ -5900,7 +5848,6 @@ pub(crate) mod harness {
|
||||
conf,
|
||||
tenant_conf,
|
||||
tenant_shard_id,
|
||||
shard_identity,
|
||||
generation,
|
||||
shard,
|
||||
remote_storage,
|
||||
@@ -5962,7 +5909,8 @@ pub(crate) mod harness {
|
||||
&ShardParameters::default(),
|
||||
))
|
||||
.unwrap(),
|
||||
self.shard_identity,
|
||||
// This is a legacy/test code path: sharding isn't supported here.
|
||||
ShardIdentity::unsharded(),
|
||||
Some(walredo_mgr),
|
||||
self.tenant_shard_id,
|
||||
self.remote_storage.clone(),
|
||||
@@ -6084,7 +6032,6 @@ mod tests {
|
||||
use timeline::compaction::{KeyHistoryRetention, KeyLogAtLsn};
|
||||
use timeline::{CompactOptions, DeltaLayerTestDesc, VersionedKeySpaceQuery};
|
||||
use utils::id::TenantId;
|
||||
use utils::shard::{ShardCount, ShardNumber};
|
||||
|
||||
use super::*;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
@@ -9420,77 +9367,6 @@ mod tests {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_failed_flush_should_not_update_disk_consistent_lsn() -> anyhow::Result<()> {
|
||||
//
|
||||
// Setup
|
||||
//
|
||||
let harness = TenantHarness::create_custom(
|
||||
"test_failed_flush_should_not_upload_disk_consistent_lsn",
|
||||
pageserver_api::models::TenantConfig::default(),
|
||||
TenantId::generate(),
|
||||
ShardIdentity::new(ShardNumber(0), ShardCount(4), ShardStripeSize(128)).unwrap(),
|
||||
Generation::new(1),
|
||||
)
|
||||
.await?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
let timeline = tenant
|
||||
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
|
||||
.await?;
|
||||
assert_eq!(timeline.get_shard_identity().count, ShardCount(4));
|
||||
let mut writer = timeline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
Lsn(0x20),
|
||||
&Value::Image(test_img("foo at 0x20")),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x20));
|
||||
drop(writer);
|
||||
timeline.freeze_and_flush().await.unwrap();
|
||||
|
||||
timeline.remote_client.wait_completion().await.unwrap();
|
||||
let disk_consistent_lsn = timeline.get_disk_consistent_lsn();
|
||||
let remote_consistent_lsn = timeline.get_remote_consistent_lsn_projected();
|
||||
assert_eq!(Some(disk_consistent_lsn), remote_consistent_lsn);
|
||||
|
||||
//
|
||||
// Test
|
||||
//
|
||||
|
||||
let mut writer = timeline.writer().await;
|
||||
writer
|
||||
.put(
|
||||
*TEST_KEY,
|
||||
Lsn(0x30),
|
||||
&Value::Image(test_img("foo at 0x30")),
|
||||
&ctx,
|
||||
)
|
||||
.await?;
|
||||
writer.finish_write(Lsn(0x30));
|
||||
drop(writer);
|
||||
|
||||
fail::cfg(
|
||||
"flush-layer-before-update-remote-consistent-lsn",
|
||||
"return()",
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let flush_res = timeline.freeze_and_flush().await;
|
||||
// if flush failed, the disk/remote consistent LSN should not be updated
|
||||
assert!(flush_res.is_err());
|
||||
assert_eq!(disk_consistent_lsn, timeline.get_disk_consistent_lsn());
|
||||
assert_eq!(
|
||||
remote_consistent_lsn,
|
||||
timeline.get_remote_consistent_lsn_projected()
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[tokio::test]
|
||||
async fn test_simple_bottom_most_compaction_deltas_1() -> anyhow::Result<()> {
|
||||
|
||||
@@ -1348,21 +1348,6 @@ impl RemoteTimelineClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn schedule_unlinking_of_layers_from_index_part<I>(
|
||||
self: &Arc<Self>,
|
||||
names: I,
|
||||
) -> Result<(), NotInitialized>
|
||||
where
|
||||
I: IntoIterator<Item = LayerName>,
|
||||
{
|
||||
let mut guard = self.upload_queue.lock().unwrap();
|
||||
let upload_queue = guard.initialized_mut()?;
|
||||
|
||||
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update the remote index file, removing the to-be-deleted files from the index,
|
||||
/// allowing scheduling of actual deletions later.
|
||||
fn schedule_unlinking_of_layers_from_index_part0<I>(
|
||||
|
||||
@@ -4767,10 +4767,7 @@ impl Timeline {
|
||||
|| !flushed_to_lsn.is_valid()
|
||||
);
|
||||
|
||||
if flushed_to_lsn < frozen_to_lsn
|
||||
&& self.shard_identity.count.count() > 1
|
||||
&& result.is_ok()
|
||||
{
|
||||
if flushed_to_lsn < frozen_to_lsn && self.shard_identity.count.count() > 1 {
|
||||
// If our layer flushes didn't carry disk_consistent_lsn up to the `to_lsn` advertised
|
||||
// to us via layer_flush_start_rx, then advance it here.
|
||||
//
|
||||
@@ -4949,10 +4946,6 @@ impl Timeline {
|
||||
return Err(FlushLayerError::Cancelled);
|
||||
}
|
||||
|
||||
fail_point!("flush-layer-before-update-remote-consistent-lsn", |_| {
|
||||
Err(FlushLayerError::Other(anyhow!("failpoint").into()))
|
||||
});
|
||||
|
||||
let disk_consistent_lsn = Lsn(lsn_range.end.0 - 1);
|
||||
|
||||
// The new on-disk layers are now in the layer map. We can remove the
|
||||
|
||||
@@ -206,8 +206,8 @@ pub struct GcCompactionQueue {
|
||||
}
|
||||
|
||||
static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
|
||||
// Only allow one timeline on one pageserver to run gc compaction at a time.
|
||||
Arc::new(Semaphore::new(1))
|
||||
// Only allow two timelines on one pageserver to run gc compaction at a time.
|
||||
Arc::new(Semaphore::new(2))
|
||||
});
|
||||
|
||||
impl GcCompactionQueue {
|
||||
|
||||
@@ -121,7 +121,6 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
|
||||
// This observes the locking order between timelines and timelines_offloaded
|
||||
let mut timelines = tenant.timelines.lock().unwrap();
|
||||
let mut timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
|
||||
let mut timelines_importing = tenant.timelines_importing.lock().unwrap();
|
||||
let offloaded_children_exist = timelines_offloaded
|
||||
.iter()
|
||||
.any(|(_, entry)| entry.ancestor_timeline_id == Some(timeline.timeline_id()));
|
||||
@@ -151,12 +150,8 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
|
||||
.expect("timeline that we were deleting was concurrently removed from 'timelines_offloaded' map");
|
||||
offloaded_timeline.delete_from_ancestor_with_timelines(&timelines);
|
||||
}
|
||||
TimelineOrOffloaded::Importing(importing) => {
|
||||
timelines_importing.remove(&importing.timeline.timeline_id);
|
||||
}
|
||||
}
|
||||
|
||||
drop(timelines_importing);
|
||||
drop(timelines_offloaded);
|
||||
drop(timelines);
|
||||
|
||||
@@ -208,17 +203,8 @@ impl DeleteTimelineFlow {
|
||||
guard.mark_in_progress()?;
|
||||
|
||||
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
|
||||
// TODO(vlad): shut down imported timeline here
|
||||
match &timeline {
|
||||
TimelineOrOffloaded::Timeline(timeline) => {
|
||||
timeline.shutdown(super::ShutdownMode::Hard).await;
|
||||
}
|
||||
TimelineOrOffloaded::Importing(importing) => {
|
||||
importing.shutdown().await;
|
||||
}
|
||||
TimelineOrOffloaded::Offloaded(_offloaded) => {
|
||||
// Nothing to shut down in this case
|
||||
}
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = &timeline {
|
||||
timeline.shutdown(super::ShutdownMode::Hard).await;
|
||||
}
|
||||
|
||||
tenant.gc_block.before_delete(&timeline.timeline_id());
|
||||
@@ -403,18 +389,10 @@ impl DeleteTimelineFlow {
|
||||
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
|
||||
});
|
||||
|
||||
match timeline {
|
||||
TimelineOrOffloaded::Timeline(timeline) => {
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await;
|
||||
}
|
||||
TimelineOrOffloaded::Importing(importing) => {
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, &importing.timeline)
|
||||
.await;
|
||||
}
|
||||
TimelineOrOffloaded::Offloaded(_offloaded) => {
|
||||
// Offloaded timelines have no local state
|
||||
// TODO: once we persist offloaded information, delete the timeline from there, too
|
||||
}
|
||||
// Offloaded timelines have no local state
|
||||
// TODO: once we persist offloaded information, delete the timeline from there, too
|
||||
if let TimelineOrOffloaded::Timeline(timeline) = timeline {
|
||||
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await;
|
||||
}
|
||||
|
||||
fail::fail_point!("timeline-delete-after-rm", |_| {
|
||||
@@ -473,16 +451,12 @@ pub(super) fn make_timeline_delete_guard(
|
||||
// For more context see this discussion: `https://github.com/neondatabase/neon/pull/4552#discussion_r1253437346`
|
||||
let timelines = tenant.timelines.lock().unwrap();
|
||||
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
|
||||
let timelines_importing = tenant.timelines_importing.lock().unwrap();
|
||||
|
||||
let timeline = match timelines.get(&timeline_id) {
|
||||
Some(t) => TimelineOrOffloaded::Timeline(Arc::clone(t)),
|
||||
None => match timelines_offloaded.get(&timeline_id) {
|
||||
Some(t) => TimelineOrOffloaded::Offloaded(Arc::clone(t)),
|
||||
None => match timelines_importing.get(&timeline_id) {
|
||||
Some(t) => TimelineOrOffloaded::Importing(Arc::clone(t)),
|
||||
None => return Err(DeleteTimelineError::NotFound),
|
||||
},
|
||||
None => return Err(DeleteTimelineError::NotFound),
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -8,10 +8,8 @@ use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::info;
|
||||
use utils::lsn::Lsn;
|
||||
use utils::pausable_failpoint;
|
||||
use utils::sync::gate::Gate;
|
||||
|
||||
use super::{Timeline, TimelineDeleteProgress};
|
||||
use super::Timeline;
|
||||
use crate::context::RequestContext;
|
||||
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
|
||||
use crate::tenant::metadata::TimelineMetadata;
|
||||
@@ -21,23 +19,15 @@ mod importbucket_client;
|
||||
mod importbucket_format;
|
||||
pub(crate) mod index_part_format;
|
||||
|
||||
pub struct ImportingTimeline {
|
||||
pub(crate) struct ImportingTimeline {
|
||||
pub import_task_handle: JoinHandle<()>,
|
||||
pub import_task_gate: Gate,
|
||||
pub timeline: Arc<Timeline>,
|
||||
pub delete_progress: TimelineDeleteProgress,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for ImportingTimeline {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "ImportingTimeline<{}>", self.timeline.timeline_id)
|
||||
}
|
||||
}
|
||||
|
||||
impl ImportingTimeline {
|
||||
pub async fn shutdown(&self) {
|
||||
pub(crate) async fn shutdown(self) {
|
||||
self.import_task_handle.abort();
|
||||
self.import_task_gate.close().await;
|
||||
let _ = self.import_task_handle.await;
|
||||
|
||||
self.timeline.remote_client.shutdown().await;
|
||||
}
|
||||
@@ -111,8 +101,6 @@ pub async fn doit(
|
||||
.schedule_index_upload_for_file_changes()?;
|
||||
timeline.remote_client.wait_completion().await?;
|
||||
|
||||
pausable_failpoint!("import-timeline-pre-success-notify-pausable");
|
||||
|
||||
// Communicate that shard is done.
|
||||
// Ensure at-least-once delivery of the upcall to storage controller
|
||||
// before we mark the task as done and never come here again.
|
||||
|
||||
@@ -11,14 +11,25 @@
|
||||
//! - => S3 as the source for the PGDATA instead of local filesystem
|
||||
//!
|
||||
//! TODOs before productionization:
|
||||
//! - ChunkProcessingJob size / ImportJob::total_size does not account for sharding.
|
||||
//! => produced image layers likely too small.
|
||||
//! - ChunkProcessingJob should cut up an ImportJob to hit exactly target image layer size.
|
||||
//! - asserts / unwraps need to be replaced with errors
|
||||
//! - don't trust remote objects will be small (=prevent OOMs in those cases)
|
||||
//! - limit all in-memory buffers in size, or download to disk and read from there
|
||||
//! - limit task concurrency
|
||||
//! - generally play nice with other tenants in the system
|
||||
//! - importbucket is different bucket than main pageserver storage, so, should be fine wrt S3 rate limits
|
||||
//! - but concerns like network bandwidth, local disk write bandwidth, local disk capacity, etc
|
||||
//! - integrate with layer eviction system
|
||||
//! - audit for Tenant::cancel nor Timeline::cancel responsivity
|
||||
//! - audit for Tenant/Timeline gate holding (we spawn tokio tasks during this flow!)
|
||||
//!
|
||||
//! An incomplete set of TODOs from the Hackathon:
|
||||
//! - version-specific CheckPointData (=> pgv abstraction, already exists for regular walingest)
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::num::NonZeroUsize;
|
||||
use std::ops::Range;
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -32,7 +43,7 @@ use pageserver_api::key::{
|
||||
rel_dir_to_key, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key,
|
||||
slru_segment_size_to_key,
|
||||
};
|
||||
use pageserver_api::keyspace::{ShardedRange, singleton_range};
|
||||
use pageserver_api::keyspace::{contiguous_range_len, is_contiguous_range, singleton_range};
|
||||
use pageserver_api::models::{ShardImportProgress, ShardImportProgressV1, ShardImportStatus};
|
||||
use pageserver_api::reltag::{RelTag, SlruKind};
|
||||
use pageserver_api::shard::ShardIdentity;
|
||||
@@ -89,24 +100,8 @@ async fn run_v1(
|
||||
tasks: Vec::default(),
|
||||
};
|
||||
|
||||
// Use the job size limit encoded in the progress if we are resuming an import.
|
||||
// This ensures that imports have stable plans even if the pageserver config changes.
|
||||
let import_config = {
|
||||
match &import_progress {
|
||||
Some(progress) => {
|
||||
let base = &timeline.conf.timeline_import_config;
|
||||
TimelineImportConfig {
|
||||
import_job_soft_size_limit: NonZeroUsize::new(progress.job_soft_size_limit)
|
||||
.unwrap(),
|
||||
import_job_concurrency: base.import_job_concurrency,
|
||||
import_job_checkpoint_threshold: base.import_job_checkpoint_threshold,
|
||||
}
|
||||
}
|
||||
None => timeline.conf.timeline_import_config.clone(),
|
||||
}
|
||||
};
|
||||
|
||||
let plan = planner.plan(&import_config).await?;
|
||||
let import_config = &timeline.conf.timeline_import_config;
|
||||
let plan = planner.plan(import_config).await?;
|
||||
|
||||
// Hash the plan and compare with the hash of the plan we got back from the storage controller.
|
||||
// If the two match, it means that the planning stage had the same output.
|
||||
@@ -131,7 +126,7 @@ async fn run_v1(
|
||||
pausable_failpoint!("import-timeline-pre-execute-pausable");
|
||||
|
||||
let start_from_job_idx = import_progress.map(|progress| progress.completed);
|
||||
plan.execute(timeline, start_from_job_idx, plan_hash, &import_config, ctx)
|
||||
plan.execute(timeline, start_from_job_idx, plan_hash, import_config, ctx)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -155,7 +150,6 @@ impl Planner {
|
||||
/// This function is and must remain pure: given the same input, it will generate the same import plan.
|
||||
async fn plan(mut self, import_config: &TimelineImportConfig) -> anyhow::Result<Plan> {
|
||||
let pgdata_lsn = Lsn(self.control_file.control_file_data().checkPoint).align();
|
||||
anyhow::ensure!(pgdata_lsn.is_valid());
|
||||
|
||||
let datadir = PgDataDir::new(&self.storage).await?;
|
||||
|
||||
@@ -238,22 +232,14 @@ impl Planner {
|
||||
});
|
||||
|
||||
// Assigns parts of key space to later parallel jobs
|
||||
// Note: The image layers produced here may have gaps, meaning,
|
||||
// there is not an image for each key in the layer's key range.
|
||||
// The read path stops traversal at the first image layer, regardless
|
||||
// of whether a base image has been found for a key or not.
|
||||
// (Concept of sparse image layers doesn't exist.)
|
||||
// This behavior is exactly right for the base image layers we're producing here.
|
||||
// But, since no other place in the code currently produces image layers with gaps,
|
||||
// it seems noteworthy.
|
||||
let mut last_end_key = Key::MIN;
|
||||
let mut current_chunk = Vec::new();
|
||||
let mut current_chunk_size: usize = 0;
|
||||
let mut jobs = Vec::new();
|
||||
for task in std::mem::take(&mut self.tasks).into_iter() {
|
||||
let task_size = task.total_size(&self.shard);
|
||||
let projected_chunk_size = current_chunk_size.saturating_add(task_size);
|
||||
if projected_chunk_size > import_config.import_job_soft_size_limit.into() {
|
||||
if current_chunk_size + task.total_size()
|
||||
> import_config.import_job_soft_size_limit.into()
|
||||
{
|
||||
let key_range = last_end_key..task.key_range().start;
|
||||
jobs.push(ChunkProcessingJob::new(
|
||||
key_range.clone(),
|
||||
@@ -263,7 +249,7 @@ impl Planner {
|
||||
last_end_key = key_range.end;
|
||||
current_chunk_size = 0;
|
||||
}
|
||||
current_chunk_size = current_chunk_size.saturating_add(task_size);
|
||||
current_chunk_size += task.total_size();
|
||||
current_chunk.push(task);
|
||||
}
|
||||
jobs.push(ChunkProcessingJob::new(
|
||||
@@ -467,7 +453,6 @@ impl Plan {
|
||||
jobs: jobs_in_plan,
|
||||
completed: last_completed_job_idx,
|
||||
import_plan_hash,
|
||||
job_soft_size_limit: import_config.import_job_soft_size_limit.into(),
|
||||
};
|
||||
|
||||
timeline.remote_client.schedule_index_upload_for_file_changes()?;
|
||||
@@ -601,18 +586,18 @@ impl PgDataDirDb {
|
||||
};
|
||||
|
||||
let path = datadir_path.join(rel_tag.to_segfile_name(segno));
|
||||
anyhow::ensure!(filesize % BLCKSZ as usize == 0);
|
||||
assert!(filesize % BLCKSZ as usize == 0); // TODO: this should result in an error
|
||||
let nblocks = filesize / BLCKSZ as usize;
|
||||
|
||||
Ok(PgDataDirDbFile {
|
||||
PgDataDirDbFile {
|
||||
path,
|
||||
filesize,
|
||||
rel_tag,
|
||||
segno,
|
||||
nblocks: Some(nblocks), // first non-cummulative sizes
|
||||
})
|
||||
}
|
||||
})
|
||||
.collect::<anyhow::Result<_, _>>()?;
|
||||
.collect();
|
||||
|
||||
// Set cummulative sizes. Do all of that math here, so that later we could easier
|
||||
// parallelize over segments and know with which segments we need to write relsize
|
||||
@@ -647,22 +632,12 @@ impl PgDataDirDb {
|
||||
trait ImportTask {
|
||||
fn key_range(&self) -> Range<Key>;
|
||||
|
||||
fn total_size(&self, shard_identity: &ShardIdentity) -> usize {
|
||||
let range = ShardedRange::new(self.key_range(), shard_identity);
|
||||
let page_count = range.page_count();
|
||||
if page_count == u32::MAX {
|
||||
tracing::warn!(
|
||||
"Import task has non contiguous key range: {}..{}",
|
||||
self.key_range().start,
|
||||
self.key_range().end
|
||||
);
|
||||
|
||||
// Tasks should operate on contiguous ranges. It is unexpected for
|
||||
// ranges to violate this assumption. Calling code handles this by mapping
|
||||
// any task on a non contiguous range to its own image layer.
|
||||
usize::MAX
|
||||
fn total_size(&self) -> usize {
|
||||
// TODO: revisit this
|
||||
if is_contiguous_range(&self.key_range()) {
|
||||
contiguous_range_len(&self.key_range()) as usize * 8192
|
||||
} else {
|
||||
page_count as usize * 8192
|
||||
u32::MAX as usize
|
||||
}
|
||||
}
|
||||
|
||||
@@ -760,8 +735,6 @@ impl ImportTask for ImportRelBlocksTask {
|
||||
layer_writer: &mut ImageLayerWriter,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<usize> {
|
||||
const MAX_BYTE_RANGE_SIZE: usize = 128 * 1024 * 1024;
|
||||
|
||||
debug!("Importing relation file");
|
||||
|
||||
let (rel_tag, start_blk) = self.key_range.start.to_rel_block()?;
|
||||
@@ -786,7 +759,7 @@ impl ImportTask for ImportRelBlocksTask {
|
||||
assert_eq!(key.len(), 1);
|
||||
assert!(!acc.is_empty());
|
||||
assert!(acc_end > acc_start);
|
||||
if acc_end == start && end - acc_start <= MAX_BYTE_RANGE_SIZE {
|
||||
if acc_end == start /* TODO additional max range check here, to limit memory consumption per task to X */ {
|
||||
acc.push(key.pop().unwrap());
|
||||
Ok((acc, acc_start, end))
|
||||
} else {
|
||||
@@ -801,8 +774,8 @@ impl ImportTask for ImportRelBlocksTask {
|
||||
.get_range(&self.path, range_start.into_u64(), range_end.into_u64())
|
||||
.await?;
|
||||
let mut buf = Bytes::from(range_buf);
|
||||
// TODO: batched writes
|
||||
for key in keys {
|
||||
// The writer buffers writes internally
|
||||
let image = buf.split_to(8192);
|
||||
layer_writer.put_image(key, image, ctx).await?;
|
||||
nimages += 1;
|
||||
@@ -855,9 +828,6 @@ impl ImportTask for ImportSlruBlocksTask {
|
||||
debug!("Importing SLRU segment file {}", self.path);
|
||||
let buf = self.storage.get(&self.path).await?;
|
||||
|
||||
// TODO(vlad): Does timestamp to LSN work for imported timelines?
|
||||
// Probably not since we don't append the `xact_time` to it as in
|
||||
// [`WalIngest::ingest_xact_record`].
|
||||
let (kind, segno, start_blk) = self.key_range.start.to_slru_block()?;
|
||||
let (_kind, _segno, end_blk) = self.key_range.end.to_slru_block()?;
|
||||
let mut blknum = start_blk;
|
||||
@@ -994,15 +964,6 @@ impl ChunkProcessingJob {
|
||||
.cloned();
|
||||
match existing_layer {
|
||||
Some(existing) => {
|
||||
// Unlink the remote layer from the index without scheduling its deletion.
|
||||
// When `existing_layer` drops [`LayerInner::drop`] will schedule its deletion from
|
||||
// remote storage, but that assumes that the layer was unlinked from the index first.
|
||||
timeline
|
||||
.remote_client
|
||||
.schedule_unlinking_of_layers_from_index_part(std::iter::once(
|
||||
existing.layer_desc().layer_name(),
|
||||
))?;
|
||||
|
||||
guard.open_mut()?.rewrite_layers(
|
||||
&[(existing.clone(), resident_layer.clone())],
|
||||
&[],
|
||||
|
||||
@@ -6,7 +6,7 @@ use bytes::Bytes;
|
||||
use postgres_ffi::ControlFileData;
|
||||
use remote_storage::{
|
||||
Download, DownloadError, DownloadKind, DownloadOpts, GenericRemoteStorage, Listing,
|
||||
ListingObject, RemotePath, RemoteStorageConfig,
|
||||
ListingObject, RemotePath,
|
||||
};
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -22,9 +22,11 @@ pub async fn new(
|
||||
location: &index_part_format::Location,
|
||||
cancel: CancellationToken,
|
||||
) -> Result<RemoteStorageWrapper, anyhow::Error> {
|
||||
// Downloads should be reasonably sized. We do ranged reads for relblock raw data
|
||||
// and full reads for SLRU segments which are bounded by Postgres.
|
||||
let timeout = RemoteStorageConfig::DEFAULT_TIMEOUT;
|
||||
// FIXME: we probably want some timeout, and we might be able to assume the max file
|
||||
// size on S3 is 1GiB (postgres segment size). But the problem is that the individual
|
||||
// downloaders don't know enough about concurrent downloads to make a guess on the
|
||||
// expected bandwidth and resulting best timeout.
|
||||
let timeout = std::time::Duration::from_secs(24 * 60 * 60);
|
||||
let location_storage = match location {
|
||||
#[cfg(feature = "testing")]
|
||||
index_part_format::Location::LocalFs { path } => {
|
||||
@@ -48,12 +50,9 @@ pub async fn new(
|
||||
.import_pgdata_aws_endpoint_url
|
||||
.clone()
|
||||
.map(|url| url.to_string()), // by specifying None here, remote_storage/aws-sdk-rust will infer from env
|
||||
// This matches the default import job concurrency. This is managed
|
||||
// separately from the usual S3 client, but the concern here is bandwidth
|
||||
// usage.
|
||||
concurrency_limit: 128.try_into().unwrap(),
|
||||
max_keys_per_list_response: Some(1000),
|
||||
upload_storage_class: None, // irrelevant
|
||||
concurrency_limit: 100.try_into().unwrap(), // TODO: think about this
|
||||
max_keys_per_list_response: Some(1000), // TODO: think about this
|
||||
upload_storage_class: None, // irrelevant
|
||||
},
|
||||
timeout,
|
||||
)
|
||||
|
||||
@@ -44,7 +44,6 @@ struct GlobalTimelinesState {
|
||||
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
|
||||
// this map is dropped on restart.
|
||||
tombstones: HashMap<TenantTimelineId, Instant>,
|
||||
tenant_tombstones: HashMap<TenantId, Instant>,
|
||||
|
||||
conf: Arc<SafeKeeperConf>,
|
||||
broker_active_set: Arc<TimelinesSet>,
|
||||
@@ -82,25 +81,10 @@ impl GlobalTimelinesState {
|
||||
}
|
||||
}
|
||||
|
||||
fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool {
|
||||
self.tombstones.contains_key(ttid) || self.tenant_tombstones.contains_key(&ttid.tenant_id)
|
||||
}
|
||||
|
||||
/// Removes all blocking tombstones for the given timeline ID.
|
||||
/// Returns `true` if there have been actual changes.
|
||||
fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
|
||||
self.tombstones.remove(ttid).is_some()
|
||||
|| self.tenant_tombstones.remove(&ttid.tenant_id).is_some()
|
||||
}
|
||||
|
||||
fn delete(&mut self, ttid: TenantTimelineId) {
|
||||
self.timelines.remove(&ttid);
|
||||
self.tombstones.insert(ttid, Instant::now());
|
||||
}
|
||||
|
||||
fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
|
||||
self.tenant_tombstones.insert(tenant_id, Instant::now());
|
||||
}
|
||||
}
|
||||
|
||||
/// A struct used to manage access to the global timelines map.
|
||||
@@ -115,7 +99,6 @@ impl GlobalTimelines {
|
||||
state: Mutex::new(GlobalTimelinesState {
|
||||
timelines: HashMap::new(),
|
||||
tombstones: HashMap::new(),
|
||||
tenant_tombstones: HashMap::new(),
|
||||
conf,
|
||||
broker_active_set: Arc::new(TimelinesSet::default()),
|
||||
global_rate_limiter: RateLimiter::new(1, 1),
|
||||
@@ -262,7 +245,7 @@ impl GlobalTimelines {
|
||||
return Ok(timeline);
|
||||
}
|
||||
|
||||
if state.has_tombstone(&ttid) {
|
||||
if state.tombstones.contains_key(&ttid) {
|
||||
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
|
||||
}
|
||||
|
||||
@@ -312,14 +295,13 @@ impl GlobalTimelines {
|
||||
_ => {}
|
||||
}
|
||||
if check_tombstone {
|
||||
if state.has_tombstone(&ttid) {
|
||||
if state.tombstones.contains_key(&ttid) {
|
||||
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
|
||||
}
|
||||
} else {
|
||||
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
|
||||
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
|
||||
// It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed.
|
||||
if state.remove_tombstone(&ttid) {
|
||||
if state.tombstones.remove(&ttid).is_some() {
|
||||
warn!("un-deleted timeline {ttid}");
|
||||
}
|
||||
}
|
||||
@@ -500,7 +482,6 @@ impl GlobalTimelines {
|
||||
let tli_res = {
|
||||
let state = self.state.lock().unwrap();
|
||||
|
||||
// Do NOT check tenant tombstones here: those were set earlier
|
||||
if state.tombstones.contains_key(ttid) {
|
||||
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
|
||||
info!("Timeline {ttid} was already deleted");
|
||||
@@ -576,10 +557,6 @@ impl GlobalTimelines {
|
||||
action: DeleteOrExclude,
|
||||
) -> Result<HashMap<TenantTimelineId, TimelineDeleteResult>> {
|
||||
info!("deleting all timelines for tenant {}", tenant_id);
|
||||
|
||||
// Adding a tombstone before getting the timelines to prevent new timeline additions
|
||||
self.state.lock().unwrap().add_tenant_tombstone(*tenant_id);
|
||||
|
||||
let to_delete = self.get_all_for_tenant(*tenant_id);
|
||||
|
||||
let mut err = None;
|
||||
@@ -623,9 +600,6 @@ impl GlobalTimelines {
|
||||
state
|
||||
.tombstones
|
||||
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
||||
state
|
||||
.tenant_tombstones
|
||||
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -482,10 +482,6 @@ async fn handle_tenant_timeline_delete(
|
||||
ForwardOutcome::NotForwarded(_req) => {}
|
||||
};
|
||||
|
||||
service
|
||||
.maybe_delete_timeline_import(tenant_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
// For timeline deletions, which both implement an "initially return 202, then 404 once
|
||||
// we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
|
||||
async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>
|
||||
|
||||
@@ -139,14 +139,6 @@ pub(crate) struct StorageControllerMetricGroup {
|
||||
/// HTTP request status counters for handled requests
|
||||
pub(crate) storage_controller_reconcile_long_running:
|
||||
measured::CounterVec<ReconcileLongRunningLabelGroupSet>,
|
||||
|
||||
/// Indicator of safekeeper reconciler queue depth, broken down by safekeeper, excluding ongoing reconciles.
|
||||
pub(crate) storage_controller_safkeeper_reconciles_queued:
|
||||
measured::GaugeVec<SafekeeperReconcilerLabelGroupSet>,
|
||||
|
||||
/// Indicator of completed safekeeper reconciles, broken down by safekeeper.
|
||||
pub(crate) storage_controller_safkeeper_reconciles_complete:
|
||||
measured::CounterVec<SafekeeperReconcilerLabelGroupSet>,
|
||||
}
|
||||
|
||||
impl StorageControllerMetrics {
|
||||
@@ -265,17 +257,6 @@ pub(crate) enum Method {
|
||||
Other,
|
||||
}
|
||||
|
||||
#[derive(measured::LabelGroup, Clone)]
|
||||
#[label(set = SafekeeperReconcilerLabelGroupSet)]
|
||||
pub(crate) struct SafekeeperReconcilerLabelGroup<'a> {
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
pub(crate) sk_az: &'a str,
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
pub(crate) sk_node_id: &'a str,
|
||||
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
|
||||
pub(crate) sk_hostname: &'a str,
|
||||
}
|
||||
|
||||
impl From<hyper::Method> for Method {
|
||||
fn from(value: hyper::Method) -> Self {
|
||||
if value == hyper::Method::GET {
|
||||
|
||||
@@ -99,8 +99,8 @@ use crate::tenant_shard::{
|
||||
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
|
||||
};
|
||||
use crate::timeline_import::{
|
||||
FinalizingImport, ImportResult, ShardImportStatuses, TimelineImport,
|
||||
TimelineImportFinalizeError, TimelineImportState, UpcallClient,
|
||||
ImportResult, ShardImportStatuses, TimelineImport, TimelineImportFinalizeError,
|
||||
TimelineImportState, UpcallClient,
|
||||
};
|
||||
|
||||
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
@@ -232,9 +232,6 @@ struct ServiceState {
|
||||
|
||||
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
|
||||
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
|
||||
|
||||
/// Tracks ongoing timeline import finalization tasks
|
||||
imports_finalizing: BTreeMap<(TenantId, TimelineId), FinalizingImport>,
|
||||
}
|
||||
|
||||
/// Transform an error from a pageserver into an error to return to callers of a storage
|
||||
@@ -311,7 +308,6 @@ impl ServiceState {
|
||||
scheduler,
|
||||
ongoing_operation: None,
|
||||
delayed_reconcile_rx,
|
||||
imports_finalizing: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4101,58 +4097,13 @@ impl Service {
|
||||
///
|
||||
/// If this method gets pre-empted by shut down, it will be called again at start-up (on-going
|
||||
/// imports are stored in the database).
|
||||
///
|
||||
/// # Cancel-Safety
|
||||
/// Not cancel safe.
|
||||
/// If the caller stops polling, the import will not be removed from
|
||||
/// [`ServiceState::imports_finalizing`].
|
||||
#[instrument(skip_all, fields(
|
||||
tenant_id=%import.tenant_id,
|
||||
timeline_id=%import.timeline_id,
|
||||
))]
|
||||
|
||||
async fn finalize_timeline_import(
|
||||
self: &Arc<Self>,
|
||||
import: TimelineImport,
|
||||
) -> Result<(), TimelineImportFinalizeError> {
|
||||
let tenant_timeline = (import.tenant_id, import.timeline_id);
|
||||
|
||||
let (_finalize_import_guard, cancel) = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let gate = Gate::default();
|
||||
let cancel = CancellationToken::default();
|
||||
|
||||
let guard = gate.enter().unwrap();
|
||||
|
||||
locked.imports_finalizing.insert(
|
||||
tenant_timeline,
|
||||
FinalizingImport {
|
||||
gate,
|
||||
cancel: cancel.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
(guard, cancel)
|
||||
};
|
||||
|
||||
let res = tokio::select! {
|
||||
res = self.finalize_timeline_import_impl(import) => {
|
||||
res
|
||||
},
|
||||
_ = cancel.cancelled() => {
|
||||
Err(TimelineImportFinalizeError::Cancelled)
|
||||
}
|
||||
};
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked.imports_finalizing.remove(&tenant_timeline);
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
async fn finalize_timeline_import_impl(
|
||||
self: &Arc<Self>,
|
||||
import: TimelineImport,
|
||||
) -> Result<(), TimelineImportFinalizeError> {
|
||||
tracing::info!("Finalizing timeline import");
|
||||
|
||||
@@ -4352,46 +4303,6 @@ impl Service {
|
||||
.await;
|
||||
}
|
||||
|
||||
/// Delete a timeline import if it exists
|
||||
///
|
||||
/// Firstly, delete the entry from the database. Any updates
|
||||
/// from pageservers after the update will fail with a 404, so the
|
||||
/// import cannot progress into finalizing state if it's not there already.
|
||||
/// Secondly, cancel the finalization if one is in progress.
|
||||
pub(crate) async fn maybe_delete_timeline_import(
|
||||
self: &Arc<Self>,
|
||||
tenant_id: TenantId,
|
||||
timeline_id: TimelineId,
|
||||
) -> Result<(), DatabaseError> {
|
||||
let tenant_has_ongoing_import = {
|
||||
let locked = self.inner.read().unwrap();
|
||||
locked
|
||||
.tenants
|
||||
.range(TenantShardId::tenant_range(tenant_id))
|
||||
.any(|(_tid, shard)| shard.importing == TimelineImportState::Importing)
|
||||
};
|
||||
|
||||
if !tenant_has_ongoing_import {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.persistence
|
||||
.delete_timeline_import(tenant_id, timeline_id)
|
||||
.await?;
|
||||
|
||||
let maybe_finalizing = {
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked.imports_finalizing.remove(&(tenant_id, timeline_id))
|
||||
};
|
||||
|
||||
if let Some(finalizing) = maybe_finalizing {
|
||||
finalizing.cancel.cancel();
|
||||
finalizing.gate.close().await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn tenant_timeline_archival_config(
|
||||
&self,
|
||||
tenant_id: TenantId,
|
||||
@@ -8627,9 +8538,8 @@ impl Service {
|
||||
Some(ShardCount(new_shard_count))
|
||||
}
|
||||
|
||||
/// Fetches the top tenant shards from every available node, in descending order of
|
||||
/// max logical size. Offline nodes are skipped, and any errors from available nodes
|
||||
/// will be logged and ignored.
|
||||
/// Fetches the top tenant shards from every node, in descending order of
|
||||
/// max logical size. Any node errors will be logged and ignored.
|
||||
async fn get_top_tenant_shards(
|
||||
&self,
|
||||
request: &TopTenantShardsRequest,
|
||||
@@ -8640,7 +8550,6 @@ impl Service {
|
||||
.unwrap()
|
||||
.nodes
|
||||
.values()
|
||||
.filter(|node| node.is_available())
|
||||
.cloned()
|
||||
.collect_vec();
|
||||
|
||||
|
||||
@@ -20,9 +20,7 @@ use utils::{
|
||||
};
|
||||
|
||||
use crate::{
|
||||
metrics::{METRICS_REGISTRY, SafekeeperReconcilerLabelGroup},
|
||||
persistence::SafekeeperTimelineOpKind,
|
||||
safekeeper::Safekeeper,
|
||||
persistence::SafekeeperTimelineOpKind, safekeeper::Safekeeper,
|
||||
safekeeper_client::SafekeeperClient,
|
||||
};
|
||||
|
||||
@@ -220,26 +218,7 @@ impl ReconcilerHandle {
|
||||
fn schedule_reconcile(&self, req: ScheduleRequest) {
|
||||
let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id);
|
||||
let hostname = req.safekeeper.skp.host.clone();
|
||||
let sk_az = req.safekeeper.skp.availability_zone_id.clone();
|
||||
let sk_node_id = req.safekeeper.get_id().to_string();
|
||||
|
||||
// We don't have direct access to the queue depth here, so increase it blindly by 1.
|
||||
// We know that putting into the queue increases the queue depth. The receiver will
|
||||
// update with the correct value once it processes the next item. To avoid races where we
|
||||
// reduce before we increase, leaving the gauge with a 1 value for a long time, we
|
||||
// increase it before putting into the queue.
|
||||
let queued_gauge = &METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_safkeeper_reconciles_queued;
|
||||
let label_group = SafekeeperReconcilerLabelGroup {
|
||||
sk_az: &sk_az,
|
||||
sk_node_id: &sk_node_id,
|
||||
sk_hostname: &hostname,
|
||||
};
|
||||
queued_gauge.inc(label_group.clone());
|
||||
|
||||
if let Err(err) = self.tx.send((req, cancel, token_id)) {
|
||||
queued_gauge.set(label_group, 0);
|
||||
tracing::info!("scheduling request onto {hostname} returned error: {err}");
|
||||
}
|
||||
}
|
||||
@@ -304,18 +283,6 @@ impl SafekeeperReconciler {
|
||||
continue;
|
||||
}
|
||||
|
||||
let queued_gauge = &METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_safkeeper_reconciles_queued;
|
||||
queued_gauge.set(
|
||||
SafekeeperReconcilerLabelGroup {
|
||||
sk_az: &req.safekeeper.skp.availability_zone_id,
|
||||
sk_node_id: &req.safekeeper.get_id().to_string(),
|
||||
sk_hostname: &req.safekeeper.skp.host,
|
||||
},
|
||||
self.rx.len() as i64,
|
||||
);
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
let kind = req.kind;
|
||||
let tenant_id = req.tenant_id;
|
||||
@@ -544,16 +511,6 @@ impl SafekeeperReconcilerInner {
|
||||
req.generation,
|
||||
)
|
||||
.await;
|
||||
|
||||
let complete_counter = &METRICS_REGISTRY
|
||||
.metrics_group
|
||||
.storage_controller_safkeeper_reconciles_complete;
|
||||
complete_counter.inc(SafekeeperReconcilerLabelGroup {
|
||||
sk_az: &req.safekeeper.skp.availability_zone_id,
|
||||
sk_node_id: &req.safekeeper.get_id().to_string(),
|
||||
sk_hostname: &req.safekeeper.skp.host,
|
||||
});
|
||||
|
||||
if let Err(err) = res {
|
||||
tracing::info!(
|
||||
"couldn't remove reconciliation request onto {} from persistence: {err:?}",
|
||||
|
||||
@@ -7,7 +7,6 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use utils::sync::gate::Gate;
|
||||
use utils::{
|
||||
id::{TenantId, TimelineId},
|
||||
shard::ShardIndex,
|
||||
@@ -56,8 +55,6 @@ pub(crate) enum TimelineImportUpdateFollowUp {
|
||||
pub(crate) enum TimelineImportFinalizeError {
|
||||
#[error("Shut down interrupted import finalize")]
|
||||
ShuttingDown,
|
||||
#[error("Import finalization was cancelled")]
|
||||
Cancelled,
|
||||
#[error("Mismatched shard detected during import finalize: {0}")]
|
||||
MismatchedShards(ShardIndex),
|
||||
}
|
||||
@@ -167,11 +164,6 @@ impl TimelineImport {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct FinalizingImport {
|
||||
pub(crate) gate: Gate,
|
||||
pub(crate) cancel: CancellationToken,
|
||||
}
|
||||
|
||||
pub(crate) type ImportResult = Result<(), String>;
|
||||
|
||||
pub(crate) struct UpcallClient {
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
@@ -12,7 +11,6 @@ from _pytest.config import Config
|
||||
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_cli import AbstractNeonCli
|
||||
from fixtures.neon_fixtures import Endpoint, VanillaPostgres
|
||||
from fixtures.pg_version import PgVersion
|
||||
from fixtures.remote_storage import MockS3Server
|
||||
|
||||
@@ -163,57 +161,3 @@ def fast_import(
|
||||
f.write(fi.cmd.stderr)
|
||||
|
||||
log.info("Written logs to %s", test_output_dir)
|
||||
|
||||
|
||||
def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
|
||||
"""
|
||||
Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
|
||||
"""
|
||||
assert not vanilla_pg.is_running()
|
||||
|
||||
path.mkdir()
|
||||
# what cplane writes before scheduling fast_import
|
||||
specpath = path / "spec.json"
|
||||
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
|
||||
# what fast_import writes
|
||||
vanilla_pg.pgdatadir.rename(path / "pgdata")
|
||||
statusdir = path / "status"
|
||||
statusdir.mkdir()
|
||||
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
|
||||
(statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
|
||||
|
||||
|
||||
def populate_vanilla_pg(vanilla_pg: VanillaPostgres, target_relblock_size: int) -> int:
|
||||
assert vanilla_pg.is_running()
|
||||
|
||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
||||
# fillfactor so we don't need to produce that much data
|
||||
# 900 byte per row is > 10% => 1 row per page
|
||||
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
|
||||
|
||||
nrows = 0
|
||||
while True:
|
||||
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
|
||||
log.info(
|
||||
f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
|
||||
)
|
||||
if relblock_size >= target_relblock_size:
|
||||
break
|
||||
addrows = int((target_relblock_size - relblock_size) // 8192)
|
||||
assert addrows >= 1, "forward progress"
|
||||
vanilla_pg.safe_psql(
|
||||
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
|
||||
)
|
||||
nrows += addrows
|
||||
|
||||
return nrows
|
||||
|
||||
|
||||
def validate_import_from_vanilla_pg(endpoint: Endpoint, nrows: int):
|
||||
assert endpoint.safe_psql_many(
|
||||
[
|
||||
"set effective_io_concurrency=32;",
|
||||
"SET statement_timeout='300s';",
|
||||
"select count(*), sum(data::bigint)::bigint from t",
|
||||
]
|
||||
) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
|
||||
|
||||
@@ -536,14 +536,16 @@ class NeonLocalCli(AbstractNeonCli):
|
||||
res.check_returncode()
|
||||
return res
|
||||
|
||||
def endpoint_generate_jwt(self, endpoint_id: str, scope: list[ComputeClaimsScope]) -> str:
|
||||
def endpoint_generate_jwt(
|
||||
self, endpoint_id: str, scope: ComputeClaimsScope | None = None
|
||||
) -> str:
|
||||
"""
|
||||
Generate a JWT for making requests to the endpoint's external HTTP
|
||||
server.
|
||||
"""
|
||||
args = ["endpoint", "generate-jwt", endpoint_id]
|
||||
for s in scope:
|
||||
args += ["--scope", str(s)]
|
||||
if scope:
|
||||
args += ["--scope", str(scope)]
|
||||
|
||||
cmd = self.raw_cli(args)
|
||||
cmd.check_returncode()
|
||||
|
||||
@@ -2337,22 +2337,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
|
||||
headers=self.headers(TokenScope.ADMIN),
|
||||
)
|
||||
|
||||
def import_status(
|
||||
self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, generation: int
|
||||
):
|
||||
payload = {
|
||||
"tenant_shard_id": str(tenant_shard_id),
|
||||
"timeline_id": str(timeline_id),
|
||||
"generation": generation,
|
||||
}
|
||||
|
||||
self.request(
|
||||
"GET",
|
||||
f"{self.api}/upcall/v1/timeline_import_status",
|
||||
headers=self.headers(TokenScope.GENERATIONS_API),
|
||||
json=payload,
|
||||
)
|
||||
|
||||
def reconcile_all(self):
|
||||
r = self.request(
|
||||
"POST",
|
||||
@@ -2829,11 +2813,6 @@ class NeonPageserver(PgProtocol, LogUtils):
|
||||
if self.running:
|
||||
self.http_client().configure_failpoints([(name, action)])
|
||||
|
||||
def clear_persistent_failpoint(self, name: str):
|
||||
del self._persistent_failpoints[name]
|
||||
if self.running:
|
||||
self.http_client().configure_failpoints([(name, "off")])
|
||||
|
||||
def timeline_dir(
|
||||
self,
|
||||
tenant_shard_id: TenantId | TenantShardId,
|
||||
@@ -4282,7 +4261,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
self.config(config_lines)
|
||||
|
||||
self.__jwt = self.generate_jwt([])
|
||||
self.__jwt = self.generate_jwt()
|
||||
|
||||
return self
|
||||
|
||||
@@ -4329,7 +4308,7 @@ class Endpoint(PgProtocol, LogUtils):
|
||||
|
||||
return self
|
||||
|
||||
def generate_jwt(self, scope: list[ComputeClaimsScope]) -> str:
|
||||
def generate_jwt(self, scope: ComputeClaimsScope | None = None) -> str:
|
||||
"""
|
||||
Generate a JWT for making requests to the endpoint's external HTTP
|
||||
server.
|
||||
|
||||
@@ -675,7 +675,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
|
||||
def timeline_delete(
|
||||
self, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, **kwargs
|
||||
) -> int:
|
||||
):
|
||||
"""
|
||||
Note that deletion is not instant, it is scheduled and performed mostly in the background.
|
||||
So if you need to wait for it to complete use `timeline_delete_wait_completed`.
|
||||
@@ -688,8 +688,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
|
||||
res_json = res.json()
|
||||
assert res_json is None
|
||||
|
||||
return res.status_code
|
||||
|
||||
def timeline_gc(
|
||||
self,
|
||||
tenant_id: TenantId | TenantShardId,
|
||||
|
||||
@@ -1,41 +1,31 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import enum
|
||||
import json
|
||||
import time
|
||||
from collections import Counter
|
||||
from dataclasses import dataclass
|
||||
from enum import StrEnum
|
||||
from threading import Event
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pytest
|
||||
from fixtures.common_types import Lsn, TenantId, TimelineId
|
||||
from fixtures.fast_import import mock_import_bucket, populate_vanilla_pg
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnv,
|
||||
NeonEnvBuilder,
|
||||
NeonPageserver,
|
||||
PgBin,
|
||||
VanillaPostgres,
|
||||
wait_for_last_flush_lsn,
|
||||
)
|
||||
from fixtures.pageserver.http import (
|
||||
ImportPgdataIdemptencyKey,
|
||||
)
|
||||
from fixtures.pageserver.utils import wait_for_upload_queue_empty
|
||||
from fixtures.remote_storage import RemoteStorageKind
|
||||
from fixtures.utils import human_bytes, run_only_on_default_postgres, wait_until
|
||||
from werkzeug.wrappers.response import Response
|
||||
from fixtures.utils import human_bytes, wait_until
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Iterable
|
||||
from typing import Any
|
||||
|
||||
from fixtures.pageserver.http import PageserverHttpClient
|
||||
from pytest_httpserver import HTTPServer
|
||||
from werkzeug.wrappers.request import Request
|
||||
|
||||
|
||||
GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
|
||||
@@ -174,7 +164,6 @@ class EvictionEnv:
|
||||
min_avail_bytes,
|
||||
mock_behavior,
|
||||
eviction_order: EvictionOrder,
|
||||
wait_logical_size: bool = True,
|
||||
):
|
||||
"""
|
||||
Starts pageserver up with mocked statvfs setup. The startup is
|
||||
@@ -212,12 +201,11 @@ class EvictionEnv:
|
||||
pageserver.start()
|
||||
|
||||
# we now do initial logical size calculation on startup, which on debug builds can fight with disk usage based eviction
|
||||
if wait_logical_size:
|
||||
for tenant_id, timeline_id in self.timelines:
|
||||
tenant_ps = self.neon_env.get_tenant_pageserver(tenant_id)
|
||||
# Pageserver may be none if we are currently not attached anywhere, e.g. during secondary eviction test
|
||||
if tenant_ps is not None:
|
||||
tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id)
|
||||
for tenant_id, timeline_id in self.timelines:
|
||||
tenant_ps = self.neon_env.get_tenant_pageserver(tenant_id)
|
||||
# Pageserver may be none if we are currently not attached anywhere, e.g. during secondary eviction test
|
||||
if tenant_ps is not None:
|
||||
tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id)
|
||||
|
||||
def statvfs_called():
|
||||
pageserver.assert_log_contains(".*running mocked statvfs.*")
|
||||
@@ -894,121 +882,3 @@ def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
|
||||
assert total_size - post_eviction_total_size >= evict_bytes, (
|
||||
"we requested at least evict_bytes worth of free space"
|
||||
)
|
||||
|
||||
|
||||
@run_only_on_default_postgres(reason="PG version is irrelevant here")
|
||||
def test_import_timeline_disk_pressure_eviction(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
make_httpserver: HTTPServer,
|
||||
pg_bin: PgBin,
|
||||
):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
# Set up mock control plane HTTP server to listen for import completions
|
||||
import_completion_signaled = Event()
|
||||
|
||||
def handler(request: Request) -> Response:
|
||||
log.info(f"control plane /import_complete request: {request.json}")
|
||||
import_completion_signaled.set()
|
||||
return Response(json.dumps({}), status=200)
|
||||
|
||||
cplane_mgmt_api_server = make_httpserver
|
||||
cplane_mgmt_api_server.expect_request(
|
||||
"/storage/api/v1/import_complete", method="PUT"
|
||||
).respond_with_handler(handler)
|
||||
|
||||
# Plug the cplane mock in
|
||||
neon_env_builder.control_plane_hooks_api = (
|
||||
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
|
||||
)
|
||||
|
||||
# The import will specifiy a local filesystem path mocking remote storage
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
vanilla_pg.start()
|
||||
target_relblock_size = 1024 * 1024 * 128
|
||||
populate_vanilla_pg(vanilla_pg, target_relblock_size)
|
||||
vanilla_pg.stop()
|
||||
|
||||
env = neon_env_builder.init_configs()
|
||||
env.start()
|
||||
|
||||
importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
|
||||
mock_import_bucket(vanilla_pg, importbucket_path)
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
idempotency = ImportPgdataIdemptencyKey.random()
|
||||
|
||||
eviction_env = EvictionEnv(
|
||||
timelines=[(tenant_id, timeline_id)],
|
||||
neon_env=env,
|
||||
pageserver_http=env.pageserver.http_client(),
|
||||
layer_size=5 * 1024 * 1024, # Doesn't apply here
|
||||
pg_bin=pg_bin, # Not used here
|
||||
pgbench_init_lsns={}, # Not used here
|
||||
)
|
||||
|
||||
# Pause before delivering the final notification to storcon.
|
||||
# This keeps the import in progress.
|
||||
failpoint_name = "import-timeline-pre-success-notify-pausable"
|
||||
env.pageserver.add_persistent_failpoint(failpoint_name, "pause")
|
||||
|
||||
env.storage_controller.tenant_create(tenant_id)
|
||||
env.storage_controller.timeline_create(
|
||||
tenant_id,
|
||||
{
|
||||
"new_timeline_id": str(timeline_id),
|
||||
"import_pgdata": {
|
||||
"idempotency_key": str(idempotency),
|
||||
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
def hit_failpoint():
|
||||
log.info("Checking log for pattern...")
|
||||
try:
|
||||
assert env.pageserver.log_contains(f".*at failpoint {failpoint_name}.*")
|
||||
except Exception:
|
||||
log.exception("Failed to find pattern in log")
|
||||
raise
|
||||
|
||||
wait_until(hit_failpoint)
|
||||
assert not import_completion_signaled.is_set()
|
||||
|
||||
env.pageserver.stop()
|
||||
|
||||
total_size, _, _ = eviction_env.timelines_du(env.pageserver)
|
||||
blocksize = 512
|
||||
total_blocks = (total_size + (blocksize - 1)) // blocksize
|
||||
|
||||
eviction_env.pageserver_start_with_disk_usage_eviction(
|
||||
env.pageserver,
|
||||
period="1s",
|
||||
max_usage_pct=33,
|
||||
min_avail_bytes=0,
|
||||
mock_behavior={
|
||||
"type": "Success",
|
||||
"blocksize": blocksize,
|
||||
"total_blocks": total_blocks,
|
||||
# Only count layer files towards used bytes in the mock_statvfs.
|
||||
# This avoids accounting for metadata files & tenant conf in the tests.
|
||||
"name_filter": ".*__.*",
|
||||
},
|
||||
eviction_order=EvictionOrder.RELATIVE_ORDER_SPARE,
|
||||
wait_logical_size=False,
|
||||
)
|
||||
|
||||
wait_until(lambda: env.pageserver.assert_log_contains(".*disk usage pressure relieved"))
|
||||
|
||||
env.pageserver.clear_persistent_failpoint(failpoint_name)
|
||||
|
||||
def cplane_notified():
|
||||
assert import_completion_signaled.is_set()
|
||||
|
||||
wait_until(cplane_notified)
|
||||
|
||||
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")
|
||||
|
||||
@@ -12,19 +12,13 @@ import psycopg2
|
||||
import psycopg2.errors
|
||||
import pytest
|
||||
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
|
||||
from fixtures.fast_import import (
|
||||
FastImport,
|
||||
mock_import_bucket,
|
||||
populate_vanilla_pg,
|
||||
validate_import_from_vanilla_pg,
|
||||
)
|
||||
from fixtures.fast_import import FastImport
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import (
|
||||
NeonEnvBuilder,
|
||||
PageserverImportConfig,
|
||||
PgBin,
|
||||
PgProtocol,
|
||||
StorageControllerApiException,
|
||||
StorageControllerMigrationConfig,
|
||||
VanillaPostgres,
|
||||
)
|
||||
@@ -65,6 +59,24 @@ smoke_params = [
|
||||
]
|
||||
|
||||
|
||||
def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
|
||||
"""
|
||||
Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
|
||||
"""
|
||||
assert not vanilla_pg.is_running()
|
||||
|
||||
path.mkdir()
|
||||
# what cplane writes before scheduling fast_import
|
||||
specpath = path / "spec.json"
|
||||
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
|
||||
# what fast_import writes
|
||||
vanilla_pg.pgdatadir.rename(path / "pgdata")
|
||||
statusdir = path / "status"
|
||||
statusdir.mkdir()
|
||||
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
|
||||
(statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
|
||||
|
||||
|
||||
@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
|
||||
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
|
||||
def test_pgdata_import_smoke(
|
||||
@@ -119,6 +131,10 @@ def test_pgdata_import_smoke(
|
||||
# Put data in vanilla pg
|
||||
#
|
||||
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
||||
|
||||
log.info("create relblock data")
|
||||
if rel_block_size == RelBlockSize.ONE_STRIPE_SIZE:
|
||||
target_relblock_size = stripe_size * 8192
|
||||
elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
|
||||
@@ -129,8 +145,45 @@ def test_pgdata_import_smoke(
|
||||
else:
|
||||
raise ValueError
|
||||
|
||||
vanilla_pg.start()
|
||||
rows_inserted = populate_vanilla_pg(vanilla_pg, target_relblock_size)
|
||||
# fillfactor so we don't need to produce that much data
|
||||
# 900 byte per row is > 10% => 1 row per page
|
||||
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
|
||||
|
||||
nrows = 0
|
||||
while True:
|
||||
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
|
||||
log.info(
|
||||
f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
|
||||
)
|
||||
if relblock_size >= target_relblock_size:
|
||||
break
|
||||
addrows = int((target_relblock_size - relblock_size) // 8192)
|
||||
assert addrows >= 1, "forward progress"
|
||||
vanilla_pg.safe_psql(
|
||||
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
|
||||
)
|
||||
nrows += addrows
|
||||
expect_nrows = nrows
|
||||
expect_sum = (
|
||||
(nrows) * (nrows + 1) // 2
|
||||
) # https://stackoverflow.com/questions/43901484/sum-of-the-integers-from-1-to-n
|
||||
|
||||
def validate_vanilla_equivalence(ep):
|
||||
# TODO: would be nicer to just compare pgdump
|
||||
|
||||
# Enable IO concurrency for batching on large sequential scan, to avoid making
|
||||
# this test unnecessarily onerous on CPU. Especially on debug mode, it's still
|
||||
# pretty onerous though, so increase statement_timeout to avoid timeouts.
|
||||
assert ep.safe_psql_many(
|
||||
[
|
||||
"set effective_io_concurrency=32;",
|
||||
"SET statement_timeout='300s';",
|
||||
"select count(*), sum(data::bigint)::bigint from t",
|
||||
]
|
||||
) == [[], [], [(expect_nrows, expect_sum)]]
|
||||
|
||||
validate_vanilla_equivalence(vanilla_pg)
|
||||
|
||||
vanilla_pg.stop()
|
||||
|
||||
#
|
||||
@@ -221,14 +274,14 @@ def test_pgdata_import_smoke(
|
||||
config_lines=ep_config,
|
||||
)
|
||||
|
||||
validate_import_from_vanilla_pg(ro_endpoint, rows_inserted)
|
||||
validate_vanilla_equivalence(ro_endpoint)
|
||||
|
||||
# ensure the import survives restarts
|
||||
ro_endpoint.stop()
|
||||
env.pageserver.stop(immediate=True)
|
||||
env.pageserver.start()
|
||||
ro_endpoint.start()
|
||||
validate_import_from_vanilla_pg(ro_endpoint, rows_inserted)
|
||||
validate_vanilla_equivalence(ro_endpoint)
|
||||
|
||||
#
|
||||
# validate the layer files in each shard only have the shard-specific data
|
||||
@@ -268,7 +321,7 @@ def test_pgdata_import_smoke(
|
||||
child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
|
||||
child_workload.validate()
|
||||
|
||||
validate_import_from_vanilla_pg(child_workload.endpoint(), rows_inserted)
|
||||
validate_vanilla_equivalence(child_workload.endpoint())
|
||||
|
||||
# ... at the initdb lsn
|
||||
_ = env.create_branch(
|
||||
@@ -283,21 +336,10 @@ def test_pgdata_import_smoke(
|
||||
tenant_id=tenant_id,
|
||||
config_lines=ep_config,
|
||||
)
|
||||
validate_import_from_vanilla_pg(br_initdb_endpoint, rows_inserted)
|
||||
validate_vanilla_equivalence(br_initdb_endpoint)
|
||||
with pytest.raises(psycopg2.errors.UndefinedTable):
|
||||
br_initdb_endpoint.safe_psql(f"select * from {workload.table}")
|
||||
|
||||
# The storage controller might be overly eager and attempt to finalize
|
||||
# the import before the task got a chance to exit.
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*Call to node.*management API.*failed.*Import task still running.*",
|
||||
]
|
||||
)
|
||||
|
||||
for ps in env.pageservers:
|
||||
ps.allowed_errors.extend([".*Error processing HTTP request.*Import task not done yet.*"])
|
||||
|
||||
|
||||
@run_only_on_default_postgres(reason="PG version is irrelevant here")
|
||||
def test_import_completion_on_restart(
|
||||
@@ -381,12 +423,8 @@ def test_import_completion_on_restart(
|
||||
|
||||
|
||||
@run_only_on_default_postgres(reason="PG version is irrelevant here")
|
||||
@pytest.mark.parametrize("action", ["restart", "delete"])
|
||||
def test_import_respects_timeline_lifecycle(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
vanilla_pg: VanillaPostgres,
|
||||
make_httpserver: HTTPServer,
|
||||
action: str,
|
||||
def test_import_respects_tenant_shutdown(
|
||||
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
|
||||
):
|
||||
"""
|
||||
Validate that importing timelines respect the usual timeline life cycle:
|
||||
@@ -454,44 +492,16 @@ def test_import_respects_timeline_lifecycle(
|
||||
wait_until(hit_failpoint)
|
||||
assert not import_completion_signaled.is_set()
|
||||
|
||||
if action == "restart":
|
||||
# Restart the pageserver while an import job is in progress.
|
||||
# This clears the failpoint and we expect that the import starts up afresh
|
||||
# after the restart and eventually completes.
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
# Restart the pageserver while an import job is in progress.
|
||||
# This clears the failpoint and we expect that the import starts up afresh
|
||||
# after the restart and eventually completes.
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start()
|
||||
|
||||
def cplane_notified():
|
||||
assert import_completion_signaled.is_set()
|
||||
def cplane_notified():
|
||||
assert import_completion_signaled.is_set()
|
||||
|
||||
wait_until(cplane_notified)
|
||||
elif action == "delete":
|
||||
status = env.storage_controller.pageserver_api().timeline_delete(tenant_id, timeline_id)
|
||||
assert status == 200
|
||||
|
||||
timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
|
||||
assert not timeline_path.exists(), "Timeline dir exists after deletion"
|
||||
|
||||
shard_zero = TenantShardId(tenant_id, 0, 0)
|
||||
location = env.storage_controller.inspect(shard_zero)
|
||||
assert location is not None
|
||||
generation = location[0]
|
||||
|
||||
with pytest.raises(StorageControllerApiException, match="not found"):
|
||||
env.storage_controller.import_status(shard_zero, timeline_id, generation)
|
||||
else:
|
||||
raise RuntimeError(f"{action} param not recognized")
|
||||
|
||||
# The storage controller might be overly eager and attempt to finalize
|
||||
# the import before the task got a chance to exit.
|
||||
env.storage_controller.allowed_errors.extend(
|
||||
[
|
||||
".*Call to node.*management API.*failed.*Import task still running.*",
|
||||
]
|
||||
)
|
||||
|
||||
for ps in env.pageservers:
|
||||
ps.allowed_errors.extend([".*Error processing HTTP request.*Import task not done yet.*"])
|
||||
wait_until(cplane_notified)
|
||||
|
||||
|
||||
@skip_in_debug_build("Validation query takes too long in debug builds")
|
||||
@@ -546,8 +556,23 @@ def test_import_chaos(
|
||||
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
|
||||
|
||||
vanilla_pg.start()
|
||||
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
|
||||
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
|
||||
|
||||
inserted_rows = populate_vanilla_pg(vanilla_pg, TARGET_RELBOCK_SIZE)
|
||||
nrows = 0
|
||||
while True:
|
||||
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
|
||||
log.info(
|
||||
f"relblock size: {relblock_size / 8192} pages (target: {TARGET_RELBOCK_SIZE // 8192}) pages"
|
||||
)
|
||||
if relblock_size >= TARGET_RELBOCK_SIZE:
|
||||
break
|
||||
addrows = int((TARGET_RELBOCK_SIZE - relblock_size) // 8192)
|
||||
assert addrows >= 1, "forward progress"
|
||||
vanilla_pg.safe_psql(
|
||||
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
|
||||
)
|
||||
nrows += addrows
|
||||
|
||||
vanilla_pg.stop()
|
||||
|
||||
@@ -715,7 +740,13 @@ def test_import_chaos(
|
||||
endpoint = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)
|
||||
|
||||
# Validate the imported data is legit
|
||||
validate_import_from_vanilla_pg(endpoint, inserted_rows)
|
||||
assert endpoint.safe_psql_many(
|
||||
[
|
||||
"set effective_io_concurrency=32;",
|
||||
"SET statement_timeout='300s';",
|
||||
"select count(*), sum(data::bigint)::bigint from t",
|
||||
]
|
||||
) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
|
||||
|
||||
endpoint.stop()
|
||||
|
||||
|
||||
@@ -124,9 +124,6 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
|
||||
".*downloading failed, possibly for shutdown",
|
||||
# {tenant_id=... timeline_id=...}:handle_pagerequests:handle_get_page_at_lsn_request{rel=1664/0/1260 blkno=0 req_lsn=0/149F0D8}: error reading relation or page version: Not found: will not become active. Current state: Stopping\n'
|
||||
".*page_service.*will not become active.*",
|
||||
# the following errors are possible when pageserver tries to ingest wal records despite being in unreadable state
|
||||
".*wal_connection_manager.*layer file download failed: No file found.*",
|
||||
".*wal_connection_manager.*could not ingest record.*",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -159,45 +156,6 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
|
||||
env.pageservers[2].id: ("Detached", None),
|
||||
}
|
||||
|
||||
# Track all the attached locations with mode and generation
|
||||
history: list[tuple[int, str, int | None]] = []
|
||||
|
||||
def may_read(pageserver: NeonPageserver, mode: str, generation: int | None) -> bool:
|
||||
# Rules for when a pageserver may read:
|
||||
# - our generation is higher than any previous
|
||||
# - our generation is equal to previous, but no other pageserver
|
||||
# in that generation has been AttachedSingle (i.e. allowed to compact/GC)
|
||||
# - our generation is equal to previous, and the previous holder of this
|
||||
# generation was the same node as we're attaching now.
|
||||
#
|
||||
# If these conditions are not met, then a read _might_ work, but the pageserver might
|
||||
# also hit errors trying to download layers.
|
||||
highest_historic_generation = max([i[2] for i in history if i[2] is not None], default=None)
|
||||
|
||||
if generation is None:
|
||||
# We're not in an attached state, we may not read
|
||||
return False
|
||||
elif highest_historic_generation is not None and generation < highest_historic_generation:
|
||||
# We are in an outdated generation, we may not read
|
||||
return False
|
||||
elif highest_historic_generation is not None and generation == highest_historic_generation:
|
||||
# We are re-using a generation: if any pageserver other than this one
|
||||
# has held AttachedSingle mode, this node may not read (because some other
|
||||
# node may be doing GC/compaction).
|
||||
if any(
|
||||
i[1] == "AttachedSingle"
|
||||
and i[2] == highest_historic_generation
|
||||
and i[0] != pageserver.id
|
||||
for i in history
|
||||
):
|
||||
log.info(
|
||||
f"Skipping read on {pageserver.id} because other pageserver has been in AttachedSingle mode in generation {highest_historic_generation}"
|
||||
)
|
||||
return False
|
||||
|
||||
# Fall through: we have passed conditions for readability
|
||||
return True
|
||||
|
||||
latest_attached = env.pageservers[0].id
|
||||
|
||||
for _i in range(0, 64):
|
||||
@@ -241,10 +199,9 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
|
||||
assert len(tenants) == 1
|
||||
assert tenants[0]["generation"] == new_generation
|
||||
|
||||
if may_read(pageserver, last_state_ps[0], last_state_ps[1]):
|
||||
log.info("Entering postgres...")
|
||||
workload.churn_rows(rng.randint(128, 256), pageserver.id)
|
||||
workload.validate(pageserver.id)
|
||||
log.info("Entering postgres...")
|
||||
workload.churn_rows(rng.randint(128, 256), pageserver.id)
|
||||
workload.validate(pageserver.id)
|
||||
elif last_state_ps[0].startswith("Attached"):
|
||||
# The `storage_controller` will only re-attach on startup when a pageserver was the
|
||||
# holder of the latest generation: otherwise the pageserver will revert to detached
|
||||
@@ -284,16 +241,18 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
|
||||
location_conf["generation"] = generation
|
||||
|
||||
pageserver.tenant_location_configure(tenant_id, location_conf)
|
||||
|
||||
last_state[pageserver.id] = (mode, generation)
|
||||
|
||||
may_read_this_generation = may_read(pageserver, mode, generation)
|
||||
history.append((pageserver.id, mode, generation))
|
||||
# It's only valid to connect to the last generation. Newer generations may yank layer
|
||||
# files used in older generations.
|
||||
last_generation = max(
|
||||
[s[1] for s in last_state.values() if s[1] is not None], default=None
|
||||
)
|
||||
|
||||
# This is a basic test: we are validating that he endpoint works properly _between_
|
||||
# configuration changes. A stronger test would be to validate that clients see
|
||||
# no errors while we are making the changes.
|
||||
if may_read_this_generation:
|
||||
if mode.startswith("Attached") and generation == last_generation:
|
||||
# This is a basic test: we are validating that he endpoint works properly _between_
|
||||
# configuration changes. A stronger test would be to validate that clients see
|
||||
# no errors while we are making the changes.
|
||||
workload.churn_rows(
|
||||
rng.randint(128, 256), pageserver.id, upload=mode != "AttachedStale"
|
||||
)
|
||||
@@ -306,16 +265,9 @@ def test_location_conf_churn(neon_env_builder: NeonEnvBuilder, make_httpserver,
|
||||
assert gc_summary["remote_storage_errors"] == 0
|
||||
assert gc_summary["indices_deleted"] > 0
|
||||
|
||||
# Attach all pageservers, in a higher generation than any previous. We will use the same
|
||||
# gen for all, and AttachedMulti mode so that they do not interfere with one another.
|
||||
generation = env.storage_controller.attach_hook_issue(tenant_id, env.pageservers[0].id)
|
||||
# Attach all pageservers
|
||||
for ps in env.pageservers:
|
||||
location_conf = {
|
||||
"mode": "AttachedMulti",
|
||||
"secondary_conf": None,
|
||||
"tenant_conf": {},
|
||||
"generation": generation,
|
||||
}
|
||||
location_conf = {"mode": "AttachedMulti", "secondary_conf": None, "tenant_conf": {}}
|
||||
ps.tenant_location_configure(tenant_id, location_conf)
|
||||
|
||||
# Confirm that all are readable
|
||||
|
||||
@@ -4192,10 +4192,10 @@ def test_storcon_create_delete_sk_down(
|
||||
# ensure the safekeeper deleted the timeline
|
||||
def timeline_deleted_on_active_sks():
|
||||
env.safekeepers[0].assert_log_contains(
|
||||
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
|
||||
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
|
||||
)
|
||||
env.safekeepers[2].assert_log_contains(
|
||||
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
|
||||
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
|
||||
)
|
||||
|
||||
wait_until(timeline_deleted_on_active_sks)
|
||||
@@ -4210,7 +4210,7 @@ def test_storcon_create_delete_sk_down(
|
||||
# ensure that there is log msgs for the third safekeeper too
|
||||
def timeline_deleted_on_sk():
|
||||
env.safekeepers[1].assert_log_contains(
|
||||
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
|
||||
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
|
||||
)
|
||||
|
||||
wait_until(timeline_deleted_on_sk)
|
||||
|
||||
Reference in New Issue
Block a user