mirror of https://github.com/neondatabase/neon.git
storage controller: background reconcile, graceful shutdown, better logging (#6709)
## Problem

Now that the storage controller is working end to end, we can start burning down the robustness aspects.

## Summary of changes

- Add a background task that periodically calls `reconcile_all`. This ensures that if earlier operations couldn't succeed (e.g. because a node was unavailable), we will eventually retry. This is a naive initial implementation that can start an unlimited number of reconcile tasks: limiting reconcile concurrency is a later item in #6342.
- Add a number of tracing spans in key locations: each background task, each reconciler task.
- Add a top-level CancellationToken and Gate, and use these to implement a graceful shutdown that waits for tasks to shut down. This is not bulletproof yet, because within these tasks we have remote HTTP calls that aren't wrapped in cancellation/timeouts, but it creates the structure, and if we don't shut down promptly then k8s will kill us.
- To protect shard splits from background reconciliation, expose the `SplitState` in memory and use it to guard any APIs that require an attached tenant.
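The core of the shutdown structure is a process-wide `CancellationToken` that long-running loops watch, paired with a `Gate` whose guards let shutdown wait for those loops to drain. Below is a minimal sketch of the token half of that pattern; it is illustrative only (not the code in this PR), assumes the `tokio` and `tokio-util` crates, and uses a fixed interval standing in for the 20-second `reconcile_all` period.

```rust
use std::time::Duration;
use tokio_util::sync::CancellationToken;

// Illustrative stand-in for a periodic background loop: wake up on a timer,
// do one pass of work, and exit promptly when the shutdown token fires.
async fn background_reconcile(cancel: CancellationToken) {
    let mut interval = tokio::time::interval(Duration::from_secs(20));
    loop {
        tokio::select! {
            _ = interval.tick() => {
                // reconcile_all() would run here
            }
            _ = cancel.cancelled() => return,
        }
    }
}

#[tokio::main]
async fn main() {
    let cancel = CancellationToken::new();
    let task = tokio::spawn(background_reconcile(cancel.clone()));

    // ... serve requests until a shutdown signal arrives ...

    // Graceful shutdown: fire the token, then wait for the loop to finish.
    cancel.cancel();
    task.await.expect("background task panicked");
}
```

In the actual change, the final join is done through `utils::sync::gate::Gate` rather than a `JoinHandle`: background tasks and reconcilers take a `GateGuard` when they start, and `Service::shutdown` cancels the token and then awaits `gate.close()` until every guard has been dropped.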
@@ -4,6 +4,11 @@ version = "0.1.0"
edition.workspace = true
license.workspace = true

[features]
default = []
# Enables test-only APIs and behaviors
testing = []

[dependencies]
anyhow.workspace = true
aws-config.workspace = true

@@ -155,7 +155,7 @@ impl ComputeHook {
for (endpoint_name, endpoint) in &cplane.endpoints {
if endpoint.tenant_id == tenant_id && endpoint.status() == EndpointStatus::Running {
tracing::info!("🔁 Reconfiguring endpoint {}", endpoint_name,);
tracing::info!("Reconfiguring endpoint {}", endpoint_name,);
endpoint.reconfigure(compute_pageservers.clone()).await?;
}
}

@@ -177,7 +177,7 @@ impl ComputeHook {
req
};

tracing::debug!(
tracing::info!(
"Sending notify request to {} ({:?})",
url,
reconfigure_request

@@ -266,7 +266,7 @@ impl ComputeHook {
/// periods, but we don't retry forever. The **caller** is responsible for handling failures and
/// ensuring that they eventually call again to ensure that the compute is eventually notified of
/// the proper pageserver nodes for a tenant.
#[tracing::instrument(skip_all, fields(tenant_shard_id, node_id))]
#[tracing::instrument(skip_all, fields(tenant_id=%tenant_shard_id.tenant_id, shard_id=%tenant_shard_id.shard_slug(), node_id))]
pub(super) async fn notify(
&self,
tenant_shard_id: TenantShardId,

@@ -298,7 +298,7 @@ impl ComputeHook {
let Some(reconfigure_request) = reconfigure_request else {
// The tenant doesn't yet have pageservers for all its shards: we won't notify anything
// until it does.
tracing::debug!("Tenant isn't yet ready to emit a notification",);
tracing::info!("Tenant isn't yet ready to emit a notification");
return Ok(());
};
@@ -37,6 +37,12 @@ impl std::fmt::Display for Sequence {
}
}

impl std::fmt::Debug for Sequence {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl MonotonicCounter<Sequence> for Sequence {
fn cnt_advance(&mut self, v: Sequence) {
assert!(*self <= v);

@@ -15,6 +15,7 @@ use diesel::Connection;
use metrics::launch_timestamp::LaunchTimestamp;
use std::sync::Arc;
use tokio::signal::unix::SignalKind;
use tokio_util::sync::CancellationToken;
use utils::auth::{JwtAuth, SwappableJwtAuth};
use utils::logging::{self, LogFormat};

@@ -237,15 +238,23 @@ async fn async_main() -> anyhow::Result<()> {
let auth = secrets
.public_key
.map(|jwt_auth| Arc::new(SwappableJwtAuth::new(jwt_auth)));
let router = make_router(service, auth)
let router = make_router(service.clone(), auth)
.build()
.map_err(|err| anyhow!(err))?;
let router_service = utils::http::RouterService::new(router).unwrap();
let server = hyper::Server::from_tcp(http_listener)?.serve(router_service);

// Start HTTP server
let server_shutdown = CancellationToken::new();
let server = hyper::Server::from_tcp(http_listener)?
.serve(router_service)
.with_graceful_shutdown({
let server_shutdown = server_shutdown.clone();
async move {
server_shutdown.cancelled().await;
}
});
tracing::info!("Serving on {0}", args.listen);

tokio::task::spawn(server);
let server_task = tokio::task::spawn(server);

// Wait until we receive a signal
let mut sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;

@@ -266,5 +275,16 @@ async fn async_main() -> anyhow::Result<()> {
}
}

// Stop HTTP server first, so that we don't have to service requests
// while shutting down Service
server_shutdown.cancel();
if let Err(e) = server_task.await {
tracing::error!("Error joining HTTP server task: {e}")
}
tracing::info!("Joined HTTP server task");

service.shutdown().await;
tracing::info!("Service shutdown complete");

std::process::exit(0);
}
@@ -13,6 +13,7 @@ use tokio_util::sync::CancellationToken;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
use utils::sync::gate::GateGuard;

use crate::compute_hook::{ComputeHook, NotifyError};
use crate::node::Node;

@@ -53,6 +54,10 @@ pub(super) struct Reconciler {
/// the tenant is changed.
pub(crate) cancel: CancellationToken,

/// Reconcilers are registered with a Gate so that during a graceful shutdown we
/// can wait for all the reconcilers to respond to their cancellation tokens.
pub(crate) _gate_guard: GateGuard,

/// Access to persistent storage for updating generation numbers
pub(crate) persistence: Arc<Persistence>,
}

@@ -77,12 +77,11 @@ impl Scheduler {
return Err(ScheduleError::ImpossibleConstraint);
}

for (node_id, count) in &tenant_counts {
tracing::info!("tenant_counts[{node_id}]={count}");
}

let node_id = tenant_counts.first().unwrap().0;
tracing::info!("scheduler selected node {node_id}");
tracing::info!(
"scheduler selected node {node_id} (elegible nodes {:?}, exclude: {hard_exclude:?})",
tenant_counts.iter().map(|i| i.0 .0).collect::<Vec<_>>()
);
*self.tenant_counts.get_mut(&node_id).unwrap() += 1;
Ok(node_id)
}

@@ -30,6 +30,7 @@ use pageserver_api::{
};
use pageserver_client::mgmt_api;
use tokio_util::sync::CancellationToken;
use tracing::instrument;
use utils::{
backoff,
completion::Barrier,

@@ -37,6 +38,7 @@ use utils::{
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
seqwait::SeqWait,
sync::gate::Gate,
};

use crate::{
@@ -124,6 +126,12 @@ pub struct Service {
config: Config,
persistence: Arc<Persistence>,

// Process shutdown will fire this token
cancel: CancellationToken,

// Background tasks will hold this gate
gate: Gate,

/// This waits for initial reconciliation with pageservers to complete. Until this barrier
/// passes, it isn't safe to do any actions that mutate tenants.
pub(crate) startup_complete: Barrier,

@@ -144,8 +152,9 @@ impl Service {
&self.config
}

/// TODO: don't allow other API calls until this is done, don't start doing any background housekeeping
/// until this is done.
/// Called once on startup, this function attempts to contact all pageservers to build an up-to-date
/// view of the world, and determine which pageservers are responsive.
#[instrument(skip_all)]
async fn startup_reconcile(&self) {
// For all tenant shards, a vector of observed states on nodes (where None means
// indeterminate, same as in [`ObservedStateLocation`])

@@ -153,9 +162,6 @@ impl Service {
let mut nodes_online = HashSet::new();

// TODO: give Service a cancellation token for clean shutdown
let cancel = CancellationToken::new();

// TODO: issue these requests concurrently
{
let nodes = {

@@ -190,7 +196,7 @@ impl Service {
1,
5,
"Location config listing",
&cancel,
&self.cancel,
)
.await;
let Some(list_response) = list_response else {

@@ -331,7 +337,7 @@ impl Service {
let stream = futures::stream::iter(compute_notifications.into_iter())
.map(|(tenant_shard_id, node_id)| {
let compute_hook = compute_hook.clone();
let cancel = cancel.clone();
let cancel = self.cancel.clone();
async move {
if let Err(e) = compute_hook.notify(tenant_shard_id, node_id, &cancel).await {
tracing::error!(
@@ -368,8 +374,98 @@ impl Service {
tracing::info!("Startup complete, spawned {reconcile_tasks} reconciliation tasks ({shard_count} shards total)");
}

/// Long running background task that periodically wakes up and looks for shards that need
/// reconciliation. Reconciliation is fallible, so any reconciliation tasks that fail during
/// e.g. a tenant create/attach/migrate must eventually be retried: this task is responsible
/// for those retries.
#[instrument(skip_all)]
async fn background_reconcile(&self) {
self.startup_complete.clone().wait().await;

const BACKGROUND_RECONCILE_PERIOD: Duration = Duration::from_secs(20);

let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
while !self.cancel.is_cancelled() {
tokio::select! {
_ = interval.tick() => { self.reconcile_all(); }
_ = self.cancel.cancelled() => return
}
}
}

#[instrument(skip_all)]
async fn process_results(
&self,
mut result_rx: tokio::sync::mpsc::UnboundedReceiver<ReconcileResult>,
) {
loop {
// Wait for the next result, or for cancellation
let result = tokio::select! {
r = result_rx.recv() => {
match r {
Some(result) => {result},
None => {break;}
}
}
_ = self.cancel.cancelled() => {
break;
}
};

tracing::info!(
"Reconcile result for sequence {}, ok={}",
result.sequence,
result.result.is_ok()
);
let mut locked = self.inner.write().unwrap();
let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
// A reconciliation result might race with removing a tenant: drop results for
// tenants that aren't in our map.
continue;
};

// Usually generation should only be updated via this path, so the max() isn't
// needed, but it is used to handle out-of-band updates via. e.g. test hook.
tenant.generation = std::cmp::max(tenant.generation, result.generation);

// If the reconciler signals that it failed to notify compute, set this state on
// the shard so that a future [`TenantState::maybe_reconcile`] will try again.
tenant.pending_compute_notification = result.pending_compute_notification;

match result.result {
Ok(()) => {
for (node_id, loc) in &result.observed.locations {
if let Some(conf) = &loc.conf {
tracing::info!("Updating observed location {}: {:?}", node_id, conf);
} else {
tracing::info!("Setting observed location {} to None", node_id,)
}
}
tenant.observed = result.observed;
tenant.waiter.advance(result.sequence);
}
Err(e) => {
tracing::warn!(
"Reconcile error on tenant {}: {}",
tenant.tenant_shard_id,
e
);

// Ordering: populate last_error before advancing error_seq,
// so that waiters will see the correct error after waiting.
*(tenant.last_error.lock().unwrap()) = format!("{e}");
tenant.error_waiter.advance(result.sequence);

for (node_id, o) in result.observed.locations {
tenant.observed.locations.insert(node_id, o);
}
}
}
}
}

pub async fn spawn(config: Config, persistence: Arc<Persistence>) -> anyhow::Result<Arc<Self>> {
let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel();
let (result_tx, result_rx) = tokio::sync::mpsc::unbounded_channel();

tracing::info!("Loading nodes from database...");
let nodes = persistence.list_nodes().await?;
@@ -418,6 +514,7 @@ impl Service {
observed: ObservedState::new(),
config: serde_json::from_str(&tsp.config).unwrap(),
reconciler: None,
splitting: tsp.splitting,
waiter: Arc::new(SeqWait::new(Sequence::initial())),
error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
last_error: Arc::default(),

@@ -439,73 +536,35 @@ impl Service {
config,
persistence,
startup_complete: startup_complete.clone(),
cancel: CancellationToken::new(),
gate: Gate::default(),
});

let result_task_this = this.clone();
tokio::task::spawn(async move {
while let Some(result) = result_rx.recv().await {
tracing::info!(
"Reconcile result for sequence {}, ok={}",
result.sequence,
result.result.is_ok()
);
let mut locked = result_task_this.inner.write().unwrap();
let Some(tenant) = locked.tenants.get_mut(&result.tenant_shard_id) else {
// A reconciliation result might race with removing a tenant: drop results for
// tenants that aren't in our map.
continue;
};

// Usually generation should only be updated via this path, so the max() isn't
// needed, but it is used to handle out-of-band updates via. e.g. test hook.
tenant.generation = std::cmp::max(tenant.generation, result.generation);

// If the reconciler signals that it failed to notify compute, set this state on
// the shard so that a future [`TenantState::maybe_reconcile`] will try again.
tenant.pending_compute_notification = result.pending_compute_notification;

match result.result {
Ok(()) => {
for (node_id, loc) in &result.observed.locations {
if let Some(conf) = &loc.conf {
tracing::info!(
"Updating observed location {}: {:?}",
node_id,
conf
);
} else {
tracing::info!("Setting observed location {} to None", node_id,)
}
}
tenant.observed = result.observed;
tenant.waiter.advance(result.sequence);
}
Err(e) => {
tracing::warn!(
"Reconcile error on tenant {}: {}",
tenant.tenant_shard_id,
e
);

// Ordering: populate last_error before advancing error_seq,
// so that waiters will see the correct error after waiting.
*(tenant.last_error.lock().unwrap()) = format!("{e}");
tenant.error_waiter.advance(result.sequence);

for (node_id, o) in result.observed.locations {
tenant.observed.locations.insert(node_id, o);
}
}
}
// Block shutdown until we're done (we must respect self.cancel)
if let Ok(_gate) = result_task_this.gate.enter() {
result_task_this.process_results(result_rx).await
}
});

let startup_reconcile_this = this.clone();
tokio::task::spawn(async move {
// Block the [`Service::startup_complete`] barrier until we're done
let _completion = startup_completion;
tokio::task::spawn({
let this = this.clone();
// We will block the [`Service::startup_complete`] barrier until [`Self::startup_reconcile`]
// is done.
let startup_completion = startup_completion.clone();
async move {
// Block shutdown until we're done (we must respect self.cancel)
let Ok(_gate) = this.gate.enter() else {
return;
};

startup_reconcile_this.startup_reconcile().await
this.startup_reconcile().await;

drop(startup_completion);

this.background_reconcile().await;
}
});

Ok(this)
@@ -620,6 +679,28 @@ impl Service {
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
);

// Trick the reconciler into not doing anything for this tenant: this helps
// tests that manually configure a tenant on the pagesrever, and then call this
// attach hook: they don't want background reconciliation to modify what they
// did to the pageserver.
#[cfg(feature = "testing")]
{
if let Some(node_id) = attach_req.node_id {
tenant_state.observed.locations = HashMap::from([(
node_id,
ObservedStateLocation {
conf: Some(attached_location_conf(
tenant_state.generation,
&tenant_state.shard,
&tenant_state.config,
)),
},
)]);
} else {
tenant_state.observed.locations.clear();
}
}

Ok(AttachHookResponse {
gen: attach_req
.node_id

@@ -868,6 +949,8 @@ impl Service {
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
)
})
.collect::<Vec<_>>();

@@ -970,6 +1053,8 @@ impl Service {
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
);
if let Some(waiter) = maybe_waiter {
waiters.push(waiter);

@@ -1059,6 +1144,8 @@ impl Service {
}

pub(crate) async fn tenant_delete(&self, tenant_id: TenantId) -> Result<StatusCode, ApiError> {
self.ensure_attached_wait(tenant_id).await?;

// TODO: refactor into helper
let targets = {
let locked = self.inner.read().unwrap();

@@ -1080,8 +1167,6 @@ impl Service {
targets
};

// TODO: error out if the tenant is not attached anywhere.

// Phase 1: delete on the pageservers
let mut any_pending = false;
for (tenant_shard_id, node) in targets {

@@ -1417,9 +1502,6 @@ impl Service {
let mut policy = None;
let mut shard_ident = None;

// TODO: put a cancellation token on Service for clean shutdown
let cancel = CancellationToken::new();

// A parent shard which will be split
struct SplitTarget {
parent_id: TenantShardId,
@@ -1591,6 +1673,18 @@ impl Service {
}
}

// Now that I have persisted the splitting state, apply it in-memory. This is infallible, so
// callers may assume that if splitting is set in memory, then it was persisted, and if splitting
// is not set in memory, then it was not persisted.
{
let mut locked = self.inner.write().unwrap();
for target in &targets {
if let Some(parent_shard) = locked.tenants.get_mut(&target.parent_id) {
parent_shard.splitting = SplitState::Splitting;
}
}
}

// FIXME: we have now committed the shard split state to the database, so any subsequent
// failure needs to roll it back. We will later wrap this function in logic to roll back
// the split if it fails.

@@ -1650,7 +1744,7 @@ impl Service {
.complete_shard_split(tenant_id, old_shard_count)
.await?;

// Replace all the shards we just split with their children
// Replace all the shards we just split with their children: this phase is infallible.
let mut response = TenantShardSplitResponse {
new_shards: Vec::new(),
};

@@ -1698,6 +1792,10 @@ impl Service {
child_state.generation = generation;
child_state.config = config.clone();

// The child's TenantState::splitting is intentionally left at the default value of Idle,
// as at this point in the split process we have succeeded and this part is infallible:
// we will never need to do any special recovery from this state.

child_locations.push((child, pageserver));

locked.tenants.insert(child, child_state);

@@ -1709,7 +1807,7 @@ impl Service {
// Send compute notifications for all the new shards
let mut failed_notifications = Vec::new();
for (child_id, child_ps) in child_locations {
if let Err(e) = compute_hook.notify(child_id, child_ps, &cancel).await {
if let Err(e) = compute_hook.notify(child_id, child_ps, &self.cancel).await {
tracing::warn!("Failed to update compute of {}->{} during split, proceeding anyway to complete split ({e})",
child_id, child_ps);
failed_notifications.push(child_id);

@@ -1785,6 +1883,8 @@ impl Service {
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
)
};

@@ -1986,6 +2086,8 @@ impl Service {
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
);
}
}

@@ -2007,6 +2109,8 @@ impl Service {
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
);
}
}

@@ -2046,6 +2150,8 @@ impl Service {
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
) {
waiters.push(waiter);
}
@@ -2057,6 +2163,17 @@ impl Service {
let ensure_waiters = {
let locked = self.inner.write().unwrap();

// Check if the tenant is splitting: in this case, even if it is attached,
// we must act as if it is not: this blocks e.g. timeline creation/deletion
// operations during the split.
for (_shard_id, shard) in locked.tenants.range(TenantShardId::tenant_range(tenant_id)) {
if !matches!(shard.splitting, SplitState::Idle) {
return Err(ApiError::ResourceUnavailable(
"Tenant shards are currently splitting".into(),
));
}
}

self.ensure_attached_schedule(locked, tenant_id)
.map_err(ApiError::InternalServerError)?
};

@@ -2088,8 +2205,25 @@ impl Service {
&compute_hook,
&self.config,
&self.persistence,
&self.gate,
&self.cancel,
)
})
.count()
}

pub async fn shutdown(&self) {
// Note that this already stops processing any results from reconciles: so
// we do not expect that our [`TenantState`] objects will reach a neat
// final state.
self.cancel.cancel();

// The cancellation tokens in [`crate::reconciler::Reconciler`] are children
// of our cancellation token, so we do not need to explicitly cancel each of
// them.

// Background tasks and reconcilers hold gate guards: this waits for them all
// to complete.
self.gate.close().await;
}
}
@@ -7,16 +7,18 @@ use pageserver_api::{
};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};
use utils::{
generation::Generation,
id::NodeId,
seqwait::{SeqWait, SeqWaitError},
sync::gate::Gate,
};

use crate::{
compute_hook::ComputeHook,
node::Node,
persistence::Persistence,
persistence::{split_state::SplitState, Persistence},
reconciler::{attached_location_conf, secondary_location_conf, ReconcileError, Reconciler},
scheduler::{ScheduleError, Scheduler},
service, PlacementPolicy, Sequence,

@@ -58,6 +60,11 @@ pub(crate) struct TenantState {
/// cancellation token has been fired)
pub(crate) reconciler: Option<ReconcilerHandle>,

/// If a tenant is being split, then all shards with that TenantId will have a
/// SplitState set, this acts as a guard against other operations such as background
/// reconciliation, and timeline creation.
pub(crate) splitting: SplitState,

/// Optionally wait for reconciliation to complete up to a particular
/// sequence number.
pub(crate) waiter: std::sync::Arc<SeqWait<Sequence, Sequence>>,

@@ -238,6 +245,7 @@ impl TenantState {
observed: ObservedState::default(),
config: TenantConfig::default(),
reconciler: None,
splitting: SplitState::Idle,
sequence: Sequence(1),
waiter: Arc::new(SeqWait::new(Sequence(0))),
error_waiter: Arc::new(SeqWait::new(Sequence(0))),

@@ -415,6 +423,8 @@ impl TenantState {
false
}

#[allow(clippy::too_many_arguments)]
#[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
pub(crate) fn maybe_reconcile(
&mut self,
result_tx: tokio::sync::mpsc::UnboundedSender<ReconcileResult>,

@@ -422,6 +432,8 @@ impl TenantState {
compute_hook: &Arc<ComputeHook>,
service_config: &service::Config,
persistence: &Arc<Persistence>,
gate: &Gate,
cancel: &CancellationToken,
) -> Option<ReconcilerWaiter> {
// If there are any ambiguous observed states, and the nodes they refer to are available,
// we should reconcile to clean them up.

@@ -443,6 +455,14 @@ impl TenantState {
return None;
}

// If we are currently splitting, then never start a reconciler task: the splitting logic
// requires that shards are not interfered with while it runs. Do this check here rather than
// up top, so that we only log this message if we would otherwise have done a reconciliation.
if !matches!(self.splitting, SplitState::Idle) {
tracing::info!("Refusing to reconcile, splitting in progress");
return None;
}

// Reconcile already in flight for the current sequence?
if let Some(handle) = &self.reconciler {
if handle.sequence == self.sequence {
@@ -460,7 +480,12 @@ impl TenantState {
// doing our sequence's work.
let old_handle = self.reconciler.take();

let cancel = CancellationToken::new();
let Ok(gate_guard) = gate.enter() else {
// Shutting down, don't start a reconciler
return None;
};

let reconciler_cancel = cancel.child_token();
let mut reconciler = Reconciler {
tenant_shard_id: self.tenant_shard_id,
shard: self.shard,

@@ -471,59 +496,66 @@ impl TenantState {
pageservers: pageservers.clone(),
compute_hook: compute_hook.clone(),
service_config: service_config.clone(),
cancel: cancel.clone(),
_gate_guard: gate_guard,
cancel: reconciler_cancel.clone(),
persistence: persistence.clone(),
compute_notify_failure: false,
};

let reconcile_seq = self.sequence;

tracing::info!("Spawning Reconciler for sequence {}", self.sequence);
tracing::info!(seq=%reconcile_seq, "Spawning Reconciler for sequence {}", self.sequence);
let must_notify = self.pending_compute_notification;
let join_handle = tokio::task::spawn(async move {
// Wait for any previous reconcile task to complete before we start
if let Some(old_handle) = old_handle {
old_handle.cancel.cancel();
if let Err(e) = old_handle.handle.await {
// We can't do much with this other than log it: the task is done, so
// we may proceed with our work.
tracing::error!("Unexpected join error waiting for reconcile task: {e}");
let reconciler_span = tracing::info_span!(parent: None, "reconciler", seq=%reconcile_seq,
tenant_id=%reconciler.tenant_shard_id.tenant_id,
shard_id=%reconciler.tenant_shard_id.shard_slug());
let join_handle = tokio::task::spawn(
async move {
// Wait for any previous reconcile task to complete before we start
if let Some(old_handle) = old_handle {
old_handle.cancel.cancel();
if let Err(e) = old_handle.handle.await {
// We can't do much with this other than log it: the task is done, so
// we may proceed with our work.
tracing::error!("Unexpected join error waiting for reconcile task: {e}");
}
}

// Early check for cancellation before doing any work
// TODO: wrap all remote API operations in cancellation check
// as well.
if reconciler.cancel.is_cancelled() {
return;
}

// Attempt to make observed state match intent state
let result = reconciler.reconcile().await;

// If we know we had a pending compute notification from some previous action, send a notification irrespective
// of whether the above reconcile() did any work
if result.is_ok() && must_notify {
// If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
reconciler.compute_notify().await.ok();
}

result_tx
.send(ReconcileResult {
sequence: reconcile_seq,
result,
tenant_shard_id: reconciler.tenant_shard_id,
generation: reconciler.generation,
observed: reconciler.observed,
pending_compute_notification: reconciler.compute_notify_failure,
})
.ok();
}

// Early check for cancellation before doing any work
// TODO: wrap all remote API operations in cancellation check
// as well.
if reconciler.cancel.is_cancelled() {
return;
}

// Attempt to make observed state match intent state
let result = reconciler.reconcile().await;

// If we know we had a pending compute notification from some previous action, send a notification irrespective
// of whether the above reconcile() did any work
if result.is_ok() && must_notify {
// If this fails we will send the need to retry in [`ReconcileResult::pending_compute_notification`]
reconciler.compute_notify().await.ok();
}

result_tx
.send(ReconcileResult {
sequence: reconcile_seq,
result,
tenant_shard_id: reconciler.tenant_shard_id,
generation: reconciler.generation,
observed: reconciler.observed,
pending_compute_notification: reconciler.compute_notify_failure,
})
.ok();
});
.instrument(reconciler_span),
);

self.reconciler = Some(ReconcilerHandle {
sequence: self.sequence,
handle: join_handle,
cancel,
cancel: reconciler_cancel,
});

Some(ReconcilerWaiter {
@@ -20,6 +20,7 @@ from fixtures.log_helper import log
from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
PgBin,
S3Scrubber,
last_flush_lsn_upload,

@@ -62,7 +63,7 @@ def generate_uploads_and_deletions(
tenant_id: Optional[TenantId] = None,
timeline_id: Optional[TimelineId] = None,
data: Optional[str] = None,
pageserver_id: Optional[int] = None,
pageserver: NeonPageserver,
):
"""
Using the environment's default tenant + timeline, generate a load pattern

@@ -77,14 +78,16 @@ def generate_uploads_and_deletions(
timeline_id = env.initial_timeline
assert timeline_id is not None

ps_http = env.pageserver.http_client()
ps_http = pageserver.http_client()

with env.endpoints.create_start(
"main", tenant_id=tenant_id, pageserver_id=pageserver_id
"main", tenant_id=tenant_id, pageserver_id=pageserver.id
) as endpoint:
if init:
endpoint.safe_psql("CREATE TABLE foo (id INTEGER PRIMARY KEY, val text)")
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)

def churn(data):
endpoint.safe_psql_many(

@@ -105,7 +108,9 @@ def generate_uploads_and_deletions(
# We are waiting for uploads as well as local flush, in order to avoid leaving the system
# in a state where there are "future layers" in remote storage that will generate deletions
# after a restart.
last_flush_lsn_upload(env, endpoint, tenant_id, timeline_id)
last_flush_lsn_upload(
env, endpoint, tenant_id, timeline_id, pageserver_id=pageserver.id
)
ps_http.timeline_checkpoint(tenant_id, timeline_id)

# Compaction should generate some GC-elegible layers

@@ -205,7 +210,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
env.neon_cli.create_tenant(
tenant_id=env.initial_tenant, conf=TENANT_CONF, timeline_id=env.initial_timeline
)
generate_uploads_and_deletions(env, pageserver_id=env.pageserver.id)
generate_uploads_and_deletions(env, pageserver=env.pageserver)

def parse_generation_suffix(key):
m = re.match(".+-([0-9a-zA-Z]{8})$", key)
@@ -233,7 +238,7 @@ def test_generations_upgrade(neon_env_builder: NeonEnvBuilder):
# Starting without the override that disabled control_plane_api
env.pageserver.start()

generate_uploads_and_deletions(env, pageserver_id=env.pageserver.id, init=False)
generate_uploads_and_deletions(env, pageserver=env.pageserver, init=False)

legacy_objects: list[str] = []
suffixed_objects = []

@@ -277,13 +282,16 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,
)
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)

some_other_pageserver = 1234
attached_to_id = env.attachment_service.locate(env.initial_tenant)[0]["node_id"]
main_pageserver = env.get_pageserver(attached_to_id)
other_pageserver = [p for p in env.pageservers if p.id != attached_to_id][0]

ps_http = env.pageserver.http_client()
ps_http = main_pageserver.http_client()

generate_uploads_and_deletions(env)
generate_uploads_and_deletions(env, pageserver=main_pageserver)

# Flush: pending deletions should all complete
assert_deletion_queue(ps_http, lambda n: n > 0)

@@ -296,14 +304,14 @@ def test_deferred_deletion(neon_env_builder: NeonEnvBuilder):
assert timeline["remote_consistent_lsn"] == timeline["remote_consistent_lsn_visible"]
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0

env.pageserver.allowed_errors.extend(
main_pageserver.allowed_errors.extend(
[".*Dropped remote consistent LSN updates.*", ".*Dropping stale deletions.*"]
)

# Now advance the generation in the control plane: subsequent validations
# from the running pageserver will fail. No more deletions should happen.
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)
generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id)
env.attachment_service.attach_hook_issue(env.initial_tenant, other_pageserver.id)
generate_uploads_and_deletions(env, init=False, pageserver=main_pageserver)

assert_deletion_queue(ps_http, lambda n: n > 0)
queue_depth_before = get_deletion_queue_depth(ps_http)

@@ -355,9 +363,14 @@ def test_deletion_queue_recovery(
neon_env_builder.enable_pageserver_remote_storage(
RemoteStorageKind.MOCK_S3,
)
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start(initial_tenant_conf=TENANT_CONF)

ps_http = env.pageserver.http_client()
attached_to_id = env.attachment_service.locate(env.initial_tenant)[0]["node_id"]
main_pageserver = env.get_pageserver(attached_to_id)
other_pageserver = [p for p in env.pageservers if p.id != attached_to_id][0]

ps_http = main_pageserver.http_client()

failpoints = [
# Prevent deletion lists from being executed, to build up some backlog of deletions
@@ -374,7 +387,7 @@ def test_deletion_queue_recovery(

ps_http.configure_failpoints(failpoints)

generate_uploads_and_deletions(env)
generate_uploads_and_deletions(env, pageserver=main_pageserver)

# There should be entries in the deletion queue
assert_deletion_queue(ps_http, lambda n: n > 0)

@@ -401,7 +414,7 @@ def test_deletion_queue_recovery(
# also wait to see the header hit the disk: this seems paranoid but the race
# can really happen on a heavily overloaded test machine.
def assert_header_written():
assert (env.pageserver.workdir / "deletion" / "header-01").exists()
assert (main_pageserver.workdir / "deletion" / "header-01").exists()

wait_until(20, 1, assert_header_written)

@@ -411,13 +424,13 @@ def test_deletion_queue_recovery(
before_restart_depth = get_deletion_queue_validated(ps_http)

log.info(f"Restarting pageserver with {before_restart_depth} deletions enqueued")
env.pageserver.stop(immediate=True)
main_pageserver.stop(immediate=True)

if keep_attachment == KeepAttachment.LOSE:
some_other_pageserver = 101010
some_other_pageserver = other_pageserver.id
env.attachment_service.attach_hook_issue(env.initial_tenant, some_other_pageserver)

env.pageserver.start()
main_pageserver.start()

def assert_deletions_submitted(n: int):
assert ps_http.get_metric_value("pageserver_deletion_queue_submitted_total") == n

@@ -440,7 +453,7 @@ def test_deletion_queue_recovery(
# validated before restart.
assert get_deletion_queue_executed(ps_http) == before_restart_depth
else:
env.pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])
main_pageserver.allowed_errors.extend([".*Dropping stale deletions.*"])

# If we lost the attachment, we should have dropped our pre-restart deletions.
assert get_deletion_queue_dropped(ps_http) == before_restart_depth

@@ -449,8 +462,8 @@ def test_deletion_queue_recovery(
assert get_deletion_queue_dropped_lsn_updates(ps_http) == 0

# Restart again
env.pageserver.stop(immediate=True)
env.pageserver.start()
main_pageserver.stop(immediate=True)
main_pageserver.start()

# No deletion lists should be recovered: this demonstrates that deletion lists
# were cleaned up after being executed or dropped in the previous process lifetime.
@@ -469,7 +482,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):

ps_http = env.pageserver.http_client()

generate_uploads_and_deletions(env, pageserver_id=env.pageserver.id)
generate_uploads_and_deletions(env, pageserver=env.pageserver)

env.pageserver.allowed_errors.extend(
[

@@ -486,7 +499,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
# Remember how many validations had happened before the control plane went offline
validated = get_deletion_queue_validated(ps_http)

generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id)
generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver)

# The running pageserver should stop progressing deletions
time.sleep(10)

@@ -502,7 +515,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
)

# The pageserver should provide service to clients
generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id)
generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver)

# The pageserver should neither validate nor execute any deletions, it should have
# loaded the DeletionLists from before though

@@ -523,7 +536,7 @@ def test_emergency_mode(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
env.pageserver.stop()  # Non-immediate: implicitly checking that shutdown doesn't hang waiting for CP
env.pageserver.start()

generate_uploads_and_deletions(env, init=False, pageserver_id=env.pageserver.id)
generate_uploads_and_deletions(env, init=False, pageserver=env.pageserver)
ps_http.deletion_queue_flush(execute=True)
assert get_deletion_queue_depth(ps_http) == 0
assert get_deletion_queue_validated(ps_http) > 0

@@ -561,7 +574,7 @@ def test_eviction_across_generations(neon_env_builder: NeonEnvBuilder):
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

generate_uploads_and_deletions(env)
generate_uploads_and_deletions(env, pageserver=env.pageserver)

read_all(env, tenant_id, timeline_id)
evict_all_layers(env, tenant_id, timeline_id)