Compare commits

..

1 Commits

Author SHA1 Message Date
Arpad Müller
831f2a4ba7 Fix flakiness of test_storcon_create_delete_sk_down (#12040)
The `test_storcon_create_delete_sk_down` test is still flaky. This patch
addresses two possible causes for flakiness. Both causes are related to
deletion racing with `pull_timeline` which hasn't finished yet.

* the first cause is timeline deletion racing with `pull_timeline`:
* the first deletion attempt doesn't contain the line because the
timeline doesn't exist yet
* the subsequent deletion attempts don't contain it either, only a note
that the timeline is already deleted.
* so this patch adds the note that the timeline is already deleted to
the regex
* the second cause is about tenant deletion racing with `pull_timeline`:
* there were no tenant specific tombstones so if a tenant was deleted,
we only added tombstones for the specific timelines being deleted, not
for the tenant itself.
* This patch changes this, so we now have tenant specific tombstones as
well as timeline specific ones, and creation of a timeline checks both.
* we also don't see any retries of the tenant deletion in the logs. once
it's done it's done. so extend the regex to contain the tenant deletion
message as well.

One could wonder why the regex and why not using the API to check
whether the timeline is just "gone". The issue with the API is that it
doesn't allow one to distinguish between "deleted" and "has never
existed", and the latter case might race with `pull_timeline`. I.e. the
second case flakiness helped in the discovery of a real bug (no tenant
tombstones), so the more precise check was helpful.

Before, I could easily reproduce 2-9 occurrences of flakiness when
running the test with an additional `range(128)` parameter (i.e. 128
times 4 times). With this patch, I ran it three times, not a single
failure.

Fixes #11838
2025-05-28 18:20:38 +00:00
6 changed files with 75 additions and 259 deletions

86
Cargo.lock generated
View File

@@ -701,7 +701,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.6.0",
"hyper 1.4.1",
"hyper-util",
"itoa",
"matchit",
@@ -718,7 +718,7 @@ dependencies = [
"sync_wrapper 1.0.1",
"tokio",
"tokio-tungstenite 0.26.1",
"tower",
"tower 0.5.2",
"tower-layer",
"tower-service",
"tracing",
@@ -761,7 +761,7 @@ dependencies = [
"mime",
"pin-project-lite",
"serde",
"tower",
"tower 0.5.2",
"tower-layer",
"tower-service",
]
@@ -1337,7 +1337,7 @@ dependencies = [
"tokio-postgres",
"tokio-stream",
"tokio-util",
"tower",
"tower 0.5.2",
"tower-http",
"tower-otel",
"tracing",
@@ -2066,7 +2066,7 @@ dependencies = [
"test-log",
"tokio",
"tokio-util",
"tower",
"tower 0.5.2",
"tracing",
"utils",
"workspace_hack",
@@ -2330,7 +2330,7 @@ dependencies = [
"futures-core",
"futures-sink",
"http-body-util",
"hyper 1.6.0",
"hyper 1.4.1",
"hyper-util",
"pin-project",
"rand 0.8.5",
@@ -2883,9 +2883,9 @@ dependencies = [
[[package]]
name = "httparse"
version = "1.10.1"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbf3de79e51f3d586ab4cb9d5c3e2c14aa28ed23d180cf89b4df0454a69cc87"
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
[[package]]
name = "httpdate"
@@ -2935,9 +2935,9 @@ dependencies = [
[[package]]
name = "hyper"
version = "1.6.0"
version = "1.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80"
checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05"
dependencies = [
"bytes",
"futures-channel",
@@ -2977,7 +2977,7 @@ checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
dependencies = [
"futures-util",
"http 1.1.0",
"hyper 1.6.0",
"hyper 1.4.1",
"hyper-util",
"rustls 0.22.4",
"rustls-pki-types",
@@ -2992,7 +2992,7 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 1.6.0",
"hyper 1.4.1",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -3001,20 +3001,20 @@ dependencies = [
[[package]]
name = "hyper-util"
version = "0.1.12"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf9f1e950e0d9d1d3c47184416723cf29c0d1f93bd8cccf37e4beb6b44f31710"
checksum = "cde7055719c54e36e95e8719f95883f22072a48ede39db7fc17a4e1d5281e9b9"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"hyper 1.6.0",
"libc",
"hyper 1.4.1",
"pin-project-lite",
"socket2",
"tokio",
"tower 0.4.13",
"tower-service",
"tracing",
]
@@ -4432,21 +4432,6 @@ dependencies = [
"workspace_hack",
]
[[package]]
name = "pageserver_client_grpc"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"http 1.1.0",
"pageserver_page_api",
"thiserror 1.0.69",
"tokio",
"tonic 0.13.1",
"tracing",
"utils",
]
[[package]]
name = "pageserver_compaction"
version = "0.1.0"
@@ -5223,7 +5208,7 @@ dependencies = [
"humantime",
"humantime-serde",
"hyper 0.14.30",
"hyper 1.6.0",
"hyper 1.4.1",
"hyper-util",
"indexmap 2.9.0",
"ipnet",
@@ -5619,7 +5604,7 @@ dependencies = [
"http-body-util",
"http-types",
"humantime-serde",
"hyper 1.6.0",
"hyper 1.4.1",
"itertools 0.10.5",
"metrics",
"once_cell",
@@ -5659,7 +5644,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.6.0",
"hyper 1.4.1",
"hyper-rustls 0.26.0",
"hyper-util",
"ipnet",
@@ -5716,7 +5701,7 @@ dependencies = [
"futures",
"getrandom 0.2.11",
"http 1.1.0",
"hyper 1.6.0",
"hyper 1.4.1",
"parking_lot 0.11.2",
"reqwest",
"reqwest-middleware",
@@ -6657,12 +6642,12 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.5.10"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9"
dependencies = [
"libc",
"windows-sys 0.52.0",
"windows-sys 0.48.0",
]
[[package]]
@@ -6728,7 +6713,7 @@ dependencies = [
"http-body-util",
"http-utils",
"humantime",
"hyper 1.6.0",
"hyper 1.4.1",
"hyper-util",
"metrics",
"once_cell",
@@ -7557,7 +7542,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.6.0",
"hyper 1.4.1",
"hyper-timeout",
"hyper-util",
"percent-encoding",
@@ -7568,7 +7553,7 @@ dependencies = [
"tokio",
"tokio-rustls 0.26.2",
"tokio-stream",
"tower",
"tower 0.5.2",
"tower-layer",
"tower-service",
"tracing",
@@ -7601,6 +7586,21 @@ dependencies = [
"tonic 0.13.1",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"pin-project-lite",
"tokio",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower"
version = "0.5.2"
@@ -8591,7 +8591,7 @@ dependencies = [
"hex",
"hmac",
"hyper 0.14.30",
"hyper 1.6.0",
"hyper 1.4.1",
"hyper-util",
"indexmap 2.9.0",
"itertools 0.12.1",
@@ -8645,7 +8645,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"toml_edit",
"tower",
"tower 0.5.2",
"tracing",
"tracing-core",
"tracing-log",

View File

@@ -8,7 +8,6 @@ members = [
"pageserver/compaction",
"pageserver/ctl",
"pageserver/client",
"pageserver/client_grpc",
"pageserver/pagebench",
"pageserver/page_api",
"proxy",
@@ -255,7 +254,6 @@ metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver = { path = "./pageserver" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
pageserver_client = { path = "./pageserver/client" }
pageserver_client_grpc = { path = "./pageserver/client_grpc" }
pageserver_compaction = { version = "0.1", path = "./pageserver/compaction/" }
pageserver_page_api = { path = "./pageserver/page_api" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }

View File

@@ -1,16 +0,0 @@
[package]
name = "pageserver_client_grpc"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
bytes.workspace = true
futures.workspace = true
http.workspace = true
thiserror.workspace = true
tonic.workspace = true
tracing.workspace = true
pageserver_page_api.workspace = true
utils.workspace = true
tokio.workspace = true

View File

@@ -1,192 +0,0 @@
//!
//! Pageserver gRPC client library
//!
//! This library provides a gRPC client for the pageserver for the
//! communicator project.
//!
//! This library is a work in progress.
//!
//!
use std::collections::HashMap;
use bytes::Bytes;
use futures::{StreamExt};
use thiserror::Error;
use tonic::metadata::AsciiMetadataValue;
use pageserver_page_api::proto;
use pageserver_page_api::proto::PageServiceClient;
use utils::shard::ShardIndex;
use std::fmt::Debug;
use tracing::error;
use tokio::sync::RwLock;
use tonic::transport::{Channel, Endpoint};
#[derive(Error, Debug)]
pub enum PageserverClientError {
#[error("could not connect to service: {0}")]
ConnectError(#[from] tonic::transport::Error),
#[error("could not perform request: {0}`")]
RequestError(#[from] tonic::Status),
#[error("protocol error: {0}")]
ProtocolError(#[from] pageserver_page_api::ProtocolError),
#[error("could not perform request: {0}`")]
InvalidUri(#[from] http::uri::InvalidUri),
#[error("could not perform request: {0}`")]
Other(String),
}
pub struct PageserverClient {
endpoint_map: HashMap<ShardIndex, Endpoint>,
channels: tokio::sync::RwLock<HashMap<ShardIndex, Channel>>,
auth_interceptor: AuthInterceptor,
}
impl PageserverClient {
/// TODO: this doesn't currently react to changes in the shard map.
pub fn new(
tenant_id: AsciiMetadataValue,
timeline_id: AsciiMetadataValue,
auth_token: Option<String>,
shard_map: HashMap<ShardIndex, String>,
) -> Result<Self, PageserverClientError> {
let endpoint_map: HashMap<ShardIndex, Endpoint> = shard_map
.into_iter()
.map(|(shard, url)| {
let endpoint = Endpoint::from_shared(url)
.map_err(|_e| PageserverClientError::Other("Unable to parse endpoint {url}".to_string()))?;
Ok::<(ShardIndex, Endpoint), PageserverClientError>((shard, endpoint))
})
.collect::<Result<_, _>>()?;
Ok(Self {
endpoint_map,
channels: RwLock::new(HashMap::new()),
auth_interceptor: AuthInterceptor::new(
tenant_id,
timeline_id,
auth_token,
),
})
}
//
// TODO: This opens a new gRPC stream for every request, which is extremely inefficient
pub async fn get_page(
&self,
shard: ShardIndex,
request: pageserver_page_api::GetPageRequest,
) -> Result<Vec<Bytes>, PageserverClientError> {
// FIXME: calculate the shard number correctly
let chan = self.get_client(shard).await?;
let mut client =
PageServiceClient::with_interceptor(chan, self.auth_interceptor.for_shard(shard));
let request = proto::GetPageRequest::try_from(request)?;
let request_stream = futures::stream::once(std::future::ready(request));
let mut response_stream = client
.get_pages(tonic::Request::new(request_stream))
.await?
.into_inner();
let Some(response) = response_stream.next().await else {
return Err(PageserverClientError::Other(
"no response received for getpage request".to_string(),
));
};
match response {
Err(status) => {
return Err(PageserverClientError::RequestError(status));
}
Ok(resp) => {
let response: pageserver_page_api::GetPageResponse = resp.try_into().unwrap();
return Ok(response.page_images.to_vec());
}
}
}
//
// TODO: this should use a connection pool with concurrency limits,
// not a single connection to the shard.
//
async fn get_client(&self, shard: ShardIndex) -> Result<Channel, PageserverClientError> {
// Get channel from the hashmap
let mut channels = self.channels.write();
if let Some(channel) = channels.await.get(&shard) {
return Ok(channel.clone());
}
// Create a new channel if it doesn't exist
let shard_endpoint = self
.endpoint_map
.get(&shard);
let endpoint = match shard_endpoint{
Some(_endpoint) => _endpoint,
None => {
error!("Shard {shard} not found in shard map");
return Err(PageserverClientError::Other(format!(
"Shard {shard} not found in shard map"
)));
}
};
let channel = endpoint.connect().await?;
channels = self.channels.write();
channels.await.insert(shard, channel.clone());
Ok(channel.clone())
}
}
/// Request interceptor that stamps every pageserver request with the tenant id,
/// the timeline id, and, when present, the shard id and the authentication header.
#[derive(Clone)]
struct AuthInterceptor {
    tenant_id: AsciiMetadataValue,
    timeline_id: AsciiMetadataValue,
    /// `None` until a shard-specific copy is made with `for_shard`.
    shard_id: Option<AsciiMetadataValue>,
    /// Full header value, including the "Bearer " prefix.
    auth_header: Option<AsciiMetadataValue>,
}
impl AuthInterceptor {
    /// Creates an interceptor with no shard id set.
    ///
    /// When `auth_token` is given, it is stored as the header value `Bearer <token>`.
    ///
    /// # Panics
    /// Panics if the resulting header string cannot be parsed into a metadata value.
    fn new(
        tenant_id: AsciiMetadataValue,
        timeline_id: AsciiMetadataValue,
        auth_token: Option<String>,
    ) -> Self {
        let auth_header = auth_token.map(|token| {
            format!("Bearer {token}")
                .parse()
                .expect("could not parse auth token")
        });

        Self {
            tenant_id,
            shard_id: None,
            timeline_id,
            auth_header,
        }
    }

    /// Returns a copy of this interceptor with the shard id set to `shard_id`.
    ///
    /// # Panics
    /// Panics if the textual form of `shard_id` cannot be parsed into a metadata value.
    fn for_shard(&self, shard_id: ShardIndex) -> Self {
        let mut scoped = self.clone();
        let shard_value = shard_id
            .to_string()
            .parse()
            .expect("could not parse shard id");
        scoped.shard_id = Some(shard_value);
        scoped
    }
}
impl tonic::service::Interceptor for AuthInterceptor {
    /// Adds the `neon-tenant-id` and `neon-timeline-id` metadata entries to `req`,
    /// plus `neon-shard-id` and `authorization` when those values are set.
    /// Always returns `Ok`.
    fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
        let metadata = req.metadata_mut();

        metadata.insert("neon-tenant-id", self.tenant_id.clone());
        if let Some(shard_id) = self.shard_id.as_ref() {
            metadata.insert("neon-shard-id", shard_id.clone());
        }
        metadata.insert("neon-timeline-id", self.timeline_id.clone());
        if let Some(auth_header) = self.auth_header.as_ref() {
            metadata.insert("authorization", auth_header.clone());
        }

        Ok(req)
    }
}

View File

@@ -44,6 +44,7 @@ struct GlobalTimelinesState {
// on-demand timeline creation from recreating deleted timelines. This is only soft-enforced, as
// this map is dropped on restart.
tombstones: HashMap<TenantTimelineId, Instant>,
tenant_tombstones: HashMap<TenantId, Instant>,
conf: Arc<SafeKeeperConf>,
broker_active_set: Arc<TimelinesSet>,
@@ -81,10 +82,25 @@ impl GlobalTimelinesState {
}
}
/// Returns `true` if a tombstone exists for the timeline itself or for its tenant.
fn has_tombstone(&self, ttid: &TenantTimelineId) -> bool {
    if self.tombstones.contains_key(ttid) {
        return true;
    }
    self.tenant_tombstones.contains_key(&ttid.tenant_id)
}
/// Removes all blocking tombstones for the given timeline ID: both the
/// timeline's own tombstone and the tombstone of its tenant.
/// Returns `true` if there have been actual changes.
fn remove_tombstone(&mut self, ttid: &TenantTimelineId) -> bool {
    // Evaluate both removals unconditionally. A short-circuiting `||` would
    // leave the tenant tombstone in place whenever a timeline tombstone was
    // removed, and `has_tombstone` would then still report the timeline as blocked.
    let timeline_removed = self.tombstones.remove(ttid).is_some();
    let tenant_removed = self.tenant_tombstones.remove(&ttid.tenant_id).is_some();
    timeline_removed || tenant_removed
}
/// Drops `ttid` from the timelines map and records a tombstone for it,
/// stamped with the current time.
fn delete(&mut self, ttid: TenantTimelineId) {
    self.timelines.remove(&ttid);
    let deleted_at = Instant::now();
    self.tombstones.insert(ttid, deleted_at);
}
/// Records a tombstone for the whole tenant, stamped with the current time.
/// An existing tombstone for the same tenant is replaced.
fn add_tenant_tombstone(&mut self, tenant_id: TenantId) {
    let deleted_at = Instant::now();
    self.tenant_tombstones.insert(tenant_id, deleted_at);
}
}
/// A struct used to manage access to the global timelines map.
@@ -99,6 +115,7 @@ impl GlobalTimelines {
state: Mutex::new(GlobalTimelinesState {
timelines: HashMap::new(),
tombstones: HashMap::new(),
tenant_tombstones: HashMap::new(),
conf,
broker_active_set: Arc::new(TimelinesSet::default()),
global_rate_limiter: RateLimiter::new(1, 1),
@@ -245,7 +262,7 @@ impl GlobalTimelines {
return Ok(timeline);
}
if state.tombstones.contains_key(&ttid) {
if state.has_tombstone(&ttid) {
anyhow::bail!("Timeline {ttid} is deleted, refusing to recreate");
}
@@ -295,13 +312,14 @@ impl GlobalTimelines {
_ => {}
}
if check_tombstone {
if state.tombstones.contains_key(&ttid) {
if state.has_tombstone(&ttid) {
anyhow::bail!("timeline {ttid} is deleted, refusing to recreate");
}
} else {
// We may have been asked to load a timeline that was previously deleted (e.g. from `pull_timeline.rs`). We trust
// that the human doing this manual intervention knows what they are doing, and remove its tombstone.
if state.tombstones.remove(&ttid).is_some() {
// It's also possible that we enter this when the tenant has been deleted, even if the timeline itself has never existed.
if state.remove_tombstone(&ttid) {
warn!("un-deleted timeline {ttid}");
}
}
@@ -482,6 +500,7 @@ impl GlobalTimelines {
let tli_res = {
let state = self.state.lock().unwrap();
// Do NOT check tenant tombstones here: those were set earlier
if state.tombstones.contains_key(ttid) {
// Presence of a tombstone guarantees that a previous deletion has completed and there is no work to do.
info!("Timeline {ttid} was already deleted");
@@ -557,6 +576,10 @@ impl GlobalTimelines {
action: DeleteOrExclude,
) -> Result<HashMap<TenantTimelineId, TimelineDeleteResult>> {
info!("deleting all timelines for tenant {}", tenant_id);
// Adding a tombstone before getting the timelines to prevent new timeline additions
self.state.lock().unwrap().add_tenant_tombstone(*tenant_id);
let to_delete = self.get_all_for_tenant(*tenant_id);
let mut err = None;
@@ -600,6 +623,9 @@ impl GlobalTimelines {
state
.tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
state
.tenant_tombstones
.retain(|_, v| now.duration_since(*v) < *tombstone_ttl);
}
}

View File

@@ -4192,10 +4192,10 @@ def test_storcon_create_delete_sk_down(
# ensure the safekeeper deleted the timeline
def timeline_deleted_on_active_sks():
env.safekeepers[0].assert_log_contains(
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
)
env.safekeepers[2].assert_log_contains(
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
)
wait_until(timeline_deleted_on_active_sks)
@@ -4210,7 +4210,7 @@ def test_storcon_create_delete_sk_down(
# ensure that there is log msgs for the third safekeeper too
def timeline_deleted_on_sk():
env.safekeepers[1].assert_log_contains(
f"deleting timeline {tenant_id}/{child_timeline_id} from disk"
f"((deleting timeline|Timeline) {tenant_id}/{child_timeline_id} (from disk|was already deleted)|DELETE.*tenant/{tenant_id} .*status: 200 OK)"
)
wait_until(timeline_deleted_on_sk)