Compare commits

..

9 Commits

Author SHA1 Message Date
Elizabeth Murray
04440343f8 Pagebench with grpc option. Note that grpc is on port 51050, so requires a connstring to be set. 2025-05-28 14:44:28 -07:00
Elizabeth Murray
578b7f1668 Remove "pub" for module module in pageserver_page_api. 2025-05-28 13:10:09 -07:00
Elizabeth Murray
97f18dd013 Remove unnecessary whitespace. 2025-05-28 12:54:53 -07:00
Elizabeth Murray
c8abe7e90f Remove unnecessary model changes. 2025-05-28 12:53:28 -07:00
Elizabeth Murray
7160fd16cd Response to review comments, code cleanup. 2025-05-28 12:40:21 -07:00
Elizabeth Murray
13b9d4cb67 Merge branch 'main' into elizabeth/communicator-grpc-minimal-domain-client 2025-05-28 09:40:47 -07:00
Elizabeth Murray
f0982f9a0a Clean up dependencies. 2025-05-28 08:51:01 -07:00
Elizabeth Murray
1634af6d10 Move conversion from string out of the auth interceptor. 2025-05-28 08:45:20 -07:00
Elizabeth Murray
53c1a7ca7f Add minimal GRPC client code that will be used for pagebench. 2025-05-28 08:09:45 -07:00
26 changed files with 634 additions and 705 deletions

90
Cargo.lock generated
View File

@@ -701,7 +701,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"itoa",
"matchit",
@@ -718,7 +718,7 @@ dependencies = [
"sync_wrapper 1.0.1",
"tokio",
"tokio-tungstenite 0.26.1",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
"tracing",
@@ -761,7 +761,7 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
]
@@ -1337,7 +1337,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"tower 0.5.2",
"tower",
"tower-http",
"tower-otel",
"tracing",
@@ -2066,7 +2066,7 @@ dependencies = [
"test-log",
"tokio",
"tokio-util",
"tower 0.5.2",
"tower",
"tracing",
"utils",
"workspace_hack",
@@ -2330,7 +2330,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"pin-project",
"rand 0.8.5",
@@ -2883,9 +2883,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.8.0"
version = "1.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
[[package]]
name = "httpdate"
@@ -2935,9 +2935,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "1.4.1"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
dependencies = [
"bytes",
"futures-channel",
@@ -2977,7 +2977,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"rustls 0.22.4",
"rustls-pki-types",
@@ -2992,7 +2992,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -3001,20 +3001,20 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.7"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"hyper 1.4.1",
"hyper 1.6.0",
"libc",
"pin-project-lite",
"socket2",
"tokio",
"tower 0.4.13",
"tower-service",
"tracing",
]
@@ -4236,6 +4236,7 @@ name = "pagebench"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"camino",
"clap",
"futures",
@@ -4244,12 +4245,15 @@ dependencies = [
"humantime-serde",
"pageserver_api",
"pageserver_client",
"pageserver_client_grpc",
"pageserver_page_api",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tonic 0.13.1",
"tracing",
"utils",
"workspace_hack",
@@ -4432,6 +4436,21 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_client_grpc"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"http 1.1.0",
"pageserver_page_api",
"thiserror 1.0.69",
"tokio",
"tonic 0.13.1",
"tracing",
"utils",
]
[[package]]
name = "pageserver_compaction"
version = "0.1.0"
@@ -5208,7 +5227,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.30",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"indexmap 2.9.0",
"ipnet",
@@ -5604,7 +5623,7 @@ dependencies = [
"http-body-util",
"http-types",
"humantime-serde",
"hyper 1.4.1",
"hyper 1.6.0",
"itertools 0.10.5",
"metrics",
"once_cell",
@@ -5644,7 +5663,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-rustls 0.26.0",
"hyper-util",
"ipnet",
@@ -5701,7 +5720,7 @@ dependencies = [
"futures",
"getrandom 0.2.11",
"http 1.1.0",
"hyper 1.4.1",
"hyper 1.6.0",
"parking_lot 0.11.2",
"reqwest",
"reqwest-middleware",
@@ -6642,12 +6661,12 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.5.5"
version = "0.5.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
dependencies = [
"libc",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]
[[package]]
@@ -6713,7 +6732,7 @@ dependencies = [
"http-body-util",
"http-utils",
"humantime",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"metrics",
"once_cell",
@@ -7542,7 +7561,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-timeout",
"hyper-util",
"percent-encoding",
@@ -7553,7 +7572,7 @@ dependencies = [
"tokio",
"tokio-rustls 0.26.2",
"tokio-stream",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
"tracing",
@@ -7586,21 +7605,6 @@ dependencies = [
"tonic 0.13.1",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower"
version = "0.5.2"
@@ -8591,7 +8595,7 @@ dependencies = [
"hex",
"hmac",
"hyper 0.14.30",
"hyper 1.4.1",
"hyper 1.6.0",
"hyper-util",
"indexmap 2.9.0",
"itertools 0.12.1",
@@ -8645,7 +8649,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"toml_edit",
"tower 0.5.2",
"tower",
"tracing",
"tracing-core",
"tracing-log",

View File

@@ -8,6 +8,7 @@ members = [
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/client_grpc",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
@@ -254,6 +255,7 @@ metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }

View File

@@ -339,8 +339,6 @@ async fn run_dump_restore(
destination_connstring: String,
) -> Result<(), anyhow::Error> {
let dumpdir = workdir.join("dumpdir");
let num_jobs = num_cpus::get().to_string();
info!("using {num_jobs} jobs for dump/restore");
let common_args = [
// schema mapping (prob suffices to specify them on one side)
@@ -356,7 +354,7 @@ async fn run_dump_restore(
"directory".to_string(),
// concurrency
"--jobs".to_string(),
num_jobs,
num_cpus::get().to_string(),
// progress updates
"--verbose".to_string(),
];

View File

@@ -0,0 +1,16 @@
[package]
name = "pageserver_client_grpc"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
bytes.workspace = true
futures.workspace = true
http.workspace = true
thiserror.workspace = true
tonic.workspace = true
tracing.workspace = true
pageserver_page_api.workspace = true
utils.workspace = true
tokio.workspace = true

View File

@@ -0,0 +1,192 @@
//!
//! Pageserver gRPC client library
//!
//! This library provides a gRPC client for the pageserver for the
//! communicator project.
//!
//! This library is a work in progress.
//!
//!
use std::collections::HashMap;
use bytes::Bytes;
use futures::{StreamExt};
use thiserror::Error;
use tonic::metadata::AsciiMetadataValue;
use pageserver_page_api::proto;
use pageserver_page_api::proto::PageServiceClient;
use utils::shard::ShardIndex;
use std::fmt::Debug;
use tracing::error;
use tokio::sync::RwLock;
use tonic::transport::{Channel, Endpoint};
#[derive(Error, Debug)]
pub enum PageserverClientError {
#[error("could not connect to service: {0}")]
ConnectError(#[from] tonic::transport::Error),
#[error("could not perform request: {0}`")]
RequestError(#[from] tonic::Status),
#[error("protocol error: {0}")]
ProtocolError(#[from] pageserver_page_api::ProtocolError),
#[error("could not perform request: {0}`")]
InvalidUri(#[from] http::uri::InvalidUri),
#[error("could not perform request: {0}`")]
Other(String),
}
pub struct PageserverClient {
endpoint_map: HashMap<ShardIndex, Endpoint>,
channels: tokio::sync::RwLock<HashMap<ShardIndex, Channel>>,
auth_interceptor: AuthInterceptor,
}
impl PageserverClient {
/// TODO: this doesn't currently react to changes in the shard map.
pub fn new(
tenant_id: AsciiMetadataValue,
timeline_id: AsciiMetadataValue,
auth_token: Option<String>,
shard_map: HashMap<ShardIndex, String>,
) -> Result<Self, PageserverClientError> {
let endpoint_map: HashMap<ShardIndex, Endpoint> = shard_map
.into_iter()
.map(|(shard, url)| {
let endpoint = Endpoint::from_shared(url)
.map_err(|_e| PageserverClientError::Other("Unable to parse endpoint {url}".to_string()))?;
Ok::<(ShardIndex, Endpoint), PageserverClientError>((shard, endpoint))
})
.collect::<Result<_, _>>()?;
Ok(Self {
endpoint_map,
channels: RwLock::new(HashMap::new()),
auth_interceptor: AuthInterceptor::new(
tenant_id,
timeline_id,
auth_token,
),
})
}
//
// TODO: This opens a new gRPC stream for every request, which is extremely inefficient
pub async fn get_page(
&self,
shard: ShardIndex,
request: pageserver_page_api::GetPageRequest,
) -> Result<Vec<Bytes>, PageserverClientError> {
// FIXME: calculate the shard number correctly
let chan = self.get_client(shard).await?;
let mut client =
PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
let request = proto::GetPageRequest::try_from(request)?;
let request_stream = futures::stream::once(std::future::ready(request));
let mut response_stream = client
.get_pages(tonic::Request::new(request_stream))
.await?
.into_inner();
let Some(response) = response_stream.next().await else {
return Err(PageserverClientError::Other(
"no response received for getpage request".to_string(),
));
};
match response {
Err(status) => {
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
let response: pageserver_page_api::GetPageResponse = resp.try_into().unwrap();
return Ok(response.page_images.to_vec());
}
}
}
//
// TODO: this should use a connection pool with concurrency limits,
// not a single connection to the shard.
//
async fn get_client(&self, shard: ShardIndex) -> Result<Channel, PageserverClientError> {
// Get channel from the hashmap
let mut channels = self.channels.write();
if let Some(channel) = channels.await.get(&shard) {
return Ok(channel.clone());
}
// Create a new channel if it doesn't exist
let shard_endpoint = self
.endpoint_map
.get(&shard);
let endpoint = match shard_endpoint{
Some(_endpoint) => _endpoint,
None => {
error!("Shard {shard} not found in shard map");
return Err(PageserverClientError::Other(format!(
"Shard {shard} not found in shard map"
)));
}
};
let channel = endpoint.connect().await?;
channels = self.channels.write();
channels.await.insert(shard, channel.clone());
Ok(channel.clone())
}
}
/// Inject tenant_id, timeline_id and authentication token to all pageserver requests.
#[derive(Clone)]
struct AuthInterceptor {
tenant_id: AsciiMetadataValue,
shard_id: Option<AsciiMetadataValue>,
timeline_id: AsciiMetadataValue,
auth_header: Option<AsciiMetadataValue>, // including "Bearer " prefix
}
impl AuthInterceptor {
fn new(tenant_id: AsciiMetadataValue,
timeline_id: AsciiMetadataValue,
auth_token: Option<String>) -> Self {
Self {
tenant_id: tenant_id,
shard_id: None,
timeline_id: timeline_id,
auth_header: auth_token
.map(|t| format!("Bearer {t}"))
.map(|t| t.parse().expect("could not parse auth token")),
}
}
fn for_shard(&self, shard_id: ShardIndex) -> Self {
let mut with_shard = self.clone();
with_shard.shard_id = Some(
shard_id
.to_string()
.parse()
.expect("could not parse shard id"),
);
with_shard
}
}
impl tonic::service::Interceptor for AuthInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
req.metadata_mut()
.insert("neon-tenant-id", self.tenant_id.clone());
if let Some(shard_id) = &self.shard_id {
req.metadata_mut().insert("neon-shard-id", shard_id.clone());
}
req.metadata_mut()
.insert("neon-timeline-id", self.timeline_id.clone());
if let Some(auth_header) = &self.auth_header {
req.metadata_mut()
.insert("authorization", auth_header.clone());
}
Ok(req)
}
}

View File

@@ -20,9 +20,13 @@ serde.workspace = true
serde_json.workspace = true
tracing.workspace = true
tokio.workspace = true
tonic.workspace = true
tokio-util.workspace = true
async-trait = "0.1"
pageserver_client.workspace = true
pageserver_api.workspace = true
pageserver_client_grpc.workspace = true
pageserver_page_api.workspace = true
utils = { path = "../../libs/utils/" }
workspace_hack = { version = "0.1", path = "../../workspace_hack" }

View File

@@ -6,25 +6,40 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tonic::metadata::AsciiMetadataValue;
use anyhow::Context;
use camino::Utf8PathBuf;
use pageserver_api::key::Key;
use pageserver_api::keyspace::KeySpaceAccum;
use pageserver_api::models::{PagestreamGetPageRequest, PagestreamRequest};
use pageserver_api::shard::TenantShardId;
use pageserver_client::page_service::PagestreamClient;
use rand::prelude::*;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::id::TenantTimelineId;
use utils::id::TenantId;
use utils::id::TimelineId;
use utils::lsn::Lsn;
use futures::{
future::BoxFuture,
stream::FuturesOrdered,
FutureExt, StreamExt,
};
use crate::util::tokio_thread_local_stats::AllThreadLocalStats;
use crate::util::{request_stats, tokio_thread_local_stats};
use async_trait::async_trait;
use rand::distributions::weighted::WeightedIndex;
use utils::shard::ShardIndex;
/// GetPage@LatestLSN, uniformly distributed across the compute-accessible keyspace.
#[derive(clap::Parser)]
pub(crate) struct Args {
#[clap(long, default_value = "false")]
grpc: bool,
#[clap(long, default_value = "http://localhost:9898")]
mgmt_api_endpoint: String,
#[clap(long, default_value = "postgres://postgres@localhost:64000")]
@@ -303,7 +318,19 @@ async fn main_impl(
.unwrap();
Box::pin(async move {
client_libpq(args, worker_id, ss, cancel, rps_period, ranges, weights).await
if args.grpc {
let grpc = GrpcProtocol::new(
args.page_service_connstring.clone(),
worker_id.timeline.tenant_id,
worker_id.timeline.timeline_id).await;
client_proto(args, grpc, worker_id, ss, cancel, rps_period, ranges, weights).await
} else {
let pg = PgProtocol::new(
args.page_service_connstring.clone(),
worker_id.timeline.tenant_id,
worker_id.timeline.timeline_id).await;
client_proto(args, pg, worker_id, ss, cancel, rps_period, ranges, weights).await
}
})
};
@@ -354,9 +381,208 @@ async fn main_impl(
anyhow::Ok(())
}
/// Common interface for both Pg and Grpc versions.
#[async_trait]
trait Protocol: Send {
/// Constructor/factory.
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self
where
Self: Sized;
async fn client_libpq(
/// Fire off a “get page” request and store the start time.
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
);
/// Wait for the next response and return its start time.
async fn get_start_time(&mut self) -> Instant;
/// How many in-flight requests do we have?
fn len(&self) -> usize;
}
///////////////////////////////////////////////////////////////////////////////
// PgProtocol
///////////////////////////////////////////////////////////////////////////////
struct PgProtocol {
libpq_pagestream: PagestreamClient,
libpq_vector: VecDeque<Instant>,
}
#[async_trait]
impl Protocol for PgProtocol {
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self {
let client = pageserver_client::page_service::Client::new(conn_string)
.await
.unwrap()
.pagestream(tenant_id, timeline_id)
.await
.unwrap();
Self {
libpq_pagestream: client,
libpq_vector: VecDeque::new(),
}
}
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
) {
// build your PagestreamGetPageRequest exactly as before…
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key.to_rel_block().unwrap();
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
let _ = self.libpq_pagestream.getpage_send(req).await;
self.libpq_vector.push_back(start);
}
async fn get_start_time(&mut self) -> Instant {
let start = self.libpq_vector.pop_front().unwrap();
let _ = self.libpq_pagestream.getpage_recv().await;
start
}
fn len(&self) -> usize {
self.libpq_vector.len()
}
}
///////////////////////////////////////////////////////////////////////////////
// GrpcProtocol
///////////////////////////////////////////////////////////////////////////////
type GetPageFut = BoxFuture<'static, (Instant, Option<pageserver_client_grpc::PageserverClientError>)>;
struct GrpcProtocol {
grpc_page_client: Arc<pageserver_client_grpc::PageserverClient>,
grpc_vector: FuturesOrdered<GetPageFut>,
}
#[async_trait]
impl Protocol for GrpcProtocol {
async fn new(
conn_string: String,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Self {
let shard_map = std::collections::HashMap::from([(
ShardIndex::unsharded(),
conn_string.clone(),
)]);
let tenant_ascii : AsciiMetadataValue = tenant_id.to_string().parse().unwrap();
let timeline_ascii : AsciiMetadataValue = timeline_id.to_string().parse().unwrap();
let client = pageserver_client_grpc::PageserverClient::new(
tenant_ascii,
timeline_ascii,
None,
shard_map,
).unwrap();
Self {
grpc_page_client: Arc::new(client),
grpc_vector: FuturesOrdered::new(),
}
}
async fn add_to_inflight(
&mut self,
start: Instant,
args: &Args,
ranges: Vec<KeyRange>,
weights: WeightedIndex<i128>,
) {
// build your GetPageRequest exactly as before…
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key.to_rel_block().unwrap();
pageserver_page_api::GetPageRequest {
request_id: 0,
request_class: pageserver_page_api::GetPageClass::Normal,
read_lsn: pageserver_page_api::ReadLsn {
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since_lsn: Some(r.timeline_lsn),
},
rel: pageserver_page_api::RelTag {
spcnode: rel_tag.spcnode,
dbnode: rel_tag.dbnode,
relnode: rel_tag.relnode,
forknum: rel_tag.forknum,
},
block_numbers: vec![block_no].into(),
}
};
let client_clone = self.grpc_page_client.clone();
let getpage_fut : GetPageFut = async move {
let result = client_clone.get_page(ShardIndex::unsharded(), req).await;
match result {
Ok(_) => {
(start, None)
}
Err(e) => {
(start, Some(e))
}
}
}.boxed();
self.grpc_vector.push_back(getpage_fut);
}
async fn get_start_time(&mut self) -> Instant {
let (start, err) = self.grpc_vector.next().await.unwrap();
if let Some(e) = err {
tracing::error!("getpage request failed: {e}");
}
start
}
fn len(&self) -> usize {
self.grpc_vector.len()
}
}
async fn client_proto(
args: &Args,
mut protocol: impl Protocol,
worker_id: WorkerId,
shared_state: Arc<SharedState>,
cancel: CancellationToken,
@@ -364,18 +590,11 @@ async fn client_libpq(
ranges: Vec<KeyRange>,
weights: rand::distributions::weighted::WeightedIndex<i128>,
) {
let client = pageserver_client::page_service::Client::new(args.page_service_connstring.clone())
.await
.unwrap();
let mut client = client
.pagestream(worker_id.timeline.tenant_id, worker_id.timeline.timeline_id)
.await
.unwrap();
shared_state.start_work_barrier.wait().await;
let client_start = Instant::now();
let mut ticks_processed = 0;
let mut inflight = VecDeque::new();
while !cancel.is_cancelled() {
// Detect if a request took longer than the RPS rate
if let Some(period) = &rps_period {
@@ -390,37 +609,12 @@ async fn client_libpq(
ticks_processed = periods_passed_until_now;
}
while inflight.len() < args.queue_depth.get() {
while protocol.len() < args.queue_depth.get() {
let start = Instant::now();
let req = {
let mut rng = rand::thread_rng();
let r = &ranges[weights.sample(&mut rng)];
let key: i128 = rng.gen_range(r.start..r.end);
let key = Key::from_i128(key);
assert!(key.is_rel_block_key());
let (rel_tag, block_no) = key
.to_rel_block()
.expect("we filter non-rel-block keys out above");
PagestreamGetPageRequest {
hdr: PagestreamRequest {
reqid: 0,
request_lsn: if rng.gen_bool(args.req_latest_probability) {
Lsn::MAX
} else {
r.timeline_lsn
},
not_modified_since: r.timeline_lsn,
},
rel: rel_tag,
blkno: block_no,
}
};
client.getpage_send(req).await.unwrap();
inflight.push_back(start);
protocol.add_to_inflight(start, args, ranges.clone(), weights.clone()).await;
}
let start = inflight.pop_front().unwrap();
client.getpage_recv().await.unwrap();
let start = protocol.get_start_time().await;
let end = Instant::now();
shared_state.live_stats.request_done();
ticks_processed += 1;
@@ -436,9 +630,11 @@ async fn client_libpq(
if let Some(period) = &rps_period {
let next_at = client_start
+ Duration::from_micros(
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
(ticks_processed) as u64 * u64::try_from(period.as_micros()).unwrap(),
);
tokio::time::sleep_until(next_at.into()).await;
}
}
}

View File

@@ -837,30 +837,7 @@ async fn collect_eviction_candidates(
continue;
}
let info = tl.get_local_layers_for_disk_usage_eviction().await;
debug!(
tenant_id=%tl.tenant_shard_id.tenant_id,
shard_id=%tl.tenant_shard_id.shard_slug(),
timeline_id=%tl.timeline_id,
"timeline resident layers count: {}", info.resident_layers.len()
);
tenant_candidates.extend(info.resident_layers.into_iter());
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));
if cancel.is_cancelled() {
return Ok(EvictionCandidates::Cancelled);
}
}
// Also consider layers of timelines being imported for eviction
for tl in tenant.list_importing_timelines() {
let info = tl.timeline.get_local_layers_for_disk_usage_eviction().await;
debug!(
tenant_id=%tl.timeline.tenant_shard_id.tenant_id,
shard_id=%tl.timeline.tenant_shard_id.shard_slug(),
timeline_id=%tl.timeline.timeline_id,
"timeline resident layers count: {}", info.resident_layers.len()
);
debug!(tenant_id=%tl.tenant_shard_id.tenant_id, shard_id=%tl.tenant_shard_id.shard_slug(), timeline_id=%tl.timeline_id, "timeline resident layers count: {}", info.resident_layers.len());
tenant_candidates.extend(info.resident_layers.into_iter());
max_layer_size = max_layer_size.max(info.max_layer_size.unwrap_or(0));

View File

@@ -300,7 +300,7 @@ pub struct TenantShard {
/// as in progress.
/// * Imported timelines are removed when the storage controller calls the post timeline
/// import activation endpoint.
timelines_importing: std::sync::Mutex<HashMap<TimelineId, Arc<ImportingTimeline>>>,
timelines_importing: std::sync::Mutex<HashMap<TimelineId, ImportingTimeline>>,
/// The last tenant manifest known to be in remote storage. None if the manifest has not yet
/// been either downloaded or uploaded. Always Some after tenant attach.
@@ -672,7 +672,6 @@ pub enum MaybeOffloaded {
pub enum TimelineOrOffloaded {
Timeline(Arc<Timeline>),
Offloaded(Arc<OffloadedTimeline>),
Importing(Arc<ImportingTimeline>),
}
impl TimelineOrOffloaded {
@@ -684,9 +683,6 @@ impl TimelineOrOffloaded {
TimelineOrOffloaded::Offloaded(offloaded) => {
TimelineOrOffloadedArcRef::Offloaded(offloaded)
}
TimelineOrOffloaded::Importing(importing) => {
TimelineOrOffloadedArcRef::Importing(importing)
}
}
}
pub fn tenant_shard_id(&self) -> TenantShardId {
@@ -699,16 +695,12 @@ impl TimelineOrOffloaded {
match self {
TimelineOrOffloaded::Timeline(timeline) => &timeline.delete_progress,
TimelineOrOffloaded::Offloaded(offloaded) => &offloaded.delete_progress,
TimelineOrOffloaded::Importing(importing) => &importing.delete_progress,
}
}
fn maybe_remote_client(&self) -> Option<Arc<RemoteTimelineClient>> {
match self {
TimelineOrOffloaded::Timeline(timeline) => Some(timeline.remote_client.clone()),
TimelineOrOffloaded::Offloaded(_offloaded) => None,
TimelineOrOffloaded::Importing(importing) => {
Some(importing.timeline.remote_client.clone())
}
}
}
}
@@ -716,7 +708,6 @@ impl TimelineOrOffloaded {
pub enum TimelineOrOffloadedArcRef<'a> {
Timeline(&'a Arc<Timeline>),
Offloaded(&'a Arc<OffloadedTimeline>),
Importing(&'a Arc<ImportingTimeline>),
}
impl TimelineOrOffloadedArcRef<'_> {
@@ -724,14 +715,12 @@ impl TimelineOrOffloadedArcRef<'_> {
match self {
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.tenant_shard_id,
TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.tenant_shard_id,
TimelineOrOffloadedArcRef::Importing(importing) => importing.timeline.tenant_shard_id,
}
}
pub fn timeline_id(&self) -> TimelineId {
match self {
TimelineOrOffloadedArcRef::Timeline(timeline) => timeline.timeline_id,
TimelineOrOffloadedArcRef::Offloaded(offloaded) => offloaded.timeline_id,
TimelineOrOffloadedArcRef::Importing(importing) => importing.timeline.timeline_id,
}
}
}
@@ -748,12 +737,6 @@ impl<'a> From<&'a Arc<OffloadedTimeline>> for TimelineOrOffloadedArcRef<'a> {
}
}
impl<'a> From<&'a Arc<ImportingTimeline>> for TimelineOrOffloadedArcRef<'a> {
fn from(timeline: &'a Arc<ImportingTimeline>) -> Self {
Self::Importing(timeline)
}
}
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum GetTimelineError {
#[error("Timeline is shutting down")]
@@ -1806,25 +1789,20 @@ impl TenantShard {
},
) => {
let timeline_id = timeline.timeline_id;
let import_task_gate = Gate::default();
let import_task_guard = import_task_gate.enter().unwrap();
let import_task_handle =
tokio::task::spawn(self.clone().create_timeline_import_pgdata_task(
timeline.clone(),
import_pgdata,
guard,
import_task_guard,
ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
));
let prev = self.timelines_importing.lock().unwrap().insert(
timeline_id,
Arc::new(ImportingTimeline {
ImportingTimeline {
timeline: timeline.clone(),
import_task_handle,
import_task_gate,
delete_progress: TimelineDeleteProgress::default(),
}),
},
);
assert!(prev.is_none());
@@ -2442,17 +2420,6 @@ impl TenantShard {
.collect()
}
/// Lists timelines the tenant contains.
/// It's up to callers to omit certain timelines that are not considered ready for use.
pub fn list_importing_timelines(&self) -> Vec<Arc<ImportingTimeline>> {
self.timelines_importing
.lock()
.unwrap()
.values()
.map(Arc::clone)
.collect()
}
/// Lists timelines the tenant manages, including offloaded ones.
///
/// It's up to callers to omit certain timelines that are not considered ready for use.
@@ -2886,25 +2853,19 @@ impl TenantShard {
let (timeline, timeline_create_guard) = uninit_timeline.finish_creation_myself();
let import_task_gate = Gate::default();
let import_task_guard = import_task_gate.enter().unwrap();
let import_task_handle = tokio::spawn(self.clone().create_timeline_import_pgdata_task(
timeline.clone(),
index_part,
timeline_create_guard,
import_task_guard,
timeline_ctx.detached_child(TaskKind::ImportPgdata, DownloadBehavior::Warn),
));
let prev = self.timelines_importing.lock().unwrap().insert(
timeline.timeline_id,
Arc::new(ImportingTimeline {
ImportingTimeline {
timeline: timeline.clone(),
import_task_handle,
import_task_gate,
delete_progress: TimelineDeleteProgress::default(),
}),
},
);
// Idempotency is enforced higher up the stack
@@ -2963,7 +2924,6 @@ impl TenantShard {
timeline: Arc<Timeline>,
index_part: import_pgdata::index_part_format::Root,
timeline_create_guard: TimelineCreateGuard,
_import_task_guard: GateGuard,
ctx: RequestContext,
) {
debug_assert_current_span_has_tenant_and_timeline_id();
@@ -3875,9 +3835,6 @@ impl TenantShard {
.build_timeline_client(offloaded.timeline_id, self.remote_storage.clone());
Arc::new(remote_client)
}
TimelineOrOffloadedArcRef::Importing(_) => {
unreachable!("Importing timelines are not included in the iterator")
}
};
// Shut down the timeline's remote client: this means that the indices we write
@@ -5087,14 +5044,6 @@ impl TenantShard {
info!("timeline already exists but is offloaded");
Err(CreateTimelineError::Conflict)
}
Err(TimelineExclusionError::AlreadyExists {
existing: TimelineOrOffloaded::Importing(_existing),
..
}) => {
// If there's a timeline already importing, then we would hit
// the [`TimelineExclusionError::AlreadyCreating`] branch above.
unreachable!("Importing timelines hold the creation guard")
}
Err(TimelineExclusionError::AlreadyExists {
existing: TimelineOrOffloaded::Timeline(existing),
arg,

View File

@@ -1348,21 +1348,6 @@ impl RemoteTimelineClient {
Ok(())
}
pub(crate) fn schedule_unlinking_of_layers_from_index_part<I>(
self: &Arc<Self>,
names: I,
) -> Result<(), NotInitialized>
where
I: IntoIterator<Item = LayerName>,
{
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
self.schedule_unlinking_of_layers_from_index_part0(upload_queue, names);
Ok(())
}
/// Update the remote index file, removing the to-be-deleted files from the index,
/// allowing scheduling of actual deletions later.
fn schedule_unlinking_of_layers_from_index_part0<I>(

View File

@@ -206,8 +206,8 @@ pub struct GcCompactionQueue {
}
static CONCURRENT_GC_COMPACTION_TASKS: Lazy<Arc<Semaphore>> = Lazy::new(|| {
// Only allow one timeline on one pageserver to run gc compaction at a time.
Arc::new(Semaphore::new(1))
// Only allow two timelines on one pageserver to run gc compaction at a time.
Arc::new(Semaphore::new(2))
});
impl GcCompactionQueue {

View File

@@ -121,7 +121,6 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
// This observes the locking order between timelines and timelines_offloaded
let mut timelines = tenant.timelines.lock().unwrap();
let mut timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
let mut timelines_importing = tenant.timelines_importing.lock().unwrap();
let offloaded_children_exist = timelines_offloaded
.iter()
.any(|(_, entry)| entry.ancestor_timeline_id == Some(timeline.timeline_id()));
@@ -151,12 +150,8 @@ async fn remove_maybe_offloaded_timeline_from_tenant(
.expect("timeline that we were deleting was concurrently removed from 'timelines_offloaded' map");
offloaded_timeline.delete_from_ancestor_with_timelines(&timelines);
}
TimelineOrOffloaded::Importing(importing) => {
timelines_importing.remove(&importing.timeline.timeline_id);
}
}
drop(timelines_importing);
drop(timelines_offloaded);
drop(timelines);
@@ -208,17 +203,8 @@ impl DeleteTimelineFlow {
guard.mark_in_progress()?;
// Now that the Timeline is in Stopping state, request all the related tasks to shut down.
// TODO(vlad): shut down imported timeline here
match &timeline {
TimelineOrOffloaded::Timeline(timeline) => {
timeline.shutdown(super::ShutdownMode::Hard).await;
}
TimelineOrOffloaded::Importing(importing) => {
importing.shutdown().await;
}
TimelineOrOffloaded::Offloaded(_offloaded) => {
// Nothing to shut down in this case
}
if let TimelineOrOffloaded::Timeline(timeline) = &timeline {
timeline.shutdown(super::ShutdownMode::Hard).await;
}
tenant.gc_block.before_delete(&timeline.timeline_id());
@@ -403,18 +389,10 @@ impl DeleteTimelineFlow {
Err(anyhow::anyhow!("failpoint: timeline-delete-before-rm"))?
});
match timeline {
TimelineOrOffloaded::Timeline(timeline) => {
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await;
}
TimelineOrOffloaded::Importing(importing) => {
delete_local_timeline_directory(conf, tenant.tenant_shard_id, &importing.timeline)
.await;
}
TimelineOrOffloaded::Offloaded(_offloaded) => {
// Offloaded timelines have no local state
// TODO: once we persist offloaded information, delete the timeline from there, too
}
// Offloaded timelines have no local state
// TODO: once we persist offloaded information, delete the timeline from there, too
if let TimelineOrOffloaded::Timeline(timeline) = timeline {
delete_local_timeline_directory(conf, tenant.tenant_shard_id, timeline).await;
}
fail::fail_point!("timeline-delete-after-rm", |_| {
@@ -473,16 +451,12 @@ pub(super) fn make_timeline_delete_guard(
// For more context see this discussion: `https://github.com/neondatabase/neon/pull/4552#discussion_r1253437346`
let timelines = tenant.timelines.lock().unwrap();
let timelines_offloaded = tenant.timelines_offloaded.lock().unwrap();
let timelines_importing = tenant.timelines_importing.lock().unwrap();
let timeline = match timelines.get(&timeline_id) {
Some(t) => TimelineOrOffloaded::Timeline(Arc::clone(t)),
None => match timelines_offloaded.get(&timeline_id) {
Some(t) => TimelineOrOffloaded::Offloaded(Arc::clone(t)),
None => match timelines_importing.get(&timeline_id) {
Some(t) => TimelineOrOffloaded::Importing(Arc::clone(t)),
None => return Err(DeleteTimelineError::NotFound),
},
None => return Err(DeleteTimelineError::NotFound),
},
};

View File

@@ -8,10 +8,8 @@ use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::info;
use utils::lsn::Lsn;
use utils::pausable_failpoint;
use utils::sync::gate::Gate;
use super::{Timeline, TimelineDeleteProgress};
use super::Timeline;
use crate::context::RequestContext;
use crate::controller_upcall_client::{StorageControllerUpcallApi, StorageControllerUpcallClient};
use crate::tenant::metadata::TimelineMetadata;
@@ -21,23 +19,15 @@ mod importbucket_client;
mod importbucket_format;
pub(crate) mod index_part_format;
pub struct ImportingTimeline {
pub(crate) struct ImportingTimeline {
pub import_task_handle: JoinHandle<()>,
pub import_task_gate: Gate,
pub timeline: Arc<Timeline>,
pub delete_progress: TimelineDeleteProgress,
}
impl std::fmt::Debug for ImportingTimeline {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "ImportingTimeline<{}>", self.timeline.timeline_id)
}
}
impl ImportingTimeline {
pub async fn shutdown(&self) {
pub(crate) async fn shutdown(self) {
self.import_task_handle.abort();
self.import_task_gate.close().await;
let _ = self.import_task_handle.await;
self.timeline.remote_client.shutdown().await;
}
@@ -111,8 +101,6 @@ pub async fn doit(
.schedule_index_upload_for_file_changes()?;
timeline.remote_client.wait_completion().await?;
pausable_failpoint!("import-timeline-pre-success-notify-pausable");
// Communicate that shard is done.
// Ensure at-least-once delivery of the upcall to storage controller
// before we mark the task as done and never come here again.

View File

@@ -982,15 +982,6 @@ impl ChunkProcessingJob {
.cloned();
match existing_layer {
Some(existing) => {
// Unlink the remote layer from the index without scheduling its deletion.
// When `existing_layer` drops [`LayerInner::drop`] will schedule its deletion from
// remote storage, but that assumes that the layer was unlinked from the index first.
timeline
.remote_client
.schedule_unlinking_of_layers_from_index_part(std::iter::once(
existing.layer_desc().layer_name(),
))?;
guard.open_mut()?.rewrite_layers(
&[(existing.clone(), resident_layer.clone())],
&[],

View File

@@ -44,7 +44,6 @@ struct GlobalTimelinesState {
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
// this map is dropped on restart.
tombstones: HashMap<TenantTimelineId, Instant>,
tenant_tombstones: HashMap<TenantId, Instant>,
conf: Arc<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
@@ -82,25 +81,10 @@ impl GlobalTimelinesState {
}
}
fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool {
self.tombstones.contains_key(ttid) || self.tenant_tombstones.contains_key(&ttid.tenant_id)
}
/// Removes all blocking tombstones for the given timeline ID.
/// Returns `true` if there have been actual changes.
fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
self.tombstones.remove(ttid).is_some()
|| self.tenant_tombstones.remove(&ttid.tenant_id).is_some()
}
fn delete(&mut self, ttid: TenantTimelineId) {
self.timelines.remove(&ttid);
self.tombstones.insert(ttid, Instant::now());
}
fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
self.tenant_tombstones.insert(tenant_id, Instant::now());
}
}
/// A struct used to manage access to the global timelines map.
@@ -115,7 +99,6 @@ impl GlobalTimelines {
state: Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
tombstones: HashMap::new(),
tenant_tombstones: HashMap::new(),
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
global_rate_limiter: RateLimiter::new(1, 1),
@@ -262,7 +245,7 @@ impl GlobalTimelines {
return Ok(timeline);
}
if state.has_tombstone(&ttid) {
if state.tombstones.contains_key(&ttid) {
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
}
@@ -312,14 +295,13 @@ impl GlobalTimelines {
_ => {}
}
if check_tombstone {
if state.has_tombstone(&ttid) {
if state.tombstones.contains_key(&ttid) {
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
}
} else {
// We may be have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
// It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed.
if state.remove_tombstone(&ttid) {
if state.tombstones.remove(&ttid).is_some() {
warn!("un-deleted timeline {ttid}");
}
}
@@ -500,7 +482,6 @@ impl GlobalTimelines {
let tli_res = {
let state = self.state.lock().unwrap();
// Do NOT check tenant tombstones here: those were set earlier
if state.tombstones.contains_key(ttid) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
info!("Timeline {ttid} was already deleted");
@@ -576,10 +557,6 @@ impl GlobalTimelines {
action: DeleteOrExclude,
) -> Result<HashMap<TenantTimelineId, TimelineDeleteResult>> {
info!("deleting all timelines for tenant {}", tenant_id);
// Adding a tombstone before getting the timelines to prevent new timeline additions
self.state.lock().unwrap().add_tenant_tombstone(*tenant_id);
let to_delete = self.get_all_for_tenant(*tenant_id);
let mut err = None;
@@ -623,9 +600,6 @@ impl GlobalTimelines {
state
.tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
state
.tenant_tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
}
}

View File

@@ -482,10 +482,6 @@ async fn handle_tenant_timeline_delete(
ForwardOutcome::NotForwarded(_req) => {}
};
service
.maybe_delete_timeline_import(tenant_id, timeline_id)
.await?;
// For timeline deletions, which both implement an "initially return 202, then 404 once
// we're done" semantic, we wrap with a retry loop to expose a simpler API upstream.
async fn deletion_wrapper<R, F>(service: Arc<Service>, f: F) -> Result<Response<Body>, ApiError>

View File

@@ -139,14 +139,6 @@ pub(crate) struct StorageControllerMetricGroup {
/// HTTP request status counters for handled requests
pub(crate) storage_controller_reconcile_long_running:
measured::CounterVec<ReconcileLongRunningLabelGroupSet>,
/// Indicator of safekeeper reconciler queue depth, broken down by safekeeper, excluding ongoing reconciles.
pub(crate) storage_controller_safkeeper_reconciles_queued:
measured::GaugeVec<SafekeeperReconcilerLabelGroupSet>,
/// Indicator of completed safekeeper reconciles, broken down by safekeeper.
pub(crate) storage_controller_safkeeper_reconciles_complete:
measured::CounterVec<SafekeeperReconcilerLabelGroupSet>,
}
impl StorageControllerMetrics {
@@ -265,17 +257,6 @@ pub(crate) enum Method {
Other,
}
#[derive(measured::LabelGroup, Clone)]
#[label(set = SafekeeperReconcilerLabelGroupSet)]
pub(crate) struct SafekeeperReconcilerLabelGroup<'a> {
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) sk_az: &'a str,
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) sk_node_id: &'a str,
#[label(dynamic_with = lasso::ThreadedRodeo, default)]
pub(crate) sk_hostname: &'a str,
}
impl From<hyper::Method> for Method {
fn from(value: hyper::Method) -> Self {
if value == hyper::Method::GET {

View File

@@ -99,8 +99,8 @@ use crate::tenant_shard::{
ScheduleOptimization, ScheduleOptimizationAction, TenantShard,
};
use crate::timeline_import::{
FinalizingImport, ImportResult, ShardImportStatuses, TimelineImport,
TimelineImportFinalizeError, TimelineImportState, UpcallClient,
ImportResult, ShardImportStatuses, TimelineImport, TimelineImportFinalizeError,
TimelineImportState, UpcallClient,
};
const WAITER_FILL_DRAIN_POLL_TIMEOUT: Duration = Duration::from_millis(500);
@@ -232,9 +232,6 @@ struct ServiceState {
/// Queue of tenants who are waiting for concurrency limits to permit them to reconcile
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
/// Tracks ongoing timeline import finalization tasks
imports_finalizing: BTreeMap<(TenantId, TimelineId), FinalizingImport>,
}
/// Transform an error from a pageserver into an error to return to callers of a storage
@@ -311,7 +308,6 @@ impl ServiceState {
scheduler,
ongoing_operation: None,
delayed_reconcile_rx,
imports_finalizing: Default::default(),
}
}
@@ -4101,58 +4097,13 @@ impl Service {
///
/// If this method gets pre-empted by shut down, it will be called again at start-up (on-going
/// imports are stored in the database).
///
/// # Cancel-Safety
/// Not cancel safe.
/// If the caller stops polling, the import will not be removed from
/// [`ServiceState::imports_finalizing`].
#[instrument(skip_all, fields(
tenant_id=%import.tenant_id,
timeline_id=%import.timeline_id,
))]
async fn finalize_timeline_import(
self: &Arc<Self>,
import: TimelineImport,
) -> Result<(), TimelineImportFinalizeError> {
let tenant_timeline = (import.tenant_id, import.timeline_id);
let (_finalize_import_guard, cancel) = {
let mut locked = self.inner.write().unwrap();
let gate = Gate::default();
let cancel = CancellationToken::default();
let guard = gate.enter().unwrap();
locked.imports_finalizing.insert(
tenant_timeline,
FinalizingImport {
gate,
cancel: cancel.clone(),
},
);
(guard, cancel)
};
let res = tokio::select! {
res = self.finalize_timeline_import_impl(import) => {
res
},
_ = cancel.cancelled() => {
Err(TimelineImportFinalizeError::Cancelled)
}
};
let mut locked = self.inner.write().unwrap();
locked.imports_finalizing.remove(&tenant_timeline);
res
}
async fn finalize_timeline_import_impl(
self: &Arc<Self>,
import: TimelineImport,
) -> Result<(), TimelineImportFinalizeError> {
tracing::info!("Finalizing timeline import");
@@ -4352,46 +4303,6 @@ impl Service {
.await;
}
/// Delete a timeline import if it exists
///
/// Firstly, delete the entry from the database. Any updates
/// from pageservers after the update will fail with a 404, so the
/// import cannot progress into finalizing state if it's not there already.
/// Secondly, cancel the finalization if one is in progress.
pub(crate) async fn maybe_delete_timeline_import(
self: &Arc<Self>,
tenant_id: TenantId,
timeline_id: TimelineId,
) -> Result<(), DatabaseError> {
let tenant_has_ongoing_import = {
let locked = self.inner.read().unwrap();
locked
.tenants
.range(TenantShardId::tenant_range(tenant_id))
.any(|(_tid, shard)| shard.importing == TimelineImportState::Importing)
};
if !tenant_has_ongoing_import {
return Ok(());
}
self.persistence
.delete_timeline_import(tenant_id, timeline_id)
.await?;
let maybe_finalizing = {
let mut locked = self.inner.write().unwrap();
locked.imports_finalizing.remove(&(tenant_id, timeline_id))
};
if let Some(finalizing) = maybe_finalizing {
finalizing.cancel.cancel();
finalizing.gate.close().await;
}
Ok(())
}
pub(crate) async fn tenant_timeline_archival_config(
&self,
tenant_id: TenantId,
@@ -8627,9 +8538,8 @@ impl Service {
Some(ShardCount(new_shard_count))
}
/// Fetches the top tenant shards from every available node, in descending order of
/// max logical size. Offline nodes are skipped, and any errors from available nodes
/// will be logged and ignored.
/// Fetches the top tenant shards from every node, in descending order of
/// max logical size. Any node errors will be logged and ignored.
async fn get_top_tenant_shards(
&self,
request: &TopTenantShardsRequest,
@@ -8640,7 +8550,6 @@ impl Service {
.unwrap()
.nodes
.values()
.filter(|node| node.is_available())
.cloned()
.collect_vec();

View File

@@ -20,9 +20,7 @@ use utils::{
};
use crate::{
metrics::{METRICS_REGISTRY, SafekeeperReconcilerLabelGroup},
persistence::SafekeeperTimelineOpKind,
safekeeper::Safekeeper,
persistence::SafekeeperTimelineOpKind, safekeeper::Safekeeper,
safekeeper_client::SafekeeperClient,
};
@@ -220,26 +218,7 @@ impl ReconcilerHandle {
fn schedule_reconcile(&self, req: ScheduleRequest) {
let (cancel, token_id) = self.new_token_slot(req.tenant_id, req.timeline_id);
let hostname = req.safekeeper.skp.host.clone();
let sk_az = req.safekeeper.skp.availability_zone_id.clone();
let sk_node_id = req.safekeeper.get_id().to_string();
// We don't have direct access to the queue depth here, so increase it blindly by 1.
// We know that putting into the queue increases the queue depth. The receiver will
// update with the correct value once it processes the next item. To avoid races where we
// reduce before we increase, leaving the gauge with a 1 value for a long time, we
// increase it before putting into the queue.
let queued_gauge = &METRICS_REGISTRY
.metrics_group
.storage_controller_safkeeper_reconciles_queued;
let label_group = SafekeeperReconcilerLabelGroup {
sk_az: &sk_az,
sk_node_id: &sk_node_id,
sk_hostname: &hostname,
};
queued_gauge.inc(label_group.clone());
if let Err(err) = self.tx.send((req, cancel, token_id)) {
queued_gauge.set(label_group, 0);
tracing::info!("scheduling request onto {hostname} returned error: {err}");
}
}
@@ -304,18 +283,6 @@ impl SafekeeperReconciler {
continue;
}
let queued_gauge = &METRICS_REGISTRY
.metrics_group
.storage_controller_safkeeper_reconciles_queued;
queued_gauge.set(
SafekeeperReconcilerLabelGroup {
sk_az: &req.safekeeper.skp.availability_zone_id,
sk_node_id: &req.safekeeper.get_id().to_string(),
sk_hostname: &req.safekeeper.skp.host,
},
self.rx.len() as i64,
);
tokio::task::spawn(async move {
let kind = req.kind;
let tenant_id = req.tenant_id;
@@ -544,16 +511,6 @@ impl SafekeeperReconcilerInner {
req.generation,
)
.await;
let complete_counter = &METRICS_REGISTRY
.metrics_group
.storage_controller_safkeeper_reconciles_complete;
complete_counter.inc(SafekeeperReconcilerLabelGroup {
sk_az: &req.safekeeper.skp.availability_zone_id,
sk_node_id: &req.safekeeper.get_id().to_string(),
sk_hostname: &req.safekeeper.skp.host,
});
if let Err(err) = res {
tracing::info!(
"couldn't remove reconciliation request onto {} from persistence: {err:?}",

View File

@@ -7,7 +7,6 @@ use serde::{Deserialize, Serialize};
use pageserver_api::models::{ShardImportProgress, ShardImportStatus};
use tokio_util::sync::CancellationToken;
use utils::sync::gate::Gate;
use utils::{
id::{TenantId, TimelineId},
shard::ShardIndex,
@@ -56,8 +55,6 @@ pub(crate) enum TimelineImportUpdateFollowUp {
pub(crate) enum TimelineImportFinalizeError {
#[error("Shut down interrupted import finalize")]
ShuttingDown,
#[error("Import finalization was cancelled")]
Cancelled,
#[error("Mismatched shard detected during import finalize: {0}")]
MismatchedShards(ShardIndex),
}
@@ -167,11 +164,6 @@ impl TimelineImport {
}
}
pub(crate) struct FinalizingImport {
pub(crate) gate: Gate,
pub(crate) cancel: CancellationToken,
}
pub(crate) type ImportResult = Result<(), String>;
pub(crate) struct UpcallClient {

View File

@@ -1,4 +1,3 @@
import json
import os
import shutil
import subprocess
@@ -12,7 +11,6 @@ from _pytest.config import Config
from fixtures.log_helper import log
from fixtures.neon_cli import AbstractNeonCli
from fixtures.neon_fixtures import Endpoint, VanillaPostgres
from fixtures.pg_version import PgVersion
from fixtures.remote_storage import MockS3Server
@@ -163,57 +161,3 @@ def fast_import(
f.write(fi.cmd.stderr)
log.info("Written logs to %s", test_output_dir)
def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
"""
Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
"""
assert not vanilla_pg.is_running()
path.mkdir()
# what cplane writes before scheduling fast_import
specpath = path / "spec.json"
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
# what fast_import writes
vanilla_pg.pgdatadir.rename(path / "pgdata")
statusdir = path / "status"
statusdir.mkdir()
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
(statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
def populate_vanilla_pg(vanilla_pg: VanillaPostgres, target_relblock_size: int) -> int:
assert vanilla_pg.is_running()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
# fillfactor so we don't need to produce that much data
# 900 byte per row is > 10% => 1 row per page
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
nrows = 0
while True:
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
log.info(
f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
)
if relblock_size >= target_relblock_size:
break
addrows = int((target_relblock_size - relblock_size) // 8192)
assert addrows >= 1, "forward progress"
vanilla_pg.safe_psql(
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
)
nrows += addrows
return nrows
def validate_import_from_vanilla_pg(endpoint: Endpoint, nrows: int):
assert endpoint.safe_psql_many(
[
"set effective_io_concurrency=32;",
"SET statement_timeout='300s';",
"select count(*), sum(data::bigint)::bigint from t",
]
) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]

View File

@@ -2337,22 +2337,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
headers=self.headers(TokenScope.ADMIN),
)
def import_status(
self, tenant_shard_id: TenantShardId, timeline_id: TimelineId, generation: int
):
payload = {
"tenant_shard_id": str(tenant_shard_id),
"timeline_id": str(timeline_id),
"generation": generation,
}
self.request(
"GET",
f"{self.api}/upcall/v1/timeline_import_status",
headers=self.headers(TokenScope.GENERATIONS_API),
json=payload,
)
def reconcile_all(self):
r = self.request(
"POST",
@@ -2829,11 +2813,6 @@ class NeonPageserver(PgProtocol, LogUtils):
if self.running:
self.http_client().configure_failpoints([(name, action)])
def clear_persistent_failpoint(self, name: str):
del self._persistent_failpoints[name]
if self.running:
self.http_client().configure_failpoints([(name, "off")])
def timeline_dir(
self,
tenant_shard_id: TenantId | TenantShardId,

View File

@@ -675,7 +675,7 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
def timeline_delete(
self, tenant_id: TenantId | TenantShardId, timeline_id: TimelineId, **kwargs
) -> int:
):
"""
Note that deletion is not instant, it is scheduled and performed mostly in the background.
So if you need to wait for it to complete use `timeline_delete_wait_completed`.
@@ -688,8 +688,6 @@ class PageserverHttpClient(requests.Session, MetricsGetter):
res_json = res.json()
assert res_json is None
return res.status_code
def timeline_gc(
self,
tenant_id: TenantId | TenantShardId,

View File

@@ -1,41 +1,31 @@
from __future__ import annotations
import enum
import json
import time
from collections import Counter
from dataclasses import dataclass
from enum import StrEnum
from threading import Event
from typing import TYPE_CHECKING
import pytest
from fixtures.common_types import Lsn, TenantId, TimelineId
from fixtures.fast_import import mock_import_bucket, populate_vanilla_pg
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
PgBin,
VanillaPostgres,
wait_for_last_flush_lsn,
)
from fixtures.pageserver.http import (
ImportPgdataIdemptencyKey,
)
from fixtures.pageserver.utils import wait_for_upload_queue_empty
from fixtures.remote_storage import RemoteStorageKind
from fixtures.utils import human_bytes, run_only_on_default_postgres, wait_until
from werkzeug.wrappers.response import Response
from fixtures.utils import human_bytes, wait_until
if TYPE_CHECKING:
from collections.abc import Iterable
from typing import Any
from fixtures.pageserver.http import PageserverHttpClient
from pytest_httpserver import HTTPServer
from werkzeug.wrappers.request import Request
GLOBAL_LRU_LOG_LINE = "tenant_min_resident_size-respecting LRU would not relieve pressure, evicting more following global LRU policy"
@@ -174,7 +164,6 @@ class EvictionEnv:
min_avail_bytes,
mock_behavior,
eviction_order: EvictionOrder,
wait_logical_size: bool = True,
):
"""
Starts pageserver up with mocked statvfs setup. The startup is
@@ -212,12 +201,11 @@ class EvictionEnv:
pageserver.start()
# we now do initial logical size calculation on startup, which on debug builds can fight with disk usage based eviction
if wait_logical_size:
for tenant_id, timeline_id in self.timelines:
tenant_ps = self.neon_env.get_tenant_pageserver(tenant_id)
# Pageserver may be none if we are currently not attached anywhere, e.g. during secondary eviction test
if tenant_ps is not None:
tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id)
for tenant_id, timeline_id in self.timelines:
tenant_ps = self.neon_env.get_tenant_pageserver(tenant_id)
# Pageserver may be none if we are currently not attached anywhere, e.g. during secondary eviction test
if tenant_ps is not None:
tenant_ps.http_client().timeline_wait_logical_size(tenant_id, timeline_id)
def statvfs_called():
pageserver.assert_log_contains(".*running mocked statvfs.*")
@@ -894,121 +882,3 @@ def test_secondary_mode_eviction(eviction_env_ha: EvictionEnv):
assert total_size - post_eviction_total_size >= evict_bytes, (
"we requested at least evict_bytes worth of free space"
)
@run_only_on_default_postgres(reason="PG version is irrelevant here")
def test_import_timeline_disk_pressure_eviction(
neon_env_builder: NeonEnvBuilder,
vanilla_pg: VanillaPostgres,
make_httpserver: HTTPServer,
pg_bin: PgBin,
):
"""
TODO
"""
# Set up mock control plane HTTP server to listen for import completions
import_completion_signaled = Event()
def handler(request: Request) -> Response:
log.info(f"control plane /import_complete request: {request.json}")
import_completion_signaled.set()
return Response(json.dumps({}), status=200)
cplane_mgmt_api_server = make_httpserver
cplane_mgmt_api_server.expect_request(
"/storage/api/v1/import_complete", method="PUT"
).respond_with_handler(handler)
# Plug the cplane mock in
neon_env_builder.control_plane_hooks_api = (
f"http://{cplane_mgmt_api_server.host}:{cplane_mgmt_api_server.port}/storage/api/v1/"
)
# The import will specifiy a local filesystem path mocking remote storage
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
vanilla_pg.start()
target_relblock_size = 1024 * 1024 * 128
populate_vanilla_pg(vanilla_pg, target_relblock_size)
vanilla_pg.stop()
env = neon_env_builder.init_configs()
env.start()
importbucket_path = neon_env_builder.repo_dir / "test_import_completion_bucket"
mock_import_bucket(vanilla_pg, importbucket_path)
tenant_id = TenantId.generate()
timeline_id = TimelineId.generate()
idempotency = ImportPgdataIdemptencyKey.random()
eviction_env = EvictionEnv(
timelines=[(tenant_id, timeline_id)],
neon_env=env,
pageserver_http=env.pageserver.http_client(),
layer_size=5 * 1024 * 1024, # Doesn't apply here
pg_bin=pg_bin, # Not used here
pgbench_init_lsns={}, # Not used here
)
# Pause before delivering the final notification to storcon.
# This keeps the import in progress.
failpoint_name = "import-timeline-pre-success-notify-pausable"
env.pageserver.add_persistent_failpoint(failpoint_name, "pause")
env.storage_controller.tenant_create(tenant_id)
env.storage_controller.timeline_create(
tenant_id,
{
"new_timeline_id": str(timeline_id),
"import_pgdata": {
"idempotency_key": str(idempotency),
"location": {"LocalFs": {"path": str(importbucket_path.absolute())}},
},
},
)
def hit_failpoint():
log.info("Checking log for pattern...")
try:
assert env.pageserver.log_contains(f".*at failpoint {failpoint_name}.*")
except Exception:
log.exception("Failed to find pattern in log")
raise
wait_until(hit_failpoint)
assert not import_completion_signaled.is_set()
env.pageserver.stop()
total_size, _, _ = eviction_env.timelines_du(env.pageserver)
blocksize = 512
total_blocks = (total_size + (blocksize - 1)) // blocksize
eviction_env.pageserver_start_with_disk_usage_eviction(
env.pageserver,
period="1s",
max_usage_pct=33,
min_avail_bytes=0,
mock_behavior={
"type": "Success",
"blocksize": blocksize,
"total_blocks": total_blocks,
# Only count layer files towards used bytes in the mock_statvfs.
# This avoids accounting for metadata files & tenant conf in the tests.
"name_filter": ".*__.*",
},
eviction_order=EvictionOrder.RELATIVE_ORDER_SPARE,
wait_logical_size=False,
)
wait_until(lambda: env.pageserver.assert_log_contains(".*disk usage pressure relieved"))
env.pageserver.clear_persistent_failpoint(failpoint_name)
def cplane_notified():
assert import_completion_signaled.is_set()
wait_until(cplane_notified)
env.pageserver.allowed_errors.append(r".* running disk usage based eviction due to pressure.*")

View File

@@ -12,19 +12,13 @@ import psycopg2
import psycopg2.errors
import pytest
from fixtures.common_types import Lsn, TenantId, TenantShardId, TimelineId
from fixtures.fast_import import (
FastImport,
mock_import_bucket,
populate_vanilla_pg,
validate_import_from_vanilla_pg,
)
from fixtures.fast_import import FastImport
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnvBuilder,
PageserverImportConfig,
PgBin,
PgProtocol,
StorageControllerApiException,
StorageControllerMigrationConfig,
VanillaPostgres,
)
@@ -65,6 +59,24 @@ smoke_params = [
]
def mock_import_bucket(vanilla_pg: VanillaPostgres, path: Path):
"""
Mock the import S3 bucket into a local directory for a provided vanilla PG instance.
"""
assert not vanilla_pg.is_running()
path.mkdir()
# what cplane writes before scheduling fast_import
specpath = path / "spec.json"
specpath.write_text(json.dumps({"branch_id": "somebranch", "project_id": "someproject"}))
# what fast_import writes
vanilla_pg.pgdatadir.rename(path / "pgdata")
statusdir = path / "status"
statusdir.mkdir()
(statusdir / "pgdata").write_text(json.dumps({"done": True}))
(statusdir / "fast_import").write_text(json.dumps({"command": "pgdata", "done": True}))
@skip_in_debug_build("MULTIPLE_RELATION_SEGMENTS has non trivial amount of data")
@pytest.mark.parametrize("shard_count,stripe_size,rel_block_size", smoke_params)
def test_pgdata_import_smoke(
@@ -119,6 +131,10 @@ def test_pgdata_import_smoke(
# Put data in vanilla pg
#
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
log.info("create relblock data")
if rel_block_size == RelBlockSize.ONE_STRIPE_SIZE:
target_relblock_size = stripe_size * 8192
elif rel_block_size == RelBlockSize.TWO_STRPES_PER_SHARD:
@@ -129,8 +145,45 @@ def test_pgdata_import_smoke(
else:
raise ValueError
vanilla_pg.start()
rows_inserted = populate_vanilla_pg(vanilla_pg, target_relblock_size)
# fillfactor so we don't need to produce that much data
# 900 byte per row is > 10% => 1 row per page
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
nrows = 0
while True:
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
log.info(
f"relblock size: {relblock_size / 8192} pages (target: {target_relblock_size // 8192}) pages"
)
if relblock_size >= target_relblock_size:
break
addrows = int((target_relblock_size - relblock_size) // 8192)
assert addrows >= 1, "forward progress"
vanilla_pg.safe_psql(
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
)
nrows += addrows
expect_nrows = nrows
expect_sum = (
(nrows) * (nrows + 1) // 2
) # https://stackoverflow.com/questions/43901484/sum-of-the-integers-from-1-to-n
def validate_vanilla_equivalence(ep):
# TODO: would be nicer to just compare pgdump
# Enable IO concurrency for batching on large sequential scan, to avoid making
# this test unnecessarily onerous on CPU. Especially on debug mode, it's still
# pretty onerous though, so increase statement_timeout to avoid timeouts.
assert ep.safe_psql_many(
[
"set effective_io_concurrency=32;",
"SET statement_timeout='300s';",
"select count(*), sum(data::bigint)::bigint from t",
]
) == [[], [], [(expect_nrows, expect_sum)]]
validate_vanilla_equivalence(vanilla_pg)
vanilla_pg.stop()
#
@@ -221,14 +274,14 @@ def test_pgdata_import_smoke(
config_lines=ep_config,
)
validate_import_from_vanilla_pg(ro_endpoint, rows_inserted)
validate_vanilla_equivalence(ro_endpoint)
# ensure the import survives restarts
ro_endpoint.stop()
env.pageserver.stop(immediate=True)
env.pageserver.start()
ro_endpoint.start()
validate_import_from_vanilla_pg(ro_endpoint, rows_inserted)
validate_vanilla_equivalence(ro_endpoint)
#
# validate the layer files in each shard only have the shard-specific data
@@ -268,7 +321,7 @@ def test_pgdata_import_smoke(
child_workload = workload.branch(timeline_id=child_timeline_id, branch_name="br-tip")
child_workload.validate()
validate_import_from_vanilla_pg(child_workload.endpoint(), rows_inserted)
validate_vanilla_equivalence(child_workload.endpoint())
# ... at the initdb lsn
_ = env.create_branch(
@@ -283,7 +336,7 @@ def test_pgdata_import_smoke(
tenant_id=tenant_id,
config_lines=ep_config,
)
validate_import_from_vanilla_pg(br_initdb_endpoint, rows_inserted)
validate_vanilla_equivalence(br_initdb_endpoint)
with pytest.raises(psycopg2.errors.UndefinedTable):
br_initdb_endpoint.safe_psql(f"select * from {workload.table}")
@@ -370,12 +423,8 @@ def test_import_completion_on_restart(
@run_only_on_default_postgres(reason="PG version is irrelevant here")
@pytest.mark.parametrize("action", ["restart", "delete"])
def test_import_respects_timeline_lifecycle(
neon_env_builder: NeonEnvBuilder,
vanilla_pg: VanillaPostgres,
make_httpserver: HTTPServer,
action: str,
def test_import_respects_tenant_shutdown(
neon_env_builder: NeonEnvBuilder, vanilla_pg: VanillaPostgres, make_httpserver: HTTPServer
):
"""
Validate that importing timelines respect the usual timeline life cycle:
@@ -443,33 +492,16 @@ def test_import_respects_timeline_lifecycle(
wait_until(hit_failpoint)
assert not import_completion_signaled.is_set()
if action == "restart":
# Restart the pageserver while an import job is in progress.
# This clears the failpoint and we expect that the import starts up afresh
# after the restart and eventually completes.
env.pageserver.stop()
env.pageserver.start()
# Restart the pageserver while an import job is in progress.
# This clears the failpoint and we expect that the import starts up afresh
# after the restart and eventually completes.
env.pageserver.stop()
env.pageserver.start()
def cplane_notified():
assert import_completion_signaled.is_set()
def cplane_notified():
assert import_completion_signaled.is_set()
wait_until(cplane_notified)
elif action == "delete":
status = env.storage_controller.pageserver_api().timeline_delete(tenant_id, timeline_id)
assert status == 200
timeline_path = env.pageserver.timeline_dir(tenant_id, timeline_id)
assert not timeline_path.exists(), "Timeline dir exists after deletion"
shard_zero = TenantShardId(tenant_id, 0, 0)
location = env.storage_controller.inspect(shard_zero)
assert location is not None
generation = location[0]
with pytest.raises(StorageControllerApiException, match="not found"):
env.storage_controller.import_status(shard_zero, timeline_id, generation)
else:
raise RuntimeError(f"{action} param not recognized")
wait_until(cplane_notified)
@skip_in_debug_build("Validation query takes too long in debug builds")
@@ -524,8 +556,23 @@ def test_import_chaos(
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
vanilla_pg.start()
vanilla_pg.safe_psql("create user cloud_admin with password 'postgres' superuser")
vanilla_pg.safe_psql("""create table t (data char(900)) with (fillfactor = 10)""")
inserted_rows = populate_vanilla_pg(vanilla_pg, TARGET_RELBOCK_SIZE)
nrows = 0
while True:
relblock_size = vanilla_pg.safe_psql_scalar("select pg_relation_size('t')")
log.info(
f"relblock size: {relblock_size / 8192} pages (target: {TARGET_RELBOCK_SIZE // 8192}) pages"
)
if relblock_size >= TARGET_RELBOCK_SIZE:
break
addrows = int((TARGET_RELBOCK_SIZE - relblock_size) // 8192)
assert addrows >= 1, "forward progress"
vanilla_pg.safe_psql(
f"insert into t select generate_series({nrows + 1}, {nrows + addrows})"
)
nrows += addrows
vanilla_pg.stop()
@@ -693,7 +740,13 @@ def test_import_chaos(
endpoint = env.endpoints.create_start(branch_name=import_branch_name, tenant_id=tenant_id)
# Validate the imported data is legit
validate_import_from_vanilla_pg(endpoint, inserted_rows)
assert endpoint.safe_psql_many(
[
"set effective_io_concurrency=32;",
"SET statement_timeout='300s';",
"select count(*), sum(data::bigint)::bigint from t",
]
) == [[], [], [(nrows, nrows * (nrows + 1) // 2)]]
endpoint.stop()

View File

@@ -4192,10 +4192,10 @@ def test_storcon_create_delete_sk_down(
# ensure the safekeeper deleted the timeline
def timeline_deleted_on_active_sks():
env.safekeepers[0].assert_log_contains(
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
env.safekeepers[2].assert_log_contains(
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
wait_until(timeline_deleted_on_active_sks)
@@ -4210,7 +4210,7 @@ def test_storcon_create_delete_sk_down(
# ensure that there is log msgs for the third safekeeper too
def timeline_deleted_on_sk():
env.safekeepers[1].assert_log_contains(
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
)
wait_until(timeline_deleted_on_sk)