diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index d27713f083..36922d5294 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -1127,6 +1127,7 @@ jobs:
-f deployProxy=false \
-f deployStorage=true \
-f deployStorageBroker=true \
+ -f deployStorageController=true \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
@@ -1136,6 +1137,7 @@ jobs:
-f deployProxy=false \
-f deployStorage=true \
-f deployStorageBroker=true \
+ -f deployStorageController=true \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release-proxy" ]]; then
@@ -1144,6 +1146,7 @@ jobs:
-f deployProxy=true \
-f deployStorage=false \
-f deployStorageBroker=false \
+ -f deployStorageController=false \
-f branch=main \
-f dockerTag=${{needs.tag.outputs.build-tag}} \
-f deployPreprodRegion=true
diff --git a/Dockerfile.compute-node b/Dockerfile.compute-node
index c73b9ce5c9..bd4534ce1d 100644
--- a/Dockerfile.compute-node
+++ b/Dockerfile.compute-node
@@ -944,6 +944,9 @@ RUN mkdir /var/db && useradd -m -d /var/db/postgres postgres && \
COPY --from=postgres-cleanup-layer --chown=postgres /usr/local/pgsql /usr/local
COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
+# Create remote extension download directory
+RUN mkdir /usr/local/download_extensions && chown -R postgres:postgres /usr/local/download_extensions
+
# Install:
# libreadline8 for psql
# libicu67, locales for collations (including ICU and plpgsql_check)
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 0fa315682d..88dc4aca2b 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -1262,10 +1262,12 @@ LIMIT 100",
.await
.map_err(DownloadError::Other);
- self.ext_download_progress
- .write()
- .expect("bad lock")
- .insert(ext_archive_name.to_string(), (download_start, true));
+ if download_size.is_ok() {
+ self.ext_download_progress
+ .write()
+ .expect("bad lock")
+ .insert(ext_archive_name.to_string(), (download_start, true));
+ }
download_size
}
diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs
index 4006062fc2..5643634633 100644
--- a/compute_tools/src/spec.rs
+++ b/compute_tools/src/spec.rs
@@ -743,21 +743,24 @@ pub fn handle_extension_neon(client: &mut Client) -> Result<()> {
// which may happen in two cases:
// - extension was just installed
// - extension was already installed and is up to date
- // DISABLED due to compute node unpinning epic
- // let query = "ALTER EXTENSION neon UPDATE";
- // info!("update neon extension version with query: {}", query);
- // client.simple_query(query)?;
+ let query = "ALTER EXTENSION neon UPDATE";
+ info!("update neon extension version with query: {}", query);
+ if let Err(e) = client.simple_query(query) {
+ error!(
+ "failed to upgrade neon extension during `handle_extension_neon`: {}",
+ e
+ );
+ }
Ok(())
}
#[instrument(skip_all)]
-pub fn handle_neon_extension_upgrade(_client: &mut Client) -> Result<()> {
- info!("handle neon extension upgrade (not really)");
- // DISABLED due to compute node unpinning epic
- // let query = "ALTER EXTENSION neon UPDATE";
- // info!("update neon extension version with query: {}", query);
- // client.simple_query(query)?;
+pub fn handle_neon_extension_upgrade(client: &mut Client) -> Result<()> {
+ info!("handle neon extension upgrade");
+ let query = "ALTER EXTENSION neon UPDATE";
+ info!("update neon extension version with query: {}", query);
+ client.simple_query(query)?;
Ok(())
}
diff --git a/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/down.sql b/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/down.sql
new file mode 100644
index 0000000000..33c06dc03d
--- /dev/null
+++ b/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/down.sql
@@ -0,0 +1,3 @@
+-- This file should undo anything in `up.sql`
+
+ALTER TABLE tenant_shards drop scheduling_policy;
\ No newline at end of file
diff --git a/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/up.sql b/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/up.sql
new file mode 100644
index 0000000000..aa00f0d2ca
--- /dev/null
+++ b/control_plane/attachment_service/migrations/2024-03-27-133204_tenant_policies/up.sql
@@ -0,0 +1,2 @@
+
+ALTER TABLE tenant_shards add scheduling_policy VARCHAR NOT NULL DEFAULT '"Active"';
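Note on the default: the column stores the JSON-serialized form of the Rust enum, so the SQL default is the quoted string '"Active"' rather than a bare Active. A minimal sketch of that round trip, assuming a serde-serialized ShardSchedulingPolicy whose default variant is Active (variant names taken from the service code later in this diff):

    // Sketch only: the real enum lives in pageserver_api::controller_api.
    use serde::{Deserialize, Serialize};

    #[derive(Serialize, Deserialize, Default, Debug, PartialEq)]
    enum ShardSchedulingPolicy {
        #[default]
        Active,
        Essential,
        Pause,
        Stop,
    }

    fn main() {
        // serde_json quotes unit variants, which is why the SQL default is '"Active"'.
        let encoded = serde_json::to_string(&ShardSchedulingPolicy::default()).unwrap();
        assert_eq!(encoded, "\"Active\"");
        let decoded: ShardSchedulingPolicy = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded, ShardSchedulingPolicy::Active);
    }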
diff --git a/control_plane/attachment_service/src/compute_hook.rs b/control_plane/attachment_service/src/compute_hook.rs
index bebc62ac2f..1a8dc6b86d 100644
--- a/control_plane/attachment_service/src/compute_hook.rs
+++ b/control_plane/attachment_service/src/compute_hook.rs
@@ -14,7 +14,6 @@ use utils::{
use crate::service::Config;
-const BUSY_DELAY: Duration = Duration::from_secs(1);
const SLOWDOWN_DELAY: Duration = Duration::from_secs(5);
pub(crate) const API_CONCURRENCY: usize = 32;
@@ -280,11 +279,10 @@ impl ComputeHook {
Err(NotifyError::SlowDown)
}
StatusCode::LOCKED => {
- // Delay our retry if busy: the usual fast exponential backoff in backoff::retry
- // is not appropriate
- tokio::time::timeout(BUSY_DELAY, cancel.cancelled())
- .await
- .ok();
+ // We consider this fatal, because it's possible that the operation holding the lock in the
+ // control plane is itself the one waiting for this reconcile. We should let the reconciler calling
+ // this hook fail, to give the control plane a chance to release the lock.
+ tracing::info!("Control plane reports tenant is locked, dropping out of notify");
Err(NotifyError::Busy)
}
StatusCode::SERVICE_UNAVAILABLE
@@ -306,7 +304,12 @@ impl ComputeHook {
let client = reqwest::Client::new();
backoff::retry(
|| self.do_notify_iteration(&client, url, &reconfigure_request, cancel),
- |e| matches!(e, NotifyError::Fatal(_) | NotifyError::Unexpected(_)),
+ |e| {
+ matches!(
+ e,
+ NotifyError::Fatal(_) | NotifyError::Unexpected(_) | NotifyError::Busy
+ )
+ },
3,
10,
"Send compute notification",
diff --git a/control_plane/attachment_service/src/http.rs b/control_plane/attachment_service/src/http.rs
index 036019cd38..1f3f78bffa 100644
--- a/control_plane/attachment_service/src/http.rs
+++ b/control_plane/attachment_service/src/http.rs
@@ -34,7 +34,8 @@ use utils::{
};
use pageserver_api::controller_api::{
- NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantShardMigrateRequest,
+ NodeAvailability, NodeConfigureRequest, NodeRegisterRequest, TenantPolicyRequest,
+ TenantShardMigrateRequest,
};
use pageserver_api::upcall_api::{ReAttachRequest, ValidateRequest};
@@ -478,6 +479,22 @@ async fn handle_tenant_shard_migrate(
)
}
+async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+
+ let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
+ let update_req = json_request::<TenantPolicyRequest>(&mut req).await?;
+ let state = get_state(&req);
+
+ json_response(
+ StatusCode::OK,
+ state
+ .service
+ .tenant_update_policy(tenant_id, update_req)
+ .await?,
+ )
+}
+
async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
@@ -509,6 +526,14 @@ async fn handle_consistency_check(req: Request) -> Result,
json_response(StatusCode::OK, state.service.consistency_check().await?)
}
+async fn handle_reconcile_all(req: Request<Body>) -> Result<Response<Body>, ApiError> {
+ check_permissions(&req, Scope::Admin)?;
+
+ let state = get_state(&req);
+
+ json_response(StatusCode::OK, state.service.reconcile_all_now().await?)
+}
+
/// Status endpoint is just used for checking that our HTTP listener is up
async fn handle_status(_req: Request<Body>) -> Result<Response<Body>, ApiError> {
json_response(StatusCode::OK, ())
@@ -726,6 +751,9 @@ pub fn make_router(
RequestName("debug_v1_consistency_check"),
)
})
+ .post("/debug/v1/reconcile_all", |r| {
+ request_span(r, handle_reconcile_all)
+ })
.put("/debug/v1/failpoints", |r| {
request_span(r, |r| failpoints_handler(r, CancellationToken::new()))
})
@@ -765,6 +793,13 @@ pub fn make_router(
RequestName("control_v1_tenant_describe"),
)
})
+ .put("/control/v1/tenant/:tenant_id/policy", |r| {
+ named_request_span(
+ r,
+ handle_tenant_update_policy,
+ RequestName("control_v1_tenant_policy"),
+ )
+ })
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.
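For reference, the new PUT /control/v1/tenant/:tenant_id/policy endpoint deserializes a TenantPolicyRequest with optional placement and scheduling fields (see the destructuring in service.rs below); omitting a field leaves that policy unchanged. A hedged sketch of a request body an admin client might send; the field spellings assume no serde renames on the struct:

    // Sketch only: builds a plausible JSON body for the new policy endpoint.
    fn main() {
        let body = serde_json::json!({
            // Leave placement untouched, pause scheduling for this tenant.
            "placement": null,
            "scheduling": "Pause"
        });
        println!(
            "PUT /control/v1/tenant/<tenant_id>/policy\n{}",
            serde_json::to_string_pretty(&body).unwrap()
        );
    }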
diff --git a/control_plane/attachment_service/src/metrics.rs b/control_plane/attachment_service/src/metrics.rs
index ccf5e9b07c..cabf416b9f 100644
--- a/control_plane/attachment_service/src/metrics.rs
+++ b/control_plane/attachment_service/src/metrics.rs
@@ -37,6 +37,9 @@ pub(crate) struct StorageControllerMetricGroup {
pub(crate) storage_controller_reconcile_complete:
measured::CounterVec,
+ /// Count of how many times we make an optimization change to a tenant's scheduling
+ pub(crate) storage_controller_schedule_optimization: measured::Counter,
+
/// HTTP request status counters for handled requests
pub(crate) storage_controller_http_request_status:
measured::CounterVec,
@@ -101,6 +104,7 @@ impl StorageControllerMetricGroup {
status: StaticLabelSet::new(),
},
),
+ storage_controller_schedule_optimization: measured::Counter::new(),
storage_controller_http_request_status: measured::CounterVec::new(
HttpRequestStatusLabelGroupSet {
path: lasso::ThreadedRodeo::new(),
diff --git a/control_plane/attachment_service/src/persistence.rs b/control_plane/attachment_service/src/persistence.rs
index dafd52017b..d60392bdbc 100644
--- a/control_plane/attachment_service/src/persistence.rs
+++ b/control_plane/attachment_service/src/persistence.rs
@@ -9,6 +9,7 @@ use camino::Utf8PathBuf;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
+use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
use pageserver_api::shard::ShardConfigError;
@@ -107,6 +108,12 @@ pub(crate) enum AbortShardSplitStatus {
pub(crate) type DatabaseResult = Result;
+/// Some methods can operate on either a whole tenant or a single shard
+pub(crate) enum TenantFilter {
+ Tenant(TenantId),
+ Shard(TenantShardId),
+}
+
impl Persistence {
// The default postgres connection limit is 100. We use up to 99, to leave one free for a human admin under
// normal circumstances. This assumes we have exclusive use of the database cluster to which we connect.
@@ -140,7 +147,7 @@ impl Persistence {
/// Wraps `with_conn` in order to collect latency and error metrics
async fn with_measured_conn<F, R>(&self, op: DatabaseOperation, func: F) -> DatabaseResult<R>
where
- F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
+ F: FnOnce(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
let latency = &METRICS_REGISTRY
@@ -168,7 +175,7 @@ impl Persistence {
/// Call the provided function in a tokio blocking thread, with a Diesel database connection.
async fn with_conn<F, R>(&self, func: F) -> DatabaseResult<R>
where
- F: Fn(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
+ F: FnOnce(&mut PgConnection) -> DatabaseResult<R> + Send + 'static,
R: Send + 'static,
{
let mut conn = self.connection_pool.get()?;
@@ -275,6 +282,11 @@ impl Persistence {
// Backward compat for test data after PR https://github.com/neondatabase/neon/pull/7165
shard.placement_policy = "{\"Attached\":0}".to_string();
}
+
+ if shard.scheduling_policy.is_empty() {
+ shard.scheduling_policy =
+ serde_json::to_string(&ShardSchedulingPolicy::default()).unwrap();
+ }
}
let tenants: Vec<TenantShardPersistence> = decoded.tenants.into_values().collect();
@@ -465,59 +477,45 @@ impl Persistence {
/// that we only do the first time a tenant is set to an attached policy via /location_config.
pub(crate) async fn update_tenant_shard(
&self,
- tenant_shard_id: TenantShardId,
- input_placement_policy: PlacementPolicy,
- input_config: TenantConfig,
+ tenant: TenantFilter,
+ input_placement_policy: Option<PlacementPolicy>,
+ input_config: Option<TenantConfig>,
input_generation: Option<Generation>,
+ input_scheduling_policy: Option<ShardSchedulingPolicy>,
) -> DatabaseResult<()> {
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(DatabaseOperation::UpdateTenantShard, move |conn| {
- let query = diesel::update(tenant_shards)
- .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
- .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
- .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32));
+ let query = match tenant {
+ TenantFilter::Shard(tenant_shard_id) => diesel::update(tenant_shards)
+ .filter(tenant_id.eq(tenant_shard_id.tenant_id.to_string()))
+ .filter(shard_number.eq(tenant_shard_id.shard_number.0 as i32))
+ .filter(shard_count.eq(tenant_shard_id.shard_count.literal() as i32))
+ .into_boxed(),
+ TenantFilter::Tenant(input_tenant_id) => diesel::update(tenant_shards)
+ .filter(tenant_id.eq(input_tenant_id.to_string()))
+ .into_boxed(),
+ };
- if let Some(input_generation) = input_generation {
- // Update includes generation column
- query
- .set((
- generation.eq(Some(input_generation.into().unwrap() as i32)),
- placement_policy
- .eq(serde_json::to_string(&input_placement_policy).unwrap()),
- config.eq(serde_json::to_string(&input_config).unwrap()),
- ))
- .execute(conn)?;
- } else {
- // Update does not include generation column
- query
- .set((
- placement_policy
- .eq(serde_json::to_string(&input_placement_policy).unwrap()),
- config.eq(serde_json::to_string(&input_config).unwrap()),
- ))
- .execute(conn)?;
+ #[derive(AsChangeset)]
+ #[diesel(table_name = crate::schema::tenant_shards)]
+ struct ShardUpdate {
+ generation: Option<i32>,
+ placement_policy: Option<String>,
+ config: Option<String>,
+ scheduling_policy: Option<String>,
}
- Ok(())
- })
- .await?;
+ let update = ShardUpdate {
+ generation: input_generation.map(|g| g.into().unwrap() as i32),
+ placement_policy: input_placement_policy
+ .map(|p| serde_json::to_string(&p).unwrap()),
+ config: input_config.map(|c| serde_json::to_string(&c).unwrap()),
+ scheduling_policy: input_scheduling_policy
+ .map(|p| serde_json::to_string(&p).unwrap()),
+ };
- Ok(())
- }
-
- pub(crate) async fn update_tenant_config(
- &self,
- input_tenant_id: TenantId,
- input_config: TenantConfig,
- ) -> DatabaseResult<()> {
- use crate::schema::tenant_shards::dsl::*;
-
- self.with_measured_conn(DatabaseOperation::UpdateTenantConfig, move |conn| {
- diesel::update(tenant_shards)
- .filter(tenant_id.eq(input_tenant_id.to_string()))
- .set((config.eq(serde_json::to_string(&input_config).unwrap()),))
- .execute(conn)?;
+ query.set(update).execute(conn)?;
Ok(())
})
@@ -728,6 +726,8 @@ pub(crate) struct TenantShardPersistence {
pub(crate) splitting: SplitState,
#[serde(default)]
pub(crate) config: String,
+ #[serde(default)]
+ pub(crate) scheduling_policy: String,
}
impl TenantShardPersistence {
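The new ShardUpdate changeset relies on Diesel's default handling of Option fields in AsChangeset: a None field is dropped from the generated SET clause rather than writing NULL, which is what lets a single update_tenant_shard call cover config-only, policy-only and generation-bearing updates. A minimal standalone sketch of that pattern (toy table and column names, not the real schema):

    use diesel::prelude::*;

    diesel::table! {
        shards (id) {
            id -> Int4,
            config -> Text,
            scheduling_policy -> Varchar,
        }
    }

    #[derive(AsChangeset)]
    #[diesel(table_name = shards)]
    struct ShardUpdate {
        // None fields are omitted from the UPDATE entirely, so existing
        // column values are preserved rather than overwritten with NULL.
        config: Option<String>,
        scheduling_policy: Option<String>,
    }

    fn pause_shard(conn: &mut PgConnection, shard_id: i32) -> QueryResult<usize> {
        diesel::update(shards::table.filter(shards::id.eq(shard_id)))
            .set(ShardUpdate {
                config: None,
                scheduling_policy: Some("\"Pause\"".to_string()),
            })
            .execute(conn)
    }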
diff --git a/control_plane/attachment_service/src/reconciler.rs b/control_plane/attachment_service/src/reconciler.rs
index a62357f9ac..72eb8faccb 100644
--- a/control_plane/attachment_service/src/reconciler.rs
+++ b/control_plane/attachment_service/src/reconciler.rs
@@ -487,6 +487,7 @@ impl Reconciler {
while let Err(e) = self.compute_notify().await {
match e {
NotifyError::Fatal(_) => return Err(ReconcileError::Notify(e)),
+ NotifyError::ShuttingDown => return Err(ReconcileError::Cancel),
_ => {
tracing::warn!(
"Live migration blocked by compute notification error, retrying: {e}"
diff --git a/control_plane/attachment_service/src/scheduler.rs b/control_plane/attachment_service/src/scheduler.rs
index 981ba26cce..782189d11f 100644
--- a/control_plane/attachment_service/src/scheduler.rs
+++ b/control_plane/attachment_service/src/scheduler.rs
@@ -58,6 +58,70 @@ pub(crate) struct Scheduler {
nodes: HashMap,
}
+/// Score for soft constraint scheduling: lower scores are preferred to higher scores.
+///
+/// For example, we may set an affinity score based on the number of shards from the same
+/// tenant already on a node, to implicitly prefer to balance out shards.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
+pub(crate) struct AffinityScore(pub(crate) usize);
+
+impl AffinityScore {
+ /// If we have no anti-affinity at all toward a node, this is its score. It means
+ /// the scheduler has a free choice amongst nodes with this score, and may pick a node
+ /// based on other information such as total utilization.
+ pub(crate) const FREE: Self = Self(0);
+
+ pub(crate) fn inc(&mut self) {
+ self.0 += 1;
+ }
+}
+
+impl std::ops::Add for AffinityScore {
+ type Output = Self;
+
+ fn add(self, rhs: Self) -> Self::Output {
+ Self(self.0 + rhs.0)
+ }
+}
+
+// For carrying state between multiple calls to [`TenantState::schedule`], e.g. when calling
+// it for many shards in the same tenant.
+#[derive(Debug, Default)]
+pub(crate) struct ScheduleContext {
+ /// Sparse map of nodes: omitting a node implicitly makes its affinity [`AffinityScore::FREE`]
+ pub(crate) nodes: HashMap<NodeId, AffinityScore>,
+
+ /// Specifically how many _attached_ locations are on each node
+ pub(crate) attached_nodes: HashMap<NodeId, usize>,
+}
+
+impl ScheduleContext {
+ /// Input is a list of nodes we would like to avoid using again within this context. The more
+ /// times a node is passed into this call, the less inclined we are to use it.
+ pub(crate) fn avoid(&mut self, nodes: &[NodeId]) {
+ for node_id in nodes {
+ let entry = self.nodes.entry(*node_id).or_insert(AffinityScore::FREE);
+ entry.inc()
+ }
+ }
+
+ pub(crate) fn push_attached(&mut self, node_id: NodeId) {
+ let entry = self.attached_nodes.entry(node_id).or_default();
+ *entry += 1;
+ }
+
+ pub(crate) fn get_node_affinity(&self, node_id: NodeId) -> AffinityScore {
+ self.nodes
+ .get(&node_id)
+ .copied()
+ .unwrap_or(AffinityScore::FREE)
+ }
+
+ pub(crate) fn get_node_attachments(&self, node_id: NodeId) -> usize {
+ self.attached_nodes.get(&node_id).copied().unwrap_or(0)
+ }
+}
+
impl Scheduler {
pub(crate) fn new<'a>(nodes: impl Iterator<Item = &'a Node>) -> Self {
let mut scheduler_nodes = HashMap::new();
@@ -224,27 +288,47 @@ impl Scheduler {
node.and_then(|(node_id, may_schedule)| if may_schedule { Some(node_id) } else { None })
}
- pub(crate) fn schedule_shard(&self, hard_exclude: &[NodeId]) -> Result<NodeId, ScheduleError> {
+ /// hard_exclude: it is forbidden to use nodes in this list, typically because they
+ /// are already in use by this shard -- we use this to avoid picking the same node
+ /// as both attached and secondary location. This is a hard constraint: if we cannot
+ /// find any nodes that aren't in this list, then we will return a [`ScheduleError::ImpossibleConstraint`].
+ ///
+ /// context: we prefer to avoid using nodes identified in the context, according
+ /// to their anti-affinity score. We use this to prefer to avoid placing shards in
+ /// the same tenant on the same node. This is a soft constraint: the context will never
+ /// cause us to fail to schedule a shard.
+ pub(crate) fn schedule_shard(
+ &self,
+ hard_exclude: &[NodeId],
+ context: &ScheduleContext,
+ ) -> Result<NodeId, ScheduleError> {
if self.nodes.is_empty() {
return Err(ScheduleError::NoPageservers);
}
- let mut tenant_counts: Vec<(NodeId, usize)> = self
+ let mut scores: Vec<(NodeId, AffinityScore, usize)> = self
.nodes
.iter()
.filter_map(|(k, v)| {
if hard_exclude.contains(k) || v.may_schedule == MaySchedule::No {
None
} else {
- Some((*k, v.shard_count))
+ Some((
+ *k,
+ context.nodes.get(k).copied().unwrap_or(AffinityScore::FREE),
+ v.shard_count,
+ ))
}
})
.collect();
- // Sort by tenant count. Nodes with the same tenant count are sorted by ID.
- tenant_counts.sort_by_key(|i| (i.1, i.0));
+ // Sort by, in order of precedence:
+ // 1st: Affinity score. We should never pick a higher-score node if a lower-score node is available
+ // 2nd: Utilization. Within nodes with the same affinity, use the least loaded nodes.
+ // 3rd: Node ID. This is a convenience to make selection deterministic in tests and empty systems.
+ scores.sort_by_key(|i| (i.1, i.2, i.0));
- if tenant_counts.is_empty() {
+ if scores.is_empty() {
// After applying constraints, no pageservers were left. We log some detail about
// the state of nodes to help understand why this happened. This is not logged as an error because
// it is legitimately possible for enough nodes to be Offline to prevent scheduling a shard.
@@ -260,10 +344,11 @@ impl Scheduler {
return Err(ScheduleError::ImpossibleConstraint);
}
- let node_id = tenant_counts.first().unwrap().0;
+ // Lowest score wins
+ let node_id = scores.first().unwrap().0;
tracing::info!(
- "scheduler selected node {node_id} (elegible nodes {:?}, exclude: {hard_exclude:?})",
- tenant_counts.iter().map(|i| i.0 .0).collect::<Vec<_>>()
+ "scheduler selected node {node_id} (elegible nodes {:?}, hard exclude: {hard_exclude:?}, soft exclude: {context:?})",
+ scores.iter().map(|i| i.0 .0).collect::>()
);
// Note that we do not update shard count here to reflect the scheduling: that
@@ -271,6 +356,12 @@ impl Scheduler {
Ok(node_id)
}
+
+ /// Unit test access to internal state
+ #[cfg(test)]
+ pub(crate) fn get_node_shard_count(&self, node_id: NodeId) -> usize {
+ self.nodes.get(&node_id).unwrap().shard_count
+ }
}
#[cfg(test)]
@@ -316,15 +407,17 @@ mod tests {
let mut t1_intent = IntentState::new();
let mut t2_intent = IntentState::new();
- let scheduled = scheduler.schedule_shard(&[])?;
+ let context = ScheduleContext::default();
+
+ let scheduled = scheduler.schedule_shard(&[], &context)?;
t1_intent.set_attached(&mut scheduler, Some(scheduled));
- let scheduled = scheduler.schedule_shard(&[])?;
+ let scheduled = scheduler.schedule_shard(&[], &context)?;
t2_intent.set_attached(&mut scheduler, Some(scheduled));
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
assert_eq!(scheduler.nodes.get(&NodeId(2)).unwrap().shard_count, 1);
- let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers())?;
+ let scheduled = scheduler.schedule_shard(&t1_intent.all_pageservers(), &context)?;
t1_intent.push_secondary(&mut scheduler, scheduled);
assert_eq!(scheduler.nodes.get(&NodeId(1)).unwrap().shard_count, 1);
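The scores sort above means affinity strictly dominates utilization, and utilization dominates node ID; the per-tenant anti-affinity can never be outvoted by load, only tie-broken by it. A small sketch of just that ordering, using the same (node_id, affinity, shard_count) tuple shape as schedule_shard's scores vector (toy values, not taken from the code):

    #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord)]
    struct AffinityScore(usize);

    fn main() {
        // (node_id, affinity, shard_count) tuples, as in schedule_shard's `scores`.
        let mut scores = vec![
            (3u64, AffinityScore(0), 10usize), // no sibling shards here, but heavily loaded
            (1u64, AffinityScore(2), 0usize),  // idle, but two sibling shards already placed here
            (2u64, AffinityScore(0), 4usize),  // no sibling shards and moderately loaded
        ];
        // Same key as the diff: affinity first, then shard count, then node ID.
        scores.sort_by_key(|i| (i.1, i.2, i.0));
        // Affinity dominates: node 2 wins even though node 1 has fewer shards.
        assert_eq!(scores.first().unwrap().0, 2);
    }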
diff --git a/control_plane/attachment_service/src/schema.rs b/control_plane/attachment_service/src/schema.rs
index 76e4e56a66..ff37d0fe77 100644
--- a/control_plane/attachment_service/src/schema.rs
+++ b/control_plane/attachment_service/src/schema.rs
@@ -22,6 +22,7 @@ diesel::table! {
placement_policy -> Varchar,
splitting -> Int2,
config -> Text,
+ scheduling_policy -> Varchar,
}
}
diff --git a/control_plane/attachment_service/src/service.rs b/control_plane/attachment_service/src/service.rs
index 925910253b..7502d9d186 100644
--- a/control_plane/attachment_service/src/service.rs
+++ b/control_plane/attachment_service/src/service.rs
@@ -8,7 +8,10 @@ use std::{
};
use crate::{
- id_lock_map::IdLockMap, persistence::AbortShardSplitStatus, reconciler::ReconcileError,
+ id_lock_map::IdLockMap,
+ persistence::{AbortShardSplitStatus, TenantFilter},
+ reconciler::ReconcileError,
+ scheduler::ScheduleContext,
};
use anyhow::Context;
use control_plane::storage_controller::{
@@ -20,9 +23,10 @@ use hyper::StatusCode;
use pageserver_api::{
controller_api::{
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
- TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
- TenantDescribeResponseShard, TenantLocateResponse, TenantShardMigrateRequest,
- TenantShardMigrateResponse, UtilizationScore,
+ ShardSchedulingPolicy, TenantCreateResponse, TenantCreateResponseShard,
+ TenantDescribeResponse, TenantDescribeResponseShard, TenantLocateResponse,
+ TenantPolicyRequest, TenantShardMigrateRequest, TenantShardMigrateResponse,
+ UtilizationScore,
},
models::{SecondaryProgress, TenantConfigRequest},
};
@@ -51,7 +55,6 @@ use utils::{
generation::Generation,
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
- seqwait::SeqWait,
sync::gate::Gate,
};
@@ -66,7 +69,6 @@ use crate::{
IntentState, ObservedState, ObservedStateLocation, ReconcileResult, ReconcileWaitError,
ReconcilerWaiter, TenantState,
},
- Sequence,
};
// For operations that should be quick, like attaching a new tenant
@@ -344,9 +346,15 @@ impl Service {
}
// Populate each tenant's intent state
+ let mut schedule_context = ScheduleContext::default();
for (tenant_shard_id, tenant_state) in tenants.iter_mut() {
+ if tenant_shard_id.shard_number == ShardNumber(0) {
+ // Reset scheduling context each time we advance to the next Tenant
+ schedule_context = ScheduleContext::default();
+ }
+
tenant_state.intent_from_observed(scheduler);
- if let Err(e) = tenant_state.schedule(scheduler) {
+ if let Err(e) = tenant_state.schedule(scheduler, &mut schedule_context) {
// Non-fatal error: we are unable to properly schedule the tenant, perhaps because
// not enough pageservers are available. The tenant may well still be available
// to clients.
@@ -670,7 +678,13 @@ impl Service {
let mut interval = tokio::time::interval(BACKGROUND_RECONCILE_PERIOD);
while !self.cancel.is_cancelled() {
tokio::select! {
- _ = interval.tick() => { self.reconcile_all(); }
+ _ = interval.tick() => {
+ let reconciles_spawned = self.reconcile_all();
+ if reconciles_spawned == 0 {
+ // Run optimizer only when we didn't find any other work to do
+ self.optimize_all();
+ }
+ }
_ = self.cancel.cancelled() => return
}
}
@@ -957,30 +971,14 @@ impl Service {
}
for tsp in tenant_shard_persistence {
let tenant_shard_id = tsp.get_tenant_shard_id()?;
- let shard_identity = tsp.get_shard_identity()?;
+
// We will populate intent properly later in [`Self::startup_reconcile`], initially populate
// it with what we can infer: the node for which a generation was most recently issued.
let mut intent = IntentState::new();
if let Some(generation_pageserver) = tsp.generation_pageserver {
intent.set_attached(&mut scheduler, Some(NodeId(generation_pageserver as u64)));
}
-
- let new_tenant = TenantState {
- tenant_shard_id,
- shard: shard_identity,
- sequence: Sequence::initial(),
- generation: tsp.generation.map(|g| Generation::new(g as u32)),
- policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
- intent,
- observed: ObservedState::new(),
- config: serde_json::from_str(&tsp.config).unwrap(),
- reconciler: None,
- splitting: tsp.splitting,
- waiter: Arc::new(SeqWait::new(Sequence::initial())),
- error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
- last_error: Arc::default(),
- pending_compute_notification: false,
- };
+ let new_tenant = TenantState::from_persistent(tsp, intent)?;
tenants.insert(tenant_shard_id, new_tenant);
}
@@ -1104,6 +1102,8 @@ impl Service {
placement_policy: serde_json::to_string(&PlacementPolicy::Attached(0)).unwrap(),
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
splitting: SplitState::default(),
+ scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
+ .unwrap(),
};
match self.persistence.insert_tenant_shards(vec![tsp]).await {
@@ -1156,9 +1156,10 @@ impl Service {
// when we reattaching a detached tenant.
self.persistence
.update_tenant_shard(
- attach_req.tenant_shard_id,
- PlacementPolicy::Attached(0),
- conf,
+ TenantFilter::Shard(attach_req.tenant_shard_id),
+ Some(PlacementPolicy::Attached(0)),
+ Some(conf),
+ None,
None,
)
.await?;
@@ -1615,6 +1616,8 @@ impl Service {
placement_policy: serde_json::to_string(&placement_policy).unwrap(),
config: serde_json::to_string(&create_req.config).unwrap(),
splitting: SplitState::default(),
+ scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
+ .unwrap(),
})
.collect();
@@ -1637,6 +1640,8 @@ impl Service {
Err(e) => return Err(ApiError::InternalServerError(anyhow::anyhow!(e))),
};
+ let mut schedule_context = ScheduleContext::default();
+
let (waiters, response_shards) = {
let mut locked = self.inner.write().unwrap();
let (nodes, tenants, scheduler) = locked.parts_mut();
@@ -1658,11 +1663,14 @@ impl Service {
// attached and secondary locations (independently) away frorm those
// pageservers also holding a shard for this tenant.
- entry.get_mut().schedule(scheduler).map_err(|e| {
- ApiError::Conflict(format!(
- "Failed to schedule shard {tenant_shard_id}: {e}"
- ))
- })?;
+ entry
+ .get_mut()
+ .schedule(scheduler, &mut schedule_context)
+ .map_err(|e| {
+ ApiError::Conflict(format!(
+ "Failed to schedule shard {tenant_shard_id}: {e}"
+ ))
+ })?;
if let Some(node_id) = entry.get().intent.get_attached() {
let generation = entry
@@ -1690,7 +1698,7 @@ impl Service {
state.generation = initial_generation;
state.config = create_req.config.clone();
- if let Err(e) = state.schedule(scheduler) {
+ if let Err(e) = state.schedule(scheduler, &mut schedule_context) {
schcedule_error = Some(e);
}
@@ -1898,6 +1906,7 @@ impl Service {
// Persist updates
// Ordering: write to the database before applying changes in-memory, so that
// we will not appear time-travel backwards on a restart.
+ let mut schedule_context = ScheduleContext::default();
for ShardUpdate {
tenant_shard_id,
placement_policy,
@@ -1907,10 +1916,11 @@ impl Service {
{
self.persistence
.update_tenant_shard(
- *tenant_shard_id,
- placement_policy.clone(),
- tenant_config.clone(),
+ TenantFilter::Shard(*tenant_shard_id),
+ Some(placement_policy.clone()),
+ Some(tenant_config.clone()),
*generation,
+ None,
)
.await?;
}
@@ -1944,7 +1954,7 @@ impl Service {
shard.generation = Some(generation);
}
- shard.schedule(scheduler)?;
+ shard.schedule(scheduler, &mut schedule_context)?;
let maybe_waiter = self.maybe_reconcile_shard(shard, nodes);
if let Some(waiter) = maybe_waiter {
@@ -1988,7 +1998,13 @@ impl Service {
let config = req.config;
self.persistence
- .update_tenant_config(req.tenant_id, config.clone())
+ .update_tenant_shard(
+ TenantFilter::Tenant(req.tenant_id),
+ None,
+ Some(config.clone()),
+ None,
+ None,
+ )
.await?;
let waiters = {
@@ -2098,7 +2114,7 @@ impl Service {
let scheduler = &locked.scheduler;
// Right now we only perform the operation on a single node without parallelization
// TODO fan out the operation to multiple nodes for better performance
- let node_id = scheduler.schedule_shard(&[])?;
+ let node_id = scheduler.schedule_shard(&[], &ScheduleContext::default())?;
let node = locked
.nodes
.get(&node_id)
@@ -2341,6 +2357,58 @@ impl Service {
Ok(StatusCode::NOT_FOUND)
}
+ /// Naming: this configures the storage controller's policies for a tenant, whereas [`Self::tenant_config_set`] is "set the TenantConfig"
+ /// for a tenant. The TenantConfig is passed through to pageservers, whereas this function modifies
+ /// the tenant's policies (configuration) within the storage controller
+ pub(crate) async fn tenant_update_policy(
+ &self,
+ tenant_id: TenantId,
+ req: TenantPolicyRequest,
+ ) -> Result<(), ApiError> {
+ // We require an exclusive lock, because we are updating persistent and in-memory state
+ let _tenant_lock = self.tenant_op_locks.exclusive(tenant_id).await;
+
+ let TenantPolicyRequest {
+ placement,
+ scheduling,
+ } = req;
+
+ self.persistence
+ .update_tenant_shard(
+ TenantFilter::Tenant(tenant_id),
+ placement.clone(),
+ None,
+ None,
+ scheduling,
+ )
+ .await?;
+
+ let mut schedule_context = ScheduleContext::default();
+ let mut locked = self.inner.write().unwrap();
+ let (nodes, tenants, scheduler) = locked.parts_mut();
+ for (shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
+ if let Some(placement) = &placement {
+ shard.policy = placement.clone();
+
+ tracing::info!(tenant_id=%shard_id.tenant_id, shard_id=%shard_id.shard_slug(),
+ "Updated placement policy to {placement:?}");
+ }
+
+ if let Some(scheduling) = &scheduling {
+ shard.set_scheduling_policy(*scheduling);
+
+ tracing::info!(tenant_id=%shard_id.tenant_id, shard_id=%shard_id.shard_slug(),
+ "Updated scheduling policy to {scheduling:?}");
+ }
+
+ // In case scheduling is being switched back on, try it now.
+ shard.schedule(scheduler, &mut schedule_context).ok();
+ self.maybe_reconcile_shard(shard, nodes);
+ }
+
+ Ok(())
+ }
+
pub(crate) async fn tenant_timeline_create(
&self,
tenant_id: TenantId,
@@ -2798,7 +2866,7 @@ impl Service {
tracing::info!("Restoring parent shard {tenant_shard_id}");
shard.splitting = SplitState::Idle;
- if let Err(e) = shard.schedule(scheduler) {
+ if let Err(e) = shard.schedule(scheduler, &mut ScheduleContext::default()) {
// If this shard can't be scheduled now (perhaps due to offline nodes or
// capacity issues), that must not prevent us rolling back a split. In this
// case it should be eventually scheduled in the background.
@@ -2922,6 +2990,7 @@ impl Service {
)
};
+ let mut schedule_context = ScheduleContext::default();
for child in child_ids {
let mut child_shard = parent_ident;
child_shard.number = child.shard_number;
@@ -2957,7 +3026,7 @@ impl Service {
child_locations.push((child, pageserver, child_shard.stripe_size));
- if let Err(e) = child_state.schedule(scheduler) {
+ if let Err(e) = child_state.schedule(scheduler, &mut schedule_context) {
// This is not fatal, because we've implicitly already got an attached
// location for the child shard. Failure here just means we couldn't
// find a secondary (e.g. because cluster is overloaded).
@@ -3250,6 +3319,10 @@ impl Service {
placement_policy: serde_json::to_string(&policy).unwrap(),
config: serde_json::to_string(&config).unwrap(),
splitting: SplitState::Splitting,
+
+ // Scheduling policies do not carry through to children
+ scheduling_policy: serde_json::to_string(&ShardSchedulingPolicy::default())
+ .unwrap(),
});
}
@@ -3817,6 +3890,7 @@ impl Service {
AvailabilityTransition::ToOffline => {
tracing::info!("Node {} transition to offline", node_id);
let mut tenants_affected: usize = 0;
+
for (tenant_shard_id, tenant_state) in tenants {
if let Some(observed_loc) = tenant_state.observed.locations.get_mut(&node_id) {
// When a node goes offline, we set its observed configuration to None, indicating unknown: we will
@@ -3833,7 +3907,13 @@ impl Service {
if tenant_state.intent.demote_attached(node_id) {
tenant_state.sequence = tenant_state.sequence.next();
- match tenant_state.schedule(scheduler) {
+
+ // TODO: populate a ScheduleContext including all shards in the same tenant_id (only matters
+ // for tenants without secondary locations: if they have a secondary location, then this
+ // schedule() call is just promoting an existing secondary)
+ let mut schedule_context = ScheduleContext::default();
+
+ match tenant_state.schedule(scheduler, &mut schedule_context) {
Err(e) => {
// It is possible that some tenants will become unschedulable when too many pageservers
// go offline: in this case there isn't much we can do other than make the issue observable.
@@ -3884,9 +3964,6 @@ impl Service {
/// Helper for methods that will try and call pageserver APIs for
/// a tenant, such as timeline CRUD: they cannot proceed unless the tenant
/// is attached somewhere.
- ///
- /// TODO: this doesn't actually ensure attached unless the PlacementPolicy is
- /// an attached policy. We should error out if it isn't.
fn ensure_attached_schedule(
&self,
mut locked: std::sync::RwLockWriteGuard<'_, ServiceState>,
@@ -3895,10 +3972,27 @@ impl Service {
let mut waiters = Vec::new();
let (nodes, tenants, scheduler) = locked.parts_mut();
- for (_tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
- shard.schedule(scheduler)?;
+ let mut schedule_context = ScheduleContext::default();
+ for (tenant_shard_id, shard) in tenants.range_mut(TenantShardId::tenant_range(tenant_id)) {
+ shard.schedule(scheduler, &mut schedule_context)?;
+
+ // The shard's policies may not result in an attached location being scheduled: this
+ // is an error because our caller needs it attached somewhere.
+ if shard.intent.get_attached().is_none() {
+ return Err(anyhow::anyhow!(
+ "Tenant {tenant_id} not scheduled to be attached"
+ ));
+ };
+
+ if shard.stably_attached().is_some() {
+ // We do not require the shard to be totally up to date on reconciliation: we just require
+ // that it has been attached on the intended node. Other dirty state such as unattached secondary
+ // locations, or compute hook notifications can be ignored.
+ continue;
+ }
if let Some(waiter) = self.maybe_reconcile_shard(shard, nodes) {
+ tracing::info!("Waiting for shard {tenant_shard_id} to reconcile, in order to ensure it is attached");
waiters.push(waiter);
}
}
@@ -3960,8 +4054,144 @@ impl Service {
let (nodes, tenants, _scheduler) = locked.parts_mut();
let pageservers = nodes.clone();
+ let mut schedule_context = ScheduleContext::default();
+
let mut reconciles_spawned = 0;
- for (_tenant_shard_id, shard) in tenants.iter_mut() {
+ for (tenant_shard_id, shard) in tenants.iter_mut() {
+ if tenant_shard_id.is_zero() {
+ schedule_context = ScheduleContext::default();
+ }
+
+ // Eventual consistency: if an earlier reconcile job failed, and the shard is still
+ // dirty, spawn another one
+ if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
+ reconciles_spawned += 1;
+ }
+
+ schedule_context.avoid(&shard.intent.all_pageservers());
+ }
+
+ reconciles_spawned
+ }
+
+ /// `optimize` in this context means identifying shards which have valid scheduled locations, but
+ /// could be scheduled somewhere better:
+ /// - Cutting over to a secondary if the node with the secondary is more lightly loaded
+ /// * e.g. after a node fails then recovers, to move some work back to it
+ /// - Cutting over to a secondary if it improves the spread of shard attachments within a tenant
+ /// * e.g. after a shard split, the initial attached locations will all be on the node where
+ /// we did the split, but are probably better placed elsewhere.
+ /// - Creating new secondary locations if it improves the spreading of a sharded tenant
+ /// * e.g. after a shard split, some locations will be on the same node (where the split
+ /// happened), and will probably be better placed elsewhere.
+ ///
+ /// To put it more briefly: whereas the scheduler respects soft constraints in a ScheduleContext at
+ /// the time of scheduling, this function looks for cases where a better-scoring location is available
+ /// according to those same soft constraints.
+ fn optimize_all(&self) -> usize {
+ let mut locked = self.inner.write().unwrap();
+ let (nodes, tenants, scheduler) = locked.parts_mut();
+ let pageservers = nodes.clone();
+
+ let mut schedule_context = ScheduleContext::default();
+
+ let mut reconciles_spawned = 0;
+
+ let mut tenant_shards: Vec<&TenantState> = Vec::new();
+
+ // Limit on how many shards' optimizations each call to this function will execute. Combined
+ // with the frequency of background calls, this acts as an implicit rate limit that runs a small
+ // trickle of optimizations in the background, rather than executing a large number in parallel
+ // when a change occurs.
+ const MAX_OPTIMIZATIONS_PER_PASS: usize = 2;
+
+ let mut work = Vec::new();
+
+ for (tenant_shard_id, shard) in tenants.iter() {
+ if tenant_shard_id.is_zero() {
+ // Reset accumulators on the first shard in a tenant
+ schedule_context = ScheduleContext::default();
+ tenant_shards.clear();
+ }
+
+ if work.len() >= MAX_OPTIMIZATIONS_PER_PASS {
+ break;
+ }
+
+ match shard.get_scheduling_policy() {
+ ShardSchedulingPolicy::Active => {
+ // Ok to do optimization
+ }
+ ShardSchedulingPolicy::Essential
+ | ShardSchedulingPolicy::Pause
+ | ShardSchedulingPolicy::Stop => {
+ // Policy prevents optimizing this shard.
+ continue;
+ }
+ }
+
+ // Accumulate the schedule context for all the shards in a tenant: we must have
+ // the total view of all shards before we can try to optimize any of them.
+ schedule_context.avoid(&shard.intent.all_pageservers());
+ if let Some(attached) = shard.intent.get_attached() {
+ schedule_context.push_attached(*attached);
+ }
+ tenant_shards.push(shard);
+
+ // Once we have seen the last shard in the tenant, proceed to search across all shards
+ // in the tenant for optimizations
+ if shard.shard.number.0 == shard.shard.count.count() - 1 {
+ if tenant_shards.iter().any(|s| s.reconciler.is_some()) {
+ // Do not start any optimizations while another change to the tenant is ongoing: this
+ // is not necessary for correctness, but simplifies operations and implicitly throttles
+ // optimization changes to happen in a "trickle" over time.
+ continue;
+ }
+
+ if tenant_shards.iter().any(|s| {
+ !matches!(s.splitting, SplitState::Idle)
+ || matches!(s.policy, PlacementPolicy::Detached)
+ }) {
+ // Never attempt to optimize a tenant that is currently being split, or
+ // a tenant that is meant to be detached
+ continue;
+ }
+
+ // TODO: optimization calculations are relatively expensive: create some fast-path for
+ // the common idle case (avoiding the search on tenants that we have recently checked)
+
+ for shard in &tenant_shards {
+ if let Some(optimization) =
+ // If idle, maybe optimize attachments: if a shard has a secondary location that is preferable to
+ // its primary location based on soft constraints, cut it over.
+ shard.optimize_attachment(nodes, &schedule_context)
+ {
+ work.push((shard.tenant_shard_id, optimization));
+ break;
+ } else if let Some(optimization) =
+ // If idle, maybe optimize secondary locations: if a shard has a secondary location that would be
+ // better placed on another node, based on ScheduleContext, then adjust it. This
+ // covers cases like after a shard split, where we might have too many shards
+ // in the same tenant with secondary locations on the node where they originally split.
+ shard.optimize_secondary(scheduler, &schedule_context)
+ {
+ work.push((shard.tenant_shard_id, optimization));
+ break;
+ }
+
+ // TODO: extend this mechanism to prefer attaching on nodes with fewer attached
+ // tenants (i.e. extend schedule state to distinguish attached from secondary counts),
+ // for the total number of attachments on a node (not just within a tenant.)
+ }
+ }
+ }
+
+ for (tenant_shard_id, optimization) in work {
+ let shard = tenants
+ .get_mut(&tenant_shard_id)
+ .expect("We held lock from place we got this ID");
+ shard.apply_optimization(scheduler, optimization);
+
if self.maybe_reconcile_shard(shard, &pageservers).is_some() {
reconciles_spawned += 1;
}
@@ -3970,6 +4200,32 @@ impl Service {
reconciles_spawned
}
+ /// Useful for tests: run whatever work a background [`Self::reconcile_all`] would have done, but
+ /// also wait for any generated Reconcilers to complete. Calling this until it returns zero should
+ /// put the system into a quiescent state where future background reconciliations won't do anything.
+ pub(crate) async fn reconcile_all_now(&self) -> Result {
+ let reconciles_spawned = self.reconcile_all();
+ if reconciles_spawned == 0 {
+ // Only optimize when we are otherwise idle
+ self.optimize_all();
+ }
+
+ let waiters = {
+ let mut waiters = Vec::new();
+ let locked = self.inner.read().unwrap();
+ for (_tenant_shard_id, shard) in locked.tenants.iter() {
+ if let Some(waiter) = shard.get_waiter() {
+ waiters.push(waiter);
+ }
+ }
+ waiters
+ };
+
+ let waiter_count = waiters.len();
+ self.await_waiters(waiters, RECONCILE_TIMEOUT).await?;
+ Ok(waiter_count)
+ }
+
pub async fn shutdown(&self) {
// Note that this already stops processing any results from reconciles: so
// we do not expect that our [`TenantState`] objects will reach a neat
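The reconcile_all_now doc comment describes the intended test usage: keep calling it until it reports zero waiters, at which point background reconciliation and optimization have nothing left to do. A hedged sketch of that quiescence loop, written against any async callable with the same shape as the method (the helper name and bounded pass count are illustrative):

    // Sketch only: poll until a pass finds nothing to reconcile or optimize.
    async fn wait_for_quiescence<F, Fut, E>(
        mut reconcile_all_now: F,
        max_passes: usize,
    ) -> Result<bool, E>
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = Result<usize, E>>,
    {
        for _ in 0..max_passes {
            // Zero means no reconcilers were spawned and none were in flight.
            if reconcile_all_now().await? == 0 {
                return Ok(true);
            }
        }
        Ok(false) // still busy after max_passes
    }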
diff --git a/control_plane/attachment_service/src/tenant_state.rs b/control_plane/attachment_service/src/tenant_state.rs
index 83c921dc58..6717b8e178 100644
--- a/control_plane/attachment_service/src/tenant_state.rs
+++ b/control_plane/attachment_service/src/tenant_state.rs
@@ -7,8 +7,9 @@ use std::{
use crate::{
metrics::{self, ReconcileCompleteLabelGroup, ReconcileOutcome},
persistence::TenantShardPersistence,
+ scheduler::{AffinityScore, MaySchedule, ScheduleContext},
};
-use pageserver_api::controller_api::PlacementPolicy;
+use pageserver_api::controller_api::{PlacementPolicy, ShardSchedulingPolicy};
use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
@@ -116,6 +117,10 @@ pub(crate) struct TenantState {
/// sending it. This is the mechanism by which compute notifications are included in the scope
/// of state that we publish externally in an eventually consistent way.
pub(crate) pending_compute_notification: bool,
+
+ // Support/debug tool: if something is going wrong or flapping with scheduling, this may
+ // be set to a non-active state to avoid making changes while the issue is fixed.
+ scheduling_policy: ShardSchedulingPolicy,
}
#[derive(Default, Clone, Debug, Serialize)]
@@ -246,8 +251,13 @@ impl IntentState {
impl Drop for IntentState {
fn drop(&mut self) {
- // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler
- debug_assert!(self.attached.is_none() && self.secondary.is_empty());
+ // Must clear before dropping, to avoid leaving stale refcounts in the Scheduler.
+ // We do not check this while panicking, to avoid polluting unit test failures or
+ // other assertions with this assertion's output. It's still wrong to leak these,
+ // but if we already have a panic then we don't need to independently flag this case.
+ if !(std::thread::panicking()) {
+ debug_assert!(self.attached.is_none() && self.secondary.is_empty());
+ }
}
}
@@ -292,6 +302,26 @@ pub enum ReconcileWaitError {
Failed(TenantShardId, String),
}
+#[derive(Eq, PartialEq, Debug)]
+pub(crate) struct ReplaceSecondary {
+ old_node_id: NodeId,
+ new_node_id: NodeId,
+}
+
+#[derive(Eq, PartialEq, Debug)]
+pub(crate) struct MigrateAttachment {
+ old_attached_node_id: NodeId,
+ new_attached_node_id: NodeId,
+}
+
+#[derive(Eq, PartialEq, Debug)]
+pub(crate) enum ScheduleOptimization {
+ // Replace one of our secondary locations with a different node
+ ReplaceSecondary(ReplaceSecondary),
+ // Migrate attachment to an existing secondary location
+ MigrateAttachment(MigrateAttachment),
+}
+
impl ReconcilerWaiter {
pub(crate) async fn wait_timeout(&self, timeout: Duration) -> Result<(), ReconcileWaitError> {
tokio::select! {
@@ -370,6 +400,7 @@ impl TenantState {
error_waiter: Arc::new(SeqWait::new(Sequence(0))),
last_error: Arc::default(),
pending_compute_notification: false,
+ scheduling_policy: ShardSchedulingPolicy::default(),
}
}
@@ -425,6 +456,7 @@ impl TenantState {
fn schedule_attached(
&mut self,
scheduler: &mut Scheduler,
+ context: &ScheduleContext,
) -> Result<(bool, NodeId), ScheduleError> {
// No work to do if we already have an attached tenant
if let Some(node_id) = self.intent.attached {
@@ -438,14 +470,33 @@ impl TenantState {
Ok((true, promote_secondary))
} else {
// Pick a fresh node: either we had no secondaries or none were schedulable
- let node_id = scheduler.schedule_shard(&self.intent.secondary)?;
+ let node_id = scheduler.schedule_shard(&self.intent.secondary, context)?;
tracing::debug!("Selected {} as attached", node_id);
self.intent.set_attached(scheduler, Some(node_id));
Ok((true, node_id))
}
}
- pub(crate) fn schedule(&mut self, scheduler: &mut Scheduler) -> Result<(), ScheduleError> {
+ pub(crate) fn schedule(
+ &mut self,
+ scheduler: &mut Scheduler,
+ context: &mut ScheduleContext,
+ ) -> Result<(), ScheduleError> {
+ let r = self.do_schedule(scheduler, context);
+
+ context.avoid(&self.intent.all_pageservers());
+ if let Some(attached) = self.intent.get_attached() {
+ context.push_attached(*attached);
+ }
+
+ r
+ }
+
+ pub(crate) fn do_schedule(
+ &mut self,
+ scheduler: &mut Scheduler,
+ context: &ScheduleContext,
+ ) -> Result<(), ScheduleError> {
// TODO: before scheduling new nodes, check if any existing content in
// self.intent refers to pageservers that are offline, and pick other
// pageservers if so.
@@ -453,6 +504,16 @@ impl TenantState {
// TODO: respect the splitting bit on tenants: if they are currently splitting then we may not
// change their attach location.
+ match self.scheduling_policy {
+ ShardSchedulingPolicy::Active | ShardSchedulingPolicy::Essential => {}
+ ShardSchedulingPolicy::Pause | ShardSchedulingPolicy::Stop => {
+ // Warn to make it obvious why other things aren't happening/working, if we skip scheduling
+ tracing::warn!(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(),
+ "Scheduling is disabled by policy {:?}", self.scheduling_policy);
+ return Ok(());
+ }
+ }
+
// Build the set of pageservers already in use by this tenant, to avoid scheduling
// more work on the same pageservers we're already using.
let mut modified = false;
@@ -479,12 +540,13 @@ impl TenantState {
}
// Should have exactly one attached, and N secondaries
- let (modified_attached, attached_node_id) = self.schedule_attached(scheduler)?;
+ let (modified_attached, attached_node_id) =
+ self.schedule_attached(scheduler, context)?;
modified |= modified_attached;
let mut used_pageservers = vec![attached_node_id];
while self.intent.secondary.len() < secondary_count {
- let node_id = scheduler.schedule_shard(&used_pageservers)?;
+ let node_id = scheduler.schedule_shard(&used_pageservers, context)?;
self.intent.push_secondary(scheduler, node_id);
used_pageservers.push(node_id);
modified = true;
@@ -497,7 +559,7 @@ impl TenantState {
modified = true;
} else if self.intent.secondary.is_empty() {
// Populate secondary by scheduling a fresh node
- let node_id = scheduler.schedule_shard(&[])?;
+ let node_id = scheduler.schedule_shard(&[], context)?;
self.intent.push_secondary(scheduler, node_id);
modified = true;
}
@@ -524,6 +586,167 @@ impl TenantState {
Ok(())
}
+ /// Optimize attachments: if a shard has a secondary location that is preferable to
+ /// its primary location based on soft constraints, switch that secondary location
+ /// to be attached.
+ #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
+ pub(crate) fn optimize_attachment(
+ &self,
+ nodes: &HashMap<NodeId, Node>,
+ schedule_context: &ScheduleContext,
+ ) -> Option {
+ let attached = (*self.intent.get_attached())?;
+ if self.intent.secondary.is_empty() {
+ // We can only do useful work if we have both attached and secondary locations: this
+ // function doesn't schedule new locations, only swaps between attached and secondaries.
+ return None;
+ }
+
+ let current_affinity_score = schedule_context.get_node_affinity(attached);
+ let current_attachment_count = schedule_context.get_node_attachments(attached);
+
+ // Generate score for each node, dropping any un-schedulable nodes.
+ let all_pageservers = self.intent.all_pageservers();
+ let mut scores = all_pageservers
+ .iter()
+ .flat_map(|node_id| {
+ if matches!(
+ nodes
+ .get(node_id)
+ .map(|n| n.may_schedule())
+ .unwrap_or(MaySchedule::No),
+ MaySchedule::No
+ ) {
+ None
+ } else {
+ let affinity_score = schedule_context.get_node_affinity(*node_id);
+ let attachment_count = schedule_context.get_node_attachments(*node_id);
+ Some((*node_id, affinity_score, attachment_count))
+ }
+ })
+ .collect::<Vec<_>>();
+
+ // Sort precedence:
+ // 1st - prefer nodes with the lowest total affinity score
+ // 2nd - prefer nodes with the lowest number of attachments in this context
+ // 3rd - if all else is equal, sort by node ID for determinism in tests.
+ scores.sort_by_key(|i| (i.1, i.2, i.0));
+
+ if let Some((preferred_node, preferred_affinity_score, preferred_attachment_count)) =
+ scores.first()
+ {
+ if attached != *preferred_node {
+ // The best alternative must be more than 1 better than us, otherwise we could end
+ // up flapping back next time we're called (e.g. there's no point migrating from
+ // a location with score 1 to a location with score 0, because on the next pass the
+ // situation would be the same, but in reverse).
+ if current_affinity_score > *preferred_affinity_score + AffinityScore(1)
+ || current_attachment_count > *preferred_attachment_count + 1
+ {
+ tracing::info!(
+ "Identified optimization: migrate attachment {attached}->{preferred_node} (secondaries {:?})",
+ self.intent.get_secondary()
+ );
+ return Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
+ old_attached_node_id: attached,
+ new_attached_node_id: *preferred_node,
+ }));
+ }
+ } else {
+ tracing::debug!(
+ "Node {} is already preferred (score {:?})",
+ preferred_node,
+ preferred_affinity_score
+ );
+ }
+ }
+
+ // Fall-through: we didn't find an optimization
+ None
+ }
+
+ #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug()))]
+ pub(crate) fn optimize_secondary(
+ &self,
+ scheduler: &Scheduler,
+ schedule_context: &ScheduleContext,
+ ) -> Option {
+ if self.intent.secondary.is_empty() {
+ // We can only do useful work if we have both attached and secondary locations: this
+ // function doesn't schedule new locations, only swaps between attached and secondaries.
+ return None;
+ }
+
+ for secondary in self.intent.get_secondary() {
+ let Some(affinity_score) = schedule_context.nodes.get(secondary) else {
+ // We're already on a node unaffected by any affinity constraints,
+ // so we won't change it.
+ continue;
+ };
+
+ // Let the scheduler suggest a node, where it would put us if we were scheduling afresh
+ // This implicitly limits the choice to nodes that are available, and prefers nodes
+ // with lower utilization.
+ let Ok(candidate_node) =
+ scheduler.schedule_shard(&self.intent.all_pageservers(), schedule_context)
+ else {
+ // A scheduling error means we have no possible candidate replacements
+ continue;
+ };
+
+ let candidate_affinity_score = schedule_context
+ .nodes
+ .get(&candidate_node)
+ .unwrap_or(&AffinityScore::FREE);
+
+ // The best alternative must be more than 1 better than us, otherwise we could end
+ // up flapping back next time we're called.
+ if *candidate_affinity_score + AffinityScore(1) < *affinity_score {
+ // If some other node is available and has a lower score than this node, then
+ // that other node is a good place to migrate to.
+ tracing::info!(
+ "Identified optimization: replace secondary {secondary}->{candidate_node} (current secondaries {:?})",
+ self.intent.get_secondary()
+ );
+ return Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
+ old_node_id: *secondary,
+ new_node_id: candidate_node,
+ }));
+ }
+ }
+
+ None
+ }
+
+ pub(crate) fn apply_optimization(
+ &mut self,
+ scheduler: &mut Scheduler,
+ optimization: ScheduleOptimization,
+ ) {
+ metrics::METRICS_REGISTRY
+ .metrics_group
+ .storage_controller_schedule_optimization
+ .inc();
+
+ match optimization {
+ ScheduleOptimization::MigrateAttachment(MigrateAttachment {
+ old_attached_node_id,
+ new_attached_node_id,
+ }) => {
+ self.intent.demote_attached(old_attached_node_id);
+ self.intent
+ .promote_attached(scheduler, new_attached_node_id);
+ }
+ ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
+ old_node_id,
+ new_node_id,
+ }) => {
+ self.intent.remove_secondary(scheduler, old_node_id);
+ self.intent.push_secondary(scheduler, new_node_id);
+ }
+ }
+ }
+
/// Query whether the tenant's observed state for attached node matches its intent state, and if so,
/// yield the node ID. This is appropriate for emitting compute hook notifications: we are checking that
/// the node in question is not only where we intend to attach, but that the tenant is indeed already attached there.
@@ -668,6 +891,19 @@ impl TenantState {
}
}
+ // Pre-checks done: finally check whether we may actually do the work
+ match self.scheduling_policy {
+ ShardSchedulingPolicy::Active
+ | ShardSchedulingPolicy::Essential
+ | ShardSchedulingPolicy::Pause => {}
+ ShardSchedulingPolicy::Stop => {
+ // We only reach this point if there is work to do and we're going to skip
+ // doing it: warn to make it obvious why this tenant isn't doing what it ought to.
+ tracing::warn!("Skipping reconcile for policy {:?}", self.scheduling_policy);
+ return None;
+ }
+ }
+
// Build list of nodes from which the reconciler should detach
let mut detach = Vec::new();
for node_id in self.observed.locations.keys() {
@@ -804,6 +1040,22 @@ impl TenantState {
})
}
+ /// Get a waiter for any reconciliation in flight, but do not start reconciliation
+ /// if it is not already running
+ pub(crate) fn get_waiter(&self) -> Option {
+ if self.reconciler.is_some() {
+ Some(ReconcilerWaiter {
+ tenant_shard_id: self.tenant_shard_id,
+ seq_wait: self.waiter.clone(),
+ error_seq_wait: self.error_waiter.clone(),
+ error: self.last_error.clone(),
+ seq: self.sequence,
+ })
+ } else {
+ None
+ }
+ }
+
/// Called when a ReconcileResult has been emitted and the service is updating
/// our state: if the result is from a sequence >= my ReconcileHandle, then drop
/// the handle to indicate there is no longer a reconciliation in progress.
@@ -829,6 +1081,40 @@ impl TenantState {
debug_assert!(!self.intent.all_pageservers().contains(&node_id));
}
+ pub(crate) fn set_scheduling_policy(&mut self, p: ShardSchedulingPolicy) {
+ self.scheduling_policy = p;
+ }
+
+ pub(crate) fn get_scheduling_policy(&self) -> &ShardSchedulingPolicy {
+ &self.scheduling_policy
+ }
+
+ pub(crate) fn from_persistent(
+ tsp: TenantShardPersistence,
+ intent: IntentState,
+ ) -> anyhow::Result<Self> {
+ let tenant_shard_id = tsp.get_tenant_shard_id()?;
+ let shard_identity = tsp.get_shard_identity()?;
+
+ Ok(Self {
+ tenant_shard_id,
+ shard: shard_identity,
+ sequence: Sequence::initial(),
+ generation: tsp.generation.map(|g| Generation::new(g as u32)),
+ policy: serde_json::from_str(&tsp.placement_policy).unwrap(),
+ intent,
+ observed: ObservedState::new(),
+ config: serde_json::from_str(&tsp.config).unwrap(),
+ reconciler: None,
+ splitting: tsp.splitting,
+ waiter: Arc::new(SeqWait::new(Sequence::initial())),
+ error_waiter: Arc::new(SeqWait::new(Sequence::initial())),
+ last_error: Arc::default(),
+ pending_compute_notification: false,
+ scheduling_policy: serde_json::from_str(&tsp.scheduling_policy).unwrap(),
+ })
+ }
+
pub(crate) fn to_persistent(&self) -> TenantShardPersistence {
TenantShardPersistence {
tenant_id: self.tenant_shard_id.tenant_id.to_string(),
@@ -840,6 +1126,7 @@ impl TenantState {
placement_policy: serde_json::to_string(&self.policy).unwrap(),
config: serde_json::to_string(&self.config).unwrap(),
splitting: SplitState::default(),
+ scheduling_policy: serde_json::to_string(&self.scheduling_policy).unwrap(),
}
}
}
@@ -878,6 +1165,32 @@ pub(crate) mod tests {
)
}
+ fn make_test_tenant(policy: PlacementPolicy, shard_count: ShardCount) -> Vec<TenantState> {
+ let tenant_id = TenantId::generate();
+
+ (0..shard_count.count())
+ .map(|i| {
+ let shard_number = ShardNumber(i);
+
+ let tenant_shard_id = TenantShardId {
+ tenant_id,
+ shard_number,
+ shard_count,
+ };
+ TenantState::new(
+ tenant_shard_id,
+ ShardIdentity::new(
+ shard_number,
+ shard_count,
+ pageserver_api::shard::ShardStripeSize(32768),
+ )
+ .unwrap(),
+ policy.clone(),
+ )
+ })
+ .collect()
+ }
+
/// Test the scheduling behaviors used when a tenant configured for HA is subject
/// to nodes being marked offline.
#[test]
@@ -887,10 +1200,11 @@ pub(crate) mod tests {
let mut nodes = make_test_nodes(3);
let mut scheduler = Scheduler::new(nodes.values());
+ let mut context = ScheduleContext::default();
let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
tenant_state
- .schedule(&mut scheduler)
+ .schedule(&mut scheduler, &mut context)
.expect("we have enough nodes, scheduling should work");
// Expect to initially be scheduled onto different nodes
@@ -916,7 +1230,7 @@ pub(crate) mod tests {
// Scheduling the node should promote the still-available secondary node to attached
tenant_state
- .schedule(&mut scheduler)
+ .schedule(&mut scheduler, &mut context)
.expect("active nodes are available");
assert_eq!(tenant_state.intent.attached.unwrap(), secondary_node_id);
@@ -980,4 +1294,219 @@ pub(crate) mod tests {
tenant_state.intent.clear(&mut scheduler);
Ok(())
}
+
+ #[test]
+ fn scheduling_mode() -> anyhow::Result<()> {
+ let nodes = make_test_nodes(3);
+ let mut scheduler = Scheduler::new(nodes.values());
+
+ let mut tenant_state = make_test_tenant_shard(PlacementPolicy::Attached(1));
+
+ // In pause mode, schedule() shouldn't do anything
+ tenant_state.scheduling_policy = ShardSchedulingPolicy::Pause;
+ assert!(tenant_state
+ .schedule(&mut scheduler, &mut ScheduleContext::default())
+ .is_ok());
+ assert!(tenant_state.intent.all_pageservers().is_empty());
+
+ // In active mode, schedule() works
+ tenant_state.scheduling_policy = ShardSchedulingPolicy::Active;
+ assert!(tenant_state
+ .schedule(&mut scheduler, &mut ScheduleContext::default())
+ .is_ok());
+ assert!(!tenant_state.intent.all_pageservers().is_empty());
+
+ tenant_state.intent.clear(&mut scheduler);
+ Ok(())
+ }
+
+ #[test]
+ fn optimize_attachment() -> anyhow::Result<()> {
+ let nodes = make_test_nodes(3);
+ let mut scheduler = Scheduler::new(nodes.values());
+
+ let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
+ let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
+
+ // Initially: both shards are attached to node 1, and each has a secondary location
+ // on a different node.
+ shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
+ shard_a.intent.push_secondary(&mut scheduler, NodeId(2));
+ shard_b.intent.set_attached(&mut scheduler, Some(NodeId(1)));
+ shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
+
+ let mut schedule_context = ScheduleContext::default();
+ schedule_context.avoid(&shard_a.intent.all_pageservers());
+ schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
+ schedule_context.avoid(&shard_b.intent.all_pageservers());
+ schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
+
+ let optimization_a = shard_a.optimize_attachment(&nodes, &schedule_context);
+
+ // Either shard should recognize that it has the option to switch to a secondary location where there
+ // would be no other shards from the same tenant, and request to do so.
+ assert_eq!(
+ optimization_a,
+ Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
+ old_attached_node_id: NodeId(1),
+ new_attached_node_id: NodeId(2)
+ }))
+ );
+
+ // Note that generating optimizations for two shards of the same tenant against the same
+ // ScheduleContext is not valid in practice (applying one optimization invalidates the stats) --
+ // it is the responsibility of [`Service::optimize_all`] to avoid trying to do optimizations for
+ // multiple shards in the same tenant at the same time. Generating both optimizations here is
+ // done only for test purposes.
+ let optimization_b = shard_b.optimize_attachment(&nodes, &schedule_context);
+ assert_eq!(
+ optimization_b,
+ Some(ScheduleOptimization::MigrateAttachment(MigrateAttachment {
+ old_attached_node_id: NodeId(1),
+ new_attached_node_id: NodeId(3)
+ }))
+ );
+
+ // Applying these optimizations should result in the end state proposed
+ shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
+ assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(2)));
+ assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(1)]);
+ shard_b.apply_optimization(&mut scheduler, optimization_b.unwrap());
+ assert_eq!(shard_b.intent.get_attached(), &Some(NodeId(3)));
+ assert_eq!(shard_b.intent.get_secondary(), &vec![NodeId(1)]);
+
+ shard_a.intent.clear(&mut scheduler);
+ shard_b.intent.clear(&mut scheduler);
+
+ Ok(())
+ }
+
+ #[test]
+ fn optimize_secondary() -> anyhow::Result<()> {
+ let nodes = make_test_nodes(4);
+ let mut scheduler = Scheduler::new(nodes.values());
+
+ let mut shard_a = make_test_tenant_shard(PlacementPolicy::Attached(1));
+ let mut shard_b = make_test_tenant_shard(PlacementPolicy::Attached(1));
+
+ // Initially: the shards are attached to different nodes, but both have their
+ // secondary location on the same node.
+ shard_a.intent.set_attached(&mut scheduler, Some(NodeId(1)));
+ shard_a.intent.push_secondary(&mut scheduler, NodeId(3));
+ shard_b.intent.set_attached(&mut scheduler, Some(NodeId(2)));
+ shard_b.intent.push_secondary(&mut scheduler, NodeId(3));
+
+ let mut schedule_context = ScheduleContext::default();
+ schedule_context.avoid(&shard_a.intent.all_pageservers());
+ schedule_context.push_attached(shard_a.intent.get_attached().unwrap());
+ schedule_context.avoid(&shard_b.intent.all_pageservers());
+ schedule_context.push_attached(shard_b.intent.get_attached().unwrap());
+
+ let optimization_a = shard_a.optimize_secondary(&scheduler, &schedule_context);
+
+ // Since a node with no locations for this tenant is available, the shard whose secondary
+ // is on the doubly-loaded node should generate an optimization to move that secondary away
+ assert_eq!(
+ optimization_a,
+ Some(ScheduleOptimization::ReplaceSecondary(ReplaceSecondary {
+ old_node_id: NodeId(3),
+ new_node_id: NodeId(4)
+ }))
+ );
+
+ shard_a.apply_optimization(&mut scheduler, optimization_a.unwrap());
+ assert_eq!(shard_a.intent.get_attached(), &Some(NodeId(1)));
+ assert_eq!(shard_a.intent.get_secondary(), &vec![NodeId(4)]);
+
+ shard_a.intent.clear(&mut scheduler);
+ shard_b.intent.clear(&mut scheduler);
+
+ Ok(())
+ }
+
+ // Optimize until quiescent: this emulates what Service::optimize_all does when
+ // called repeatedly in the background.
+ fn optimize_til_idle(
+ nodes: &HashMap<NodeId, Node>,
+ scheduler: &mut Scheduler,
+ shards: &mut [TenantState],
+ ) {
+ let mut loop_n = 0;
+ loop {
+ let mut schedule_context = ScheduleContext::default();
+ let mut any_changed = false;
+
+ for shard in shards.iter() {
+ schedule_context.avoid(&shard.intent.all_pageservers());
+ if let Some(attached) = shard.intent.get_attached() {
+ schedule_context.push_attached(*attached);
+ }
+ }
+
+ for shard in shards.iter_mut() {
+ let optimization = shard.optimize_attachment(nodes, &schedule_context);
+ if let Some(optimization) = optimization {
+ shard.apply_optimization(scheduler, optimization);
+ any_changed = true;
+ break;
+ }
+
+ let optimization = shard.optimize_secondary(scheduler, &schedule_context);
+ if let Some(optimization) = optimization {
+ shard.apply_optimization(scheduler, optimization);
+ any_changed = true;
+ break;
+ }
+ }
+
+ if !any_changed {
+ break;
+ }
+
+ // Assert no infinite loop
+ loop_n += 1;
+ assert!(loop_n < 1000);
+ }
+ }
+
+ /// Test the balancing behavior of shard scheduling: that it achieves a balance, and
+ /// that it converges.
+ #[test]
+ fn optimize_add_nodes() -> anyhow::Result<()> {
+ let nodes = make_test_nodes(4);
+
+ // Only show the scheduler a couple of nodes
+ let mut scheduler = Scheduler::new([].iter());
+ scheduler.node_upsert(nodes.get(&NodeId(1)).unwrap());
+ scheduler.node_upsert(nodes.get(&NodeId(2)).unwrap());
+
+ let mut shards = make_test_tenant(PlacementPolicy::Attached(1), ShardCount::new(4));
+ let mut schedule_context = ScheduleContext::default();
+ for shard in &mut shards {
+ assert!(shard
+ .schedule(&mut scheduler, &mut schedule_context)
+ .is_ok());
+ }
+
+ // We should see an equal number of locations on the two nodes.
+ assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 4);
+ assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 4);
+
+ // Add another two nodes: we should see the shards spread out when their optimize
+ // methods are called
+ scheduler.node_upsert(nodes.get(&NodeId(3)).unwrap());
+ scheduler.node_upsert(nodes.get(&NodeId(4)).unwrap());
+ optimize_til_idle(&nodes, &mut scheduler, &mut shards);
+
+ assert_eq!(scheduler.get_node_shard_count(NodeId(1)), 2);
+ assert_eq!(scheduler.get_node_shard_count(NodeId(2)), 2);
+ assert_eq!(scheduler.get_node_shard_count(NodeId(3)), 2);
+ assert_eq!(scheduler.get_node_shard_count(NodeId(4)), 2);
+
+ for shard in shards.iter_mut() {
+ shard.intent.clear(&mut scheduler);
+ }
+
+ Ok(())
+ }
}
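The anti-flap rule applied in `optimize_secondary` above (a candidate must be more than 1 better than the current node) is easy to lose in the diff context, so here is a minimal standalone sketch of the idea. `AffinityScore` and the helper below are simplified stand-ins, not the storage controller's actual types:

    #[derive(Clone, Copy)]
    struct AffinityScore(usize);

    /// Accept the candidate only if it is strictly more than 1 better than the
    /// current node, mirroring the hysteresis applied before emitting ReplaceSecondary.
    fn should_replace(current: AffinityScore, candidate: AffinityScore) -> bool {
        candidate.0 + 1 < current.0
    }

    fn main() {
        // Current node hosts two of this tenant's shards, candidate hosts none: migrate.
        assert!(should_replace(AffinityScore(2), AffinityScore(0)));
        // Scores differ by exactly one: stay put, otherwise the next pass could flap back.
        assert!(!should_replace(AffinityScore(1), AffinityScore(0)));
    }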
diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index c5eabc46db..abf815f07a 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -389,6 +389,10 @@ impl PageServerNode {
.remove("image_creation_threshold")
.map(|x| x.parse::<usize>())
.transpose()?,
+ image_layer_creation_check_threshold: settings
+ .remove("image_layer_creation_check_threshold")
+ .map(|x| x.parse::<u8>())
+ .transpose()?,
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")
@@ -501,6 +505,12 @@ impl PageServerNode {
.map(|x| x.parse::<usize>())
.transpose()
.context("Failed to parse 'image_creation_threshold' as non zero integer")?,
+ image_layer_creation_check_threshold: settings
+ .remove("image_layer_creation_check_threshold")
+ .map(|x| x.parse::<u8>())
+ .transpose()
+ .context("Failed to parse 'image_layer_creation_check_threshold' as integer")?,
+
pitr_interval: settings.remove("pitr_interval").map(|x| x.to_string()),
walreceiver_connect_timeout: settings
.remove("walreceiver_connect_timeout")
diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs
index e33bd0f486..dcf9e38106 100644
--- a/libs/pageserver_api/src/controller_api.rs
+++ b/libs/pageserver_api/src/controller_api.rs
@@ -42,6 +42,12 @@ pub struct NodeConfigureRequest {
pub scheduling: Option<NodeSchedulingPolicy>,
}
+#[derive(Serialize, Deserialize)]
+pub struct TenantPolicyRequest {
+ pub placement: Option<PlacementPolicy>,
+ pub scheduling: Option<ShardSchedulingPolicy>,
+}
+
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantLocateResponseShard {
pub shard_id: TenantShardId,
@@ -170,6 +176,32 @@ impl FromStr for NodeAvailability {
}
}
+#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq, Debug)]
+pub enum ShardSchedulingPolicy {
+ // Normal mode: the tenant's scheduled locations may be updated at will, including
+ // for non-essential optimization.
+ Active,
+
+ // Disable optimizations, but permit scheduling when necessary to fulfil the PlacementPolicy.
+ // For example, this still permits a shard's attached location to move to a secondary in
+ // response to a node failure, or a new secondary to be assigned if a node was removed.
+ Essential,
+
+ // No scheduling: leave the shard running wherever it currently is. Even if the shard is
+ // unavailable, it will not be rescheduled to another node.
+ Pause,
+
+ // No reconciling: we will make no location_conf API calls to pageservers at all. If the
+ // shard is unavailable, it stays that way. If a node fails, this shard doesn't get failed over.
+ Stop,
+}
+
+impl Default for ShardSchedulingPolicy {
+ fn default() -> Self {
+ Self::Active
+ }
+}
+
#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
pub enum NodeSchedulingPolicy {
Active,
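As a quick illustration of how the reconcile path consults the new ShardSchedulingPolicy (see the pre-check added to TenantState earlier in this diff), here is a hedged, self-contained sketch; the enum matches the patch, but `may_reconcile` and its logging are illustrative only:

    #[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
    enum ShardSchedulingPolicy {
        #[default]
        Active,
        Essential,
        Pause,
        Stop,
    }

    /// Illustrative stand-in for the policy pre-check: reconciliation may proceed
    /// under Active, Essential and Pause, but is skipped (with a warning) under Stop.
    fn may_reconcile(policy: ShardSchedulingPolicy) -> bool {
        match policy {
            ShardSchedulingPolicy::Active
            | ShardSchedulingPolicy::Essential
            | ShardSchedulingPolicy::Pause => true,
            ShardSchedulingPolicy::Stop => {
                eprintln!("Skipping reconcile for policy {policy:?}");
                false
            }
        }
    }

    fn main() {
        assert!(may_reconcile(ShardSchedulingPolicy::default()));
        assert!(!may_reconcile(ShardSchedulingPolicy::Stop));
    }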
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index aad4cc97fc..ad4ca6710d 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -301,6 +301,7 @@ pub struct TenantConfig {
pub heatmap_period: Option,
pub lazy_slru_download: Option,
pub timeline_get_throttle: Option,
+ pub image_layer_creation_check_threshold: Option<u8>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
diff --git a/pageserver/compaction/src/compact_tiered.rs b/pageserver/compaction/src/compact_tiered.rs
index 60fc7ac925..5261746b22 100644
--- a/pageserver/compaction/src/compact_tiered.rs
+++ b/pageserver/compaction/src/compact_tiered.rs
@@ -43,7 +43,8 @@ pub async fn compact_tiered(
fanout: u64,
ctx: &E::RequestContext,
) -> anyhow::Result<()> {
- assert!(fanout >= 2);
+ assert!(fanout >= 1, "fanout needs to be at least 1 but is {fanout}");
+ let exp_base = fanout.max(2);
// Start at L0
let mut current_level_no = 0;
let mut current_level_target_height = target_file_size;
@@ -106,7 +107,7 @@ pub async fn compact_tiered(
break;
}
current_level_no += 1;
- current_level_target_height = current_level_target_height.saturating_mul(fanout);
+ current_level_target_height = current_level_target_height.saturating_mul(exp_base);
}
Ok(())
}
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 792d9e548d..0806ef0cf4 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3653,6 +3653,9 @@ pub(crate) mod harness {
heatmap_period: Some(tenant_conf.heatmap_period),
lazy_slru_download: Some(tenant_conf.lazy_slru_download),
timeline_get_throttle: Some(tenant_conf.timeline_get_throttle),
+ image_layer_creation_check_threshold: Some(
+ tenant_conf.image_layer_creation_check_threshold,
+ ),
}
}
}
diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs
index 53a8c97e23..a2bb479f63 100644
--- a/pageserver/src/tenant/config.rs
+++ b/pageserver/src/tenant/config.rs
@@ -57,6 +57,9 @@ pub mod defaults {
// throughputs up to 1GiB/s per timeline.
pub const DEFAULT_MAX_WALRECEIVER_LSN_WAL_LAG: u64 = 1024 * 1024 * 1024;
pub const DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD: &str = "24 hour";
+ // By default, ingest enough WAL for two new L0 layers before checking whether new
+ // image layers should be created.
+ pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2;
pub const DEFAULT_INGEST_BATCH_SIZE: u64 = 100;
}
@@ -362,6 +365,10 @@ pub struct TenantConf {
pub lazy_slru_download: bool,
pub timeline_get_throttle: pageserver_api::models::ThrottleConfig,
+
+ // How much WAL must be ingested before checking again whether a new image layer is required.
+ // Expressed in multiples of checkpoint distance.
+ pub image_layer_creation_check_threshold: u8,
}
/// Same as TenantConf, but this struct preserves the information about
@@ -454,6 +461,9 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
pub timeline_get_throttle: Option,
+
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub image_layer_creation_check_threshold: Option<u8>,
}
impl TenantConfOpt {
@@ -508,6 +518,9 @@ impl TenantConfOpt {
.timeline_get_throttle
.clone()
.unwrap_or(global_conf.timeline_get_throttle),
+ image_layer_creation_check_threshold: self
+ .image_layer_creation_check_threshold
+ .unwrap_or(global_conf.image_layer_creation_check_threshold),
}
}
}
@@ -548,6 +561,7 @@ impl Default for TenantConf {
heatmap_period: Duration::ZERO,
lazy_slru_download: false,
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
+ image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
}
}
}
@@ -621,6 +635,7 @@ impl From for models::TenantConfig {
heatmap_period: value.heatmap_period.map(humantime),
lazy_slru_download: value.lazy_slru_download,
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
+ image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
}
}
}
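To make the new image_layer_creation_check_threshold knob concrete, here is a small hedged sketch of the gating arithmetic (the real check lives in Timeline::time_for_new_image_layer later in this diff): the expensive image-layer check is skipped until at least threshold * checkpoint_distance bytes of WAL have been ingested since the last check. The 256 MiB checkpoint distance below is an illustrative value, not a quote of the default:

    /// Returns true if enough WAL has been ingested since the last check
    /// to re-evaluate image layer creation.
    fn should_check_image_creation(
        current_lsn: u64,
        last_check_lsn: u64,
        check_threshold: u8,
        checkpoint_distance: u64,
    ) -> bool {
        let distance = current_lsn.saturating_sub(last_check_lsn);
        let min_distance = check_threshold as u64 * checkpoint_distance;
        distance >= min_distance
    }

    fn main() {
        let checkpoint_distance = 256 * 1024 * 1024; // illustrative value, in bytes
        // threshold = 2 (the new default): skip until 512 MiB of WAL since the last check
        assert!(!should_check_image_creation(300 << 20, 0, 2, checkpoint_distance));
        assert!(should_check_image_creation(600 << 20, 0, 2, checkpoint_distance));
        // threshold = 0 (as several tests set it): always check
        assert!(should_check_image_creation(0, 0, 0, checkpoint_distance));
    }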
diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs
index b7132ee3bf..466d95f46d 100644
--- a/pageserver/src/tenant/storage_layer/delta_layer.rs
+++ b/pageserver/src/tenant/storage_layer/delta_layer.rs
@@ -47,6 +47,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::BytesMut;
use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
+use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
@@ -946,6 +947,34 @@ impl DeltaLayerInner {
Ok(planner.finish())
}
+ fn get_min_read_buffer_size(
+ planned_reads: &[VectoredRead],
+ read_size_soft_max: usize,
+ ) -> usize {
+ let Some(largest_read) = planned_reads.iter().max_by_key(|read| read.size()) else {
+ return read_size_soft_max;
+ };
+
+ let largest_read_size = largest_read.size();
+ if largest_read_size > read_size_soft_max {
+ // If the read is oversized, it should only contain one key.
+ let offenders = largest_read
+ .blobs_at
+ .as_slice()
+ .iter()
+ .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
+ .join(", ");
+ tracing::warn!(
+ "Oversized vectored read ({} > {}) for keys {}",
+ largest_read_size,
+ read_size_soft_max,
+ offenders
+ );
+ }
+
+ largest_read_size
+ }
+
async fn do_reads_and_update_state(
&self,
reads: Vec<VectoredRead>,
@@ -959,7 +988,8 @@ impl DeltaLayerInner {
.expect("Layer is loaded with max vectored bytes config")
.0
.into();
- let mut buf = Some(BytesMut::with_capacity(max_vectored_read_bytes));
+ let buf_size = Self::get_min_read_buffer_size(&reads, max_vectored_read_bytes);
+ let mut buf = Some(BytesMut::with_capacity(buf_size));
// Note that reads are processed in reverse order (from highest key+lsn).
// This is the order that `ReconstructState` requires such that it can
@@ -986,7 +1016,7 @@ impl DeltaLayerInner {
// We have "lost" the buffer since the lower level IO api
// doesn't return the buffer on error. Allocate a new one.
- buf = Some(BytesMut::with_capacity(max_vectored_read_bytes));
+ buf = Some(BytesMut::with_capacity(buf_size));
continue;
}
@@ -1210,9 +1240,16 @@ impl<'a> pageserver_compaction::interface::CompactionDeltaEntry<'a, Key> for Del
mod test {
use std::collections::BTreeMap;
+ use itertools::MinMaxResult;
+ use rand::prelude::{SeedableRng, SliceRandom, StdRng};
+ use rand::RngCore;
+
use super::*;
use crate::{
- context::DownloadBehavior, task_mgr::TaskKind, tenant::disk_btree::tests::TestDisk,
+ context::DownloadBehavior,
+ task_mgr::TaskKind,
+ tenant::{disk_btree::tests::TestDisk, harness::TenantHarness},
+ DEFAULT_PG_VERSION,
};
/// Construct an index for a fictional delta layer and then
@@ -1332,4 +1369,229 @@ mod test {
assert_eq!(planned_blobs, expected_blobs);
}
+
+ mod constants {
+ use utils::lsn::Lsn;
+
+ /// Offset used by all lsns in this test
+ pub(super) const LSN_OFFSET: Lsn = Lsn(0x08);
+ /// Number of unique keys included in the test data
+ pub(super) const KEY_COUNT: u8 = 60;
+ /// Max number of different lsns for each key
+ pub(super) const MAX_ENTRIES_PER_KEY: u8 = 20;
+ /// Possible value sizes for each key along with a probability weight
+ pub(super) const VALUE_SIZES: [(usize, u8); 3] = [(100, 2), (1024, 2), (1024 * 1024, 1)];
+ /// Probability that there will be a gap between the current key and the next one (33.3%)
+ pub(super) const KEY_GAP_CHANGES: [(bool, u8); 2] = [(true, 1), (false, 2)];
+ /// The minimum size of a key range in all the generated reads
+ pub(super) const MIN_RANGE_SIZE: i128 = 10;
+ /// The number of ranges included in each vectored read
+ pub(super) const RANGES_COUNT: u8 = 2;
+ /// The number of vectored reads performed
+ pub(super) const READS_COUNT: u8 = 100;
+ /// Soft max size of a vectored read. Will be violated if we have to read keys
+ /// with values larger than the limit
+ pub(super) const MAX_VECTORED_READ_BYTES: usize = 64 * 1024;
+ }
+
+ struct Entry {
+ key: Key,
+ lsn: Lsn,
+ value: Vec<u8>,
+ }
+
+ fn generate_entries(rng: &mut StdRng) -> Vec<Entry> {
+ let mut current_key = Key::MIN;
+
+ let mut entries = Vec::new();
+ for _ in 0..constants::KEY_COUNT {
+ let count = rng.gen_range(1..constants::MAX_ENTRIES_PER_KEY);
+ let mut lsns_iter =
+ std::iter::successors(Some(Lsn(constants::LSN_OFFSET.0 + 0x08)), |lsn| {
+ Some(Lsn(lsn.0 + 0x08))
+ });
+ let mut lsns = Vec::new();
+ while lsns.len() < count as usize {
+ let take = rng.gen_bool(0.5);
+ let lsn = lsns_iter.next().unwrap();
+ if take {
+ lsns.push(lsn);
+ }
+ }
+
+ for lsn in lsns {
+ let size = constants::VALUE_SIZES
+ .choose_weighted(rng, |item| item.1)
+ .unwrap()
+ .0;
+ let mut buf = vec![0; size];
+ rng.fill_bytes(&mut buf);
+
+ entries.push(Entry {
+ key: current_key,
+ lsn,
+ value: buf,
+ })
+ }
+
+ let gap = constants::KEY_GAP_CHANGES
+ .choose_weighted(rng, |item| item.1)
+ .unwrap()
+ .0;
+ if gap {
+ current_key = current_key.add(2);
+ } else {
+ current_key = current_key.add(1);
+ }
+ }
+
+ entries
+ }
+
+ struct EntriesMeta {
+ key_range: Range<Key>,
+ lsn_range: Range<Lsn>,
+ index: BTreeMap<(Key, Lsn), Vec<u8>>,
+ }
+
+ fn get_entries_meta(entries: &[Entry]) -> EntriesMeta {
+ let key_range = match entries.iter().minmax_by_key(|e| e.key) {
+ MinMaxResult::MinMax(min, max) => min.key..max.key.next(),
+ _ => panic!("More than one entry is always expected"),
+ };
+
+ let lsn_range = match entries.iter().minmax_by_key(|e| e.lsn) {
+ MinMaxResult::MinMax(min, max) => min.lsn..Lsn(max.lsn.0 + 1),
+ _ => panic!("More than one entry is always expected"),
+ };
+
+ let mut index = BTreeMap::new();
+ for entry in entries.iter() {
+ index.insert((entry.key, entry.lsn), entry.value.clone());
+ }
+
+ EntriesMeta {
+ key_range,
+ lsn_range,
+ index,
+ }
+ }
+
+ fn pick_random_keyspace(rng: &mut StdRng, key_range: &Range<Key>) -> KeySpace {
+ let start = key_range.start.to_i128();
+ let end = key_range.end.to_i128();
+
+ let mut keyspace = KeySpace::default();
+
+ for _ in 0..constants::RANGES_COUNT {
+ let mut range: Option<Range<Key>> = Option::default();
+ while range.is_none() || keyspace.overlaps(range.as_ref().unwrap()) {
+ let range_start = rng.gen_range(start..end);
+ let range_end_offset = range_start + constants::MIN_RANGE_SIZE;
+ if range_end_offset >= end {
+ range = Some(Key::from_i128(range_start)..Key::from_i128(end));
+ } else {
+ let range_end = rng.gen_range((range_start + constants::MIN_RANGE_SIZE)..end);
+ range = Some(Key::from_i128(range_start)..Key::from_i128(range_end));
+ }
+ }
+ keyspace.ranges.push(range.unwrap());
+ }
+
+ keyspace
+ }
+
+ #[tokio::test]
+ async fn test_delta_layer_vectored_read_end_to_end() -> anyhow::Result<()> {
+ let harness = TenantHarness::create("test_delta_layer_oversized_vectored_read")?;
+ let (tenant, ctx) = harness.load().await;
+
+ let timeline_id = TimelineId::generate();
+ let timeline = tenant
+ .create_test_timeline(timeline_id, constants::LSN_OFFSET, DEFAULT_PG_VERSION, &ctx)
+ .await?;
+
+ tracing::info!("Generating test data ...");
+
+ let rng = &mut StdRng::seed_from_u64(0);
+ let entries = generate_entries(rng);
+ let entries_meta = get_entries_meta(&entries);
+
+ tracing::info!("Done generating {} entries", entries.len());
+
+ tracing::info!("Writing test data to delta layer ...");
+ let mut writer = DeltaLayerWriter::new(
+ harness.conf,
+ timeline_id,
+ harness.tenant_shard_id,
+ entries_meta.key_range.start,
+ entries_meta.lsn_range.clone(),
+ )
+ .await?;
+
+ for entry in entries {
+ let (_, res) = writer
+ .put_value_bytes(entry.key, entry.lsn, entry.value, false)
+ .await;
+ res?;
+ }
+
+ let resident = writer.finish(entries_meta.key_range.end, &timeline).await?;
+
+ let inner = resident.get_inner_delta(&ctx).await?;
+
+ let file_size = inner.file.metadata().await?.len();
+ tracing::info!(
+ "Done writing test data to delta layer. Resulting file size is: {}",
+ file_size
+ );
+
+ for i in 0..constants::READS_COUNT {
+ tracing::info!("Doing vectored read {}/{}", i + 1, constants::READS_COUNT);
+
+ let block_reader = FileBlockReader::new(&inner.file, inner.file_id);
+ let index_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
+ inner.index_start_blk,
+ inner.index_root_blk,
+ block_reader,
+ );
+
+ let planner = VectoredReadPlanner::new(constants::MAX_VECTORED_READ_BYTES);
+ let mut reconstruct_state = ValuesReconstructState::new();
+ let keyspace = pick_random_keyspace(rng, &entries_meta.key_range);
+ let data_end_offset = inner.index_start_blk as u64 * PAGE_SZ as u64;
+
+ let vectored_reads = DeltaLayerInner::plan_reads(
+ keyspace.clone(),
+ entries_meta.lsn_range.clone(),
+ data_end_offset,
+ index_reader,
+ planner,
+ &mut reconstruct_state,
+ &ctx,
+ )
+ .await?;
+
+ let vectored_blob_reader = VectoredBlobReader::new(&inner.file);
+ let buf_size = DeltaLayerInner::get_min_read_buffer_size(
+ &vectored_reads,
+ constants::MAX_VECTORED_READ_BYTES,
+ );
+ let mut buf = Some(BytesMut::with_capacity(buf_size));
+
+ for read in vectored_reads {
+ let blobs_buf = vectored_blob_reader
+ .read_blobs(&read, buf.take().expect("Should have a buffer"))
+ .await?;
+ for meta in blobs_buf.blobs.iter() {
+ let value = &blobs_buf.buf[meta.start..meta.end];
+ assert_eq!(value, entries_meta.index[&(meta.meta.key, meta.meta.lsn)]);
+ }
+
+ buf = Some(blobs_buf.buf);
+ }
+ }
+
+ Ok(())
+ }
}
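The buffer-sizing change above (used by the delta layer read path here and by the image layer path below) boils down to: size the reusable read buffer for the largest planned read, and warn when that exceeds the soft max instead of failing. A minimal sketch of that rule, with a plain slice of read sizes standing in for the planned VectoredReads:

    /// Pick a buffer size that can hold the largest planned read; fall back to the
    /// soft max when there are no reads. Mirrors the intent of get_min_read_buffer_size.
    fn min_read_buffer_size(read_sizes: &[usize], soft_max: usize) -> usize {
        let Some(&largest) = read_sizes.iter().max() else {
            return soft_max;
        };
        if largest > soft_max {
            // Oversized reads (e.g. a single very large value) are allowed, but noted loudly.
            eprintln!("Oversized vectored read ({largest} > {soft_max})");
        }
        largest
    }

    fn main() {
        assert_eq!(min_read_buffer_size(&[], 64 * 1024), 64 * 1024);
        // A 1 MiB value forces the buffer beyond the 64 KiB soft max.
        assert_eq!(min_read_buffer_size(&[4096, 1 << 20], 64 * 1024), 1 << 20);
    }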
diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs
index 14c79e413c..5b44d2bc2c 100644
--- a/pageserver/src/tenant/storage_layer/image_layer.rs
+++ b/pageserver/src/tenant/storage_layer/image_layer.rs
@@ -44,6 +44,7 @@ use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{Bytes, BytesMut};
use camino::{Utf8Path, Utf8PathBuf};
use hex;
+use itertools::Itertools;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::LayerAccessKind;
use pageserver_api::shard::TenantShardId;
@@ -540,7 +541,25 @@ impl ImageLayerInner {
let vectored_blob_reader = VectoredBlobReader::new(&self.file);
for read in reads.into_iter() {
- let buf = BytesMut::with_capacity(max_vectored_read_bytes);
+ let buf_size = read.size();
+
+ if buf_size > max_vectored_read_bytes {
+ // If the read is oversized, it should only contain one key.
+ let offenders = read
+ .blobs_at
+ .as_slice()
+ .iter()
+ .map(|(_, blob_meta)| format!("{}@{}", blob_meta.key, blob_meta.lsn))
+ .join(", ");
+ tracing::warn!(
+ "Oversized vectored read ({} > {}) for keys {}",
+ buf_size,
+ max_vectored_read_bytes,
+ offenders
+ );
+ }
+
+ let buf = BytesMut::with_capacity(buf_size);
let res = vectored_blob_reader.read_blobs(&read, buf).await;
match res {
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index 8ba37b5a86..27e60f783c 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -1759,6 +1759,18 @@ impl ResidentLayer {
pub(crate) fn metadata(&self) -> LayerFileMetadata {
self.owner.metadata()
}
+
+ #[cfg(test)]
+ pub(crate) async fn get_inner_delta<'a>(
+ &'a self,
+ ctx: &RequestContext,
+ ) -> anyhow::Result<&'a delta_layer::DeltaLayerInner> {
+ let owner = &self.owner.0;
+ match self.downloaded.get(owner, ctx).await? {
+ LayerKind::Delta(d) => Ok(d),
+ LayerKind::Image(_) => Err(anyhow::anyhow!("Expected a delta layer")),
+ }
+ }
}
impl AsLayerDesc for ResidentLayer {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index bc3fc1df1f..f3565c1fb3 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -309,6 +309,8 @@ pub struct Timeline {
/// Configuration: how often should the partitioning be recalculated.
repartition_threshold: u64,
+ last_image_layer_creation_check_at: AtomicLsn,
+
/// Current logical size of the "datadir", at the last LSN.
current_logical_size: LogicalSize,
@@ -1632,6 +1634,15 @@ impl Timeline {
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
}
+ fn get_image_layer_creation_check_threshold(&self) -> u8 {
+ let tenant_conf = self.tenant_conf.read().unwrap().tenant_conf.clone();
+ tenant_conf.image_layer_creation_check_threshold.unwrap_or(
+ self.conf
+ .default_tenant_conf
+ .image_layer_creation_check_threshold,
+ )
+ }
+
pub(super) fn tenant_conf_updated(&self) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
@@ -1769,6 +1780,7 @@ impl Timeline {
},
partitioning: tokio::sync::Mutex::new((KeyPartitioning::new(), Lsn(0))),
repartition_threshold: 0,
+ last_image_layer_creation_check_at: AtomicLsn::new(0),
last_received_wal: Mutex::new(None),
rel_size_cache: RwLock::new(HashMap::new()),
@@ -1797,6 +1809,7 @@ impl Timeline {
};
result.repartition_threshold =
result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE;
+
result
.metrics
.last_record_gauge
@@ -3501,6 +3514,24 @@ impl Timeline {
// Is it time to create a new image layer for the given partition?
async fn time_for_new_image_layer(&self, partition: &KeySpace, lsn: Lsn) -> bool {
+ let last = self.last_image_layer_creation_check_at.load();
+ if lsn != Lsn(0) {
+ let distance = lsn
+ .checked_sub(last)
+ .expect("Attempt to compact with LSN going backwards");
+
+ let min_distance = self.get_image_layer_creation_check_threshold() as u64
+ * self.get_checkpoint_distance();
+
+ // Skip the expensive delta layer counting below if we've not ingested
+ // sufficient WAL since the last check.
+ if distance.0 < min_distance {
+ return false;
+ }
+ }
+
+ self.last_image_layer_creation_check_at.store(lsn);
+
let threshold = self.get_image_creation_threshold();
let guard = self.layers.read().await;
diff --git a/pageserver/src/tenant/vectored_blob_io.rs b/pageserver/src/tenant/vectored_blob_io.rs
index 805f70b23b..3a6950cf88 100644
--- a/pageserver/src/tenant/vectored_blob_io.rs
+++ b/pageserver/src/tenant/vectored_blob_io.rs
@@ -61,7 +61,7 @@ pub struct VectoredRead {
}
impl VectoredRead {
- fn size(&self) -> usize {
+ pub fn size(&self) -> usize {
(self.end - self.start) as usize
}
}
diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c
index e31de3c6b5..1bc8a2e87c 100644
--- a/pgxn/neon/libpagestore.c
+++ b/pgxn/neon/libpagestore.c
@@ -111,6 +111,7 @@ static PageServer page_servers[MAX_SHARDS];
static bool pageserver_flush(shardno_t shard_no);
static void pageserver_disconnect(shardno_t shard_no);
+static void pageserver_disconnect_shard(shardno_t shard_no);
static bool
PagestoreShmemIsValid(void)
@@ -487,9 +488,31 @@ retry:
return ret;
}
-
+/*
+ * Reset prefetch and drop connection to the shard.
+ * It also drops connections to all other shards involved in prefetch.
+ */
static void
pageserver_disconnect(shardno_t shard_no)
+{
+ if (page_servers[shard_no].conn)
+ {
+ /*
+ * If the connection to any pageserver is lost, we throw away the
+ * whole prefetch queue, even for other pageservers. It should not
+ * cause big problems, because connection loss is supposed to be a
+ * rare event.
+ */
+ prefetch_on_ps_disconnect();
+ }
+ pageserver_disconnect_shard(shard_no);
+}
+
+/*
+ * Disconnect from the specified shard
+ */
+static void
+pageserver_disconnect_shard(shardno_t shard_no)
{
/*
* If anything goes wrong while we were sending a request, it's not clear
@@ -503,14 +526,6 @@ pageserver_disconnect(shardno_t shard_no)
neon_shard_log(shard_no, LOG, "dropping connection to page server due to error");
PQfinish(page_servers[shard_no].conn);
page_servers[shard_no].conn = NULL;
-
- /*
- * If the connection to any pageserver is lost, we throw away the
- * whole prefetch queue, even for other pageservers. It should not
- * cause big problems, because connection loss is supposed to be a
- * rare event.
- */
- prefetch_on_ps_disconnect();
}
if (page_servers[shard_no].wes != NULL)
{
@@ -676,7 +691,8 @@ page_server_api api =
{
.send = pageserver_send,
.flush = pageserver_flush,
- .receive = pageserver_receive
+ .receive = pageserver_receive,
+ .disconnect = pageserver_disconnect_shard
};
static bool
diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h
index 2889ffacae..44ae766f76 100644
--- a/pgxn/neon/pagestore_client.h
+++ b/pgxn/neon/pagestore_client.h
@@ -180,6 +180,7 @@ typedef struct
bool (*send) (shardno_t shard_no, NeonRequest * request);
NeonResponse *(*receive) (shardno_t shard_no);
bool (*flush) (shardno_t shard_no);
+ void (*disconnect) (shardno_t shard_no);
} page_server_api;
extern void prefetch_on_ps_disconnect(void);
diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c
index 2d222e3c7c..b33cfab2bb 100644
--- a/pgxn/neon/pagestore_smgr.c
+++ b/pgxn/neon/pagestore_smgr.c
@@ -613,6 +613,14 @@ prefetch_on_ps_disconnect(void)
Assert(slot->status == PRFS_REQUESTED);
Assert(slot->my_ring_index == ring_index);
+ /*
+ * Drop the connection to every shard that has prefetch requests in flight.
+ * It is not a problem to call disconnect multiple times on the same connection
+ * because the disconnect implementation in libpagestore.c checks whether the
+ * connection is alive and does nothing if it was already dropped.
+ */
+ page_server->disconnect(slot->shard_no);
+
/* clean up the request */
slot->status = PRFS_TAG_REMAINS;
MyPState->n_requests_inflight -= 1;
@@ -1680,7 +1688,7 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum)
break;
default:
- neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
+ neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_exists", resp->tag);
}
pfree(resp);
return exists;
@@ -2216,7 +2224,7 @@ neon_read_at_lsn(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
((NeonErrorResponse *) resp)->message)));
break;
default:
- neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
+ neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_at_lsn", resp->tag);
}
/* buffer was used, clean up for later reuse */
@@ -2489,7 +2497,7 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum)
break;
default:
- neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
+ neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_nblocks", resp->tag);
}
update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks);
@@ -2544,7 +2552,7 @@ neon_dbsize(Oid dbNode)
break;
default:
- neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
+ neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_dbsize", resp->tag);
}
neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes",
@@ -2849,7 +2857,7 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
break;
default:
- neon_log(ERROR, "unexpected response from page server with tag 0x%02x", resp->tag);
+ neon_log(ERROR, "unexpected response from page server with tag 0x%02x in neon_read_slru_segment", resp->tag);
}
pfree(resp);
diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py
index 3d60f9bef5..d0519d3406 100644
--- a/test_runner/fixtures/neon_fixtures.py
+++ b/test_runner/fixtures/neon_fixtures.py
@@ -2116,6 +2116,7 @@ class NeonStorageController(MetricsGetter):
shard_count: Optional[int] = None,
shard_stripe_size: Optional[int] = None,
tenant_config: Optional[Dict[Any, Any]] = None,
+ placement_policy: Optional[str] = None,
):
"""
Use this rather than pageserver_api() when you need to include shard parameters
@@ -2135,6 +2136,8 @@ class NeonStorageController(MetricsGetter):
for k, v in tenant_config.items():
body[k] = v
+ body["placement_policy"] = placement_policy
+
response = self.request(
"POST",
f"{self.env.storage_controller_api}/v1/tenant",
@@ -2193,6 +2196,34 @@ class NeonStorageController(MetricsGetter):
log.info(f"Migrated tenant {tenant_shard_id} to pageserver {dest_ps_id}")
assert self.env.get_tenant_pageserver(tenant_shard_id).id == dest_ps_id
+ def tenant_policy_update(self, tenant_id: TenantId, body: dict[str, Any]):
+ log.info(f"tenant_policy_update({tenant_id}, {body})")
+ self.request(
+ "PUT",
+ f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/policy",
+ json=body,
+ headers=self.headers(TokenScope.ADMIN),
+ )
+
+ def reconcile_all(self):
+ r = self.request(
+ "POST",
+ f"{self.env.storage_controller_api}/debug/v1/reconcile_all",
+ headers=self.headers(TokenScope.ADMIN),
+ )
+ r.raise_for_status()
+ n = r.json()
+ log.info(f"reconcile_all waited for {n} shards")
+ return n
+
+ def reconcile_until_idle(self, timeout_secs=30):
+ start_at = time.time()
+ n = 1
+ while n > 0:
+ n = self.reconcile_all()
+ if time.time() - start_at > timeout_secs:
+ raise RuntimeError("Timeout in reconcile_until_idle")
+
def consistency_check(self):
"""
Throw an exception if the service finds any inconsistencies in its state
diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py
index 3058926b25..909d25980b 100644
--- a/test_runner/regress/test_attach_tenant_config.py
+++ b/test_runner/regress/test_attach_tenant_config.py
@@ -189,6 +189,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
},
"trace_read_requests": True,
"walreceiver_connect_timeout": "13m",
+ "image_layer_creation_check_threshold": 1,
}
ps_http = env.pageserver.http_client()
diff --git a/test_runner/regress/test_layer_eviction.py b/test_runner/regress/test_layer_eviction.py
index 7bbc0cc160..fefb30bbdd 100644
--- a/test_runner/regress/test_layer_eviction.py
+++ b/test_runner/regress/test_layer_eviction.py
@@ -165,6 +165,7 @@ def test_gc_of_remote_layers(neon_env_builder: NeonEnvBuilder):
"compaction_threshold": "3",
# "image_creation_threshold": set at runtime
"compaction_target_size": f"{128 * (1024**2)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
+ "image_layer_creation_check_threshold": "0", # always check if a new image layer can be created
}
def tenant_update_config(changes):
diff --git a/test_runner/regress/test_layers_from_future.py b/test_runner/regress/test_layers_from_future.py
index ca4295c5cb..f311a8bf2c 100644
--- a/test_runner/regress/test_layers_from_future.py
+++ b/test_runner/regress/test_layers_from_future.py
@@ -53,6 +53,7 @@ def test_issue_5878(neon_env_builder: NeonEnvBuilder):
"checkpoint_timeout": "24h", # something we won't reach
"checkpoint_distance": f"{50 * (1024**2)}", # something we won't reach, we checkpoint manually
"image_creation_threshold": "100", # we want to control when image is created
+ "image_layer_creation_check_threshold": "0",
"compaction_threshold": f"{l0_l1_threshold}",
"compaction_target_size": f"{128 * (1024**3)}", # make it so that we only have 1 partition => image coverage for delta layers => enables gc of delta layers
}
diff --git a/test_runner/regress/test_logical_replication.py b/test_runner/regress/test_logical_replication.py
index 3f4ca8070d..1bac528397 100644
--- a/test_runner/regress/test_logical_replication.py
+++ b/test_runner/regress/test_logical_replication.py
@@ -364,3 +364,67 @@ def test_slots_and_branching(neon_simple_env: NeonEnv):
# Check that we can create slot with the same name
ws_cur = ws_branch.connect().cursor()
ws_cur.execute("select pg_create_logical_replication_slot('my_slot', 'pgoutput')")
+
+
+def test_replication_shutdown(neon_simple_env: NeonEnv):
+ # Ensure Postgres can shut down without getting stuck when a replication job is active and the neon extension is installed
+ env = neon_simple_env
+ env.neon_cli.create_branch("test_replication_shutdown_publisher", "empty")
+ pub = env.endpoints.create("test_replication_shutdown_publisher")
+
+ env.neon_cli.create_branch("test_replication_shutdown_subscriber")
+ sub = env.endpoints.create("test_replication_shutdown_subscriber")
+
+ pub.respec(skip_pg_catalog_updates=False)
+ pub.start()
+
+ sub.respec(skip_pg_catalog_updates=False)
+ sub.start()
+
+ pub.wait_for_migrations()
+ sub.wait_for_migrations()
+
+ with pub.cursor() as cur:
+ cur.execute(
+ "CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser"
+ )
+ cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers")
+ cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser")
+
+ # If we don't do this, creating the subscription will fail later on PG16
+ pub.edit_hba(["host all mr_whiskers 0.0.0.0/0 md5"])
+
+ with sub.cursor() as cur:
+ cur.execute(
+ "CREATE ROLE mr_whiskers WITH PASSWORD 'cat' LOGIN INHERIT CREATEROLE CREATEDB BYPASSRLS REPLICATION IN ROLE neon_superuser"
+ )
+ cur.execute("CREATE DATABASE neondb WITH OWNER mr_whiskers")
+ cur.execute("GRANT ALL PRIVILEGES ON DATABASE neondb TO neon_superuser")
+
+ with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
+ cur.execute("CREATE PUBLICATION pub FOR ALL TABLES")
+ cur.execute("CREATE TABLE t (a int)")
+ cur.execute("INSERT INTO t VALUES (10), (20)")
+ cur.execute("SELECT * from t")
+ res = cur.fetchall()
+ assert [r[0] for r in res] == [10, 20]
+
+ with sub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as cur:
+ cur.execute("CREATE TABLE t (a int)")
+
+ pub_conn = f"host=localhost port={pub.pg_port} dbname=neondb user=mr_whiskers password=cat"
+ query = f"CREATE SUBSCRIPTION sub CONNECTION '{pub_conn}' PUBLICATION pub"
+ log.info(f"Creating subscription: {query}")
+ cur.execute(query)
+
+ with pub.cursor(dbname="neondb", user="mr_whiskers", password="cat") as pcur:
+ pcur.execute("INSERT INTO t VALUES (30), (40)")
+
+ def check_that_changes_propagated():
+ cur.execute("SELECT * FROM t")
+ res = cur.fetchall()
+ log.info(res)
+ assert len(res) == 4
+ assert [r[0] for r in res] == [10, 20, 30, 40]
+
+ wait_until(10, 0.5, check_that_changes_propagated)
diff --git a/test_runner/regress/test_neon_extension.py b/test_runner/regress/test_neon_extension.py
index e31e1cab51..39b4865026 100644
--- a/test_runner/regress/test_neon_extension.py
+++ b/test_runner/regress/test_neon_extension.py
@@ -1,3 +1,4 @@
+import time
from contextlib import closing
from fixtures.log_helper import log
@@ -43,6 +44,12 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
with closing(endpoint_main.connect()) as conn:
with conn.cursor() as cur:
+ cur.execute("SELECT extversion from pg_extension where extname='neon'")
+ # IMPORTANT:
+ # If the version has changed, the test should be updated.
+ # Ensure that the default version is also updated in the neon.control file
+ assert cur.fetchone() == ("1.3",)
+ cur.execute("SELECT * from neon.NEON_STAT_FILE_CACHE")
all_versions = ["1.3", "1.2", "1.1", "1.0"]
current_version = "1.3"
for idx, begin_version in enumerate(all_versions):
@@ -60,3 +67,30 @@ def test_neon_extension_compatibility(neon_env_builder: NeonEnvBuilder):
cur.execute(
f"ALTER EXTENSION neon UPDATE TO '{begin_version}'; -- {target_version}->{begin_version}"
)
+
+
+# Verify that the neon extension can be auto-upgraded to the latest version.
+def test_neon_extension_auto_upgrade(neon_env_builder: NeonEnvBuilder):
+ env = neon_env_builder.init_start()
+ env.neon_cli.create_branch("test_neon_extension_auto_upgrade")
+
+ endpoint_main = env.endpoints.create("test_neon_extension_auto_upgrade")
+ # don't skip pg_catalog updates - it runs CREATE EXTENSION neon
+ endpoint_main.respec(skip_pg_catalog_updates=False)
+ endpoint_main.start()
+
+ with closing(endpoint_main.connect()) as conn:
+ with conn.cursor() as cur:
+ cur.execute("ALTER EXTENSION neon UPDATE TO '1.0';")
+ cur.execute("SELECT extversion from pg_extension where extname='neon'")
+ assert cur.fetchone() == ("1.0",) # Ensure the extension gets downgraded
+
+ endpoint_main.stop()
+ time.sleep(1)
+ endpoint_main.start()
+ time.sleep(1)
+
+ with closing(endpoint_main.connect()) as conn:
+ with conn.cursor() as cur:
+ cur.execute("SELECT extversion from pg_extension where extname='neon'")
+ assert cur.fetchone() != ("1.0",) # Ensure the extension gets upgraded
diff --git a/test_runner/regress/test_ondemand_download.py b/test_runner/regress/test_ondemand_download.py
index 914f068afb..ba0d53704b 100644
--- a/test_runner/regress/test_ondemand_download.py
+++ b/test_runner/regress/test_ondemand_download.py
@@ -568,6 +568,8 @@ def test_compaction_downloads_on_demand_with_image_creation(neon_env_builder: Ne
"image_creation_threshold": 100,
# repartitioning parameter, unused
"compaction_target_size": 128 * 1024**2,
+ # Always check if a new image layer can be created
+ "image_layer_creation_check_threshold": 0,
# pitr_interval and gc_horizon are not interesting because we dont run gc
}
@@ -632,7 +634,8 @@ def test_compaction_downloads_on_demand_with_image_creation(neon_env_builder: Ne
# threshold to expose image creation to downloading all of the needed
# layers -- threshold of 2 would sound more reasonable, but keeping it as 1
# to be less flaky
- env.neon_cli.config_tenant(tenant_id, {"image_creation_threshold": "1"})
+ conf["image_creation_threshold"] = "1"
+ env.neon_cli.config_tenant(tenant_id, {k: str(v) for k, v in conf.items()})
pageserver_http.timeline_compact(tenant_id, timeline_id)
layers = pageserver_http.layer_map_info(tenant_id, timeline_id)
diff --git a/test_runner/regress/test_pageserver_generations.py b/test_runner/regress/test_pageserver_generations.py
index 56b4548b64..41fa03cdf8 100644
--- a/test_runner/regress/test_pageserver_generations.py
+++ b/test_runner/regress/test_pageserver_generations.py
@@ -53,6 +53,7 @@ TENANT_CONF = {
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
+ "image_layer_creation_check_threshold": "0",
}
diff --git a/test_runner/regress/test_remote_storage.py b/test_runner/regress/test_remote_storage.py
index 986d6c4dbf..47200a856e 100644
--- a/test_runner/regress/test_remote_storage.py
+++ b/test_runner/regress/test_remote_storage.py
@@ -245,6 +245,7 @@ def test_remote_storage_upload_queue_retries(
"compaction_period": "0s",
# create image layers eagerly, so that GC can remove some layers
"image_creation_threshold": "1",
+ "image_layer_creation_check_threshold": "0",
}
)
diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py
index 9aebf16c68..2699654f80 100644
--- a/test_runner/regress/test_sharding.py
+++ b/test_runner/regress/test_sharding.py
@@ -146,7 +146,7 @@ def test_sharding_split_smoke(
# 8 shards onto separate pageservers
shard_count = 4
split_shard_count = 8
- neon_env_builder.num_pageservers = split_shard_count
+ neon_env_builder.num_pageservers = split_shard_count * 2
# 1MiB stripes: enable getting some meaningful data distribution without
# writing large quantities of data in this test. The stripe size is given
@@ -174,6 +174,7 @@ def test_sharding_split_smoke(
placement_policy='{"Attached": 1}',
conf=non_default_tenant_config,
)
+
workload = Workload(env, tenant_id, timeline_id, branch_name="main")
workload.init()
@@ -252,6 +253,10 @@ def test_sharding_split_smoke(
# The old parent shards should no longer exist on disk
assert not shards_on_disk(old_shard_ids)
+ # Enough background reconciliations should result in the shards being properly distributed.
+ # Run this before the workload, because its LSN-waiting code presumes stable locations.
+ env.storage_controller.reconcile_until_idle()
+
workload.validate()
workload.churn_rows(256)
@@ -265,27 +270,6 @@ def test_sharding_split_smoke(
pageserver.http_client().timeline_gc(tenant_shard_id, timeline_id, None)
workload.validate()
- migrate_to_pageserver_ids = list(
- set(p.id for p in env.pageservers) - set(pre_split_pageserver_ids)
- )
- assert len(migrate_to_pageserver_ids) == split_shard_count - shard_count
-
- # Migrate shards away from the node where the split happened
- for ps_id in pre_split_pageserver_ids:
- shards_here = [
- tenant_shard_id
- for (tenant_shard_id, pageserver) in all_shards
- if pageserver.id == ps_id
- ]
- assert len(shards_here) == 2
- migrate_shard = shards_here[0]
- destination = migrate_to_pageserver_ids.pop()
-
- log.info(f"Migrating shard {migrate_shard} from {ps_id} to {destination}")
- env.storage_controller.tenant_shard_migrate(migrate_shard, destination)
-
- workload.validate()
-
# Assert on how many reconciles happened during the process. This is something of an
# implementation detail, but it is useful to detect any bugs that might generate spurious
# extra reconcile iterations.
@@ -294,8 +278,9 @@ def test_sharding_split_smoke(
# - shard_count reconciles for the original setup of the tenant
# - shard_count reconciles for detaching the original secondary locations during split
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
- # - shard_count reconciles for the migrations we did to move child shards away from their split location
- expect_reconciles = shard_count * 2 + split_shard_count + shard_count
+ # - shard_count reconciles as child shards fail over to their secondaries
+ # - shard_count reconciles as child shard secondary locations move to emptier nodes
+ expect_reconciles = shard_count * 2 + split_shard_count + shard_count * 2
reconcile_ok = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)
@@ -343,6 +328,31 @@ def test_sharding_split_smoke(
assert sum(total.values()) == split_shard_count * 2
check_effective_tenant_config()
+ # More specific check: that we are fully balanced. This is deterministic because
+ # the order in which we consider shards for optimization is deterministic, and the
+ # order of preference of nodes is also deterministic (lower node IDs win).
+ log.info(f"total: {total}")
+ assert total == {
+ 1: 1,
+ 2: 1,
+ 3: 1,
+ 4: 1,
+ 5: 1,
+ 6: 1,
+ 7: 1,
+ 8: 1,
+ 9: 1,
+ 10: 1,
+ 11: 1,
+ 12: 1,
+ 13: 1,
+ 14: 1,
+ 15: 1,
+ 16: 1,
+ }
+ log.info(f"attached: {attached}")
+ assert attached == {1: 1, 2: 1, 3: 1, 5: 1, 6: 1, 7: 1, 9: 1, 11: 1}
+
# Ensure post-split pageserver locations survive a restart (i.e. the child shards
# correctly wrote config to disk, and the storage controller responds correctly
# to /re-attach)
@@ -401,6 +411,7 @@ def test_sharding_split_stripe_size(
env.storage_controller.tenant_shard_split(
tenant_id, shard_count=2, shard_stripe_size=new_stripe_size
)
+ env.storage_controller.reconcile_until_idle()
# Check that we ended up with the stripe size that we expected, both on the pageserver
# and in the notifications to compute
@@ -869,6 +880,7 @@ def test_sharding_split_failures(
# Having failed+rolled back, we should be able to split again
# No failures this time; it will succeed
env.storage_controller.tenant_shard_split(tenant_id, shard_count=split_shard_count)
+ env.storage_controller.reconcile_until_idle(timeout_secs=30)
workload.churn_rows(10)
workload.validate()
@@ -922,6 +934,10 @@ def test_sharding_split_failures(
finish_split()
assert_split_done()
+ # Having completed the split, pump the background reconciles to ensure that
+ # the scheduler reaches an idle state
+ env.storage_controller.reconcile_until_idle(timeout_secs=30)
+
env.storage_controller.consistency_check()
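As a quick sanity check on the reconcile accounting above (a worked example, not part of the patch): with shard_count = 4 and split_shard_count = 8 as set at the top of this test, expect_reconciles = 4 * 2 + 8 + 4 * 2 = 24.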
diff --git a/test_runner/regress/test_sharding_service.py b/test_runner/regress/test_sharding_service.py
index fc6c137667..5a86e03d2b 100644
--- a/test_runner/regress/test_sharding_service.py
+++ b/test_runner/regress/test_sharding_service.py
@@ -433,10 +433,13 @@ def test_sharding_service_compute_hook(
# Set up fake HTTP notify endpoint
notifications = []
+ handle_params = {"status": 200}
+
def handler(request: Request):
- log.info(f"Notify request: {request}")
+ status = handle_params["status"]
+ log.info(f"Notify request[{status}]: {request}")
notifications.append(request.json)
- return Response(status=200)
+ return Response(status=status)
httpserver.expect_request("/notify", method="PUT").respond_with_handler(handler)
@@ -504,6 +507,24 @@ def test_sharding_service_compute_hook(
wait_until(10, 1, received_split_notification)
+ # If the compute hook is unavailable, that should not block creating a tenant and
+ # creating a timeline. This simulates a control plane refusing to accept notifications
+ handle_params["status"] = 423
+ degraded_tenant_id = TenantId.generate()
+ degraded_timeline_id = TimelineId.generate()
+ env.storage_controller.tenant_create(degraded_tenant_id)
+ env.storage_controller.pageserver_api().timeline_create(
+ PgVersion.NOT_SET, degraded_tenant_id, degraded_timeline_id
+ )
+
+ # Ensure we hit the handler error path
+ env.storage_controller.allowed_errors.append(
+ ".*Failed to notify compute of attached pageserver.*tenant busy.*"
+ )
+ env.storage_controller.allowed_errors.append(".*Reconcile error.*tenant busy.*")
+ assert notifications[-1] is not None
+ assert notifications[-1]["tenant_id"] == str(degraded_tenant_id)
+
env.storage_controller.consistency_check()
@@ -1015,3 +1036,98 @@ def test_sharding_service_re_attach(neon_env_builder: NeonEnvBuilder):
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)
assert reconciles_after_restart == reconciles_before_restart
+
+
+def test_storage_controller_shard_scheduling_policy(neon_env_builder: NeonEnvBuilder):
+ """
+ Check that emergency hooks for disabling rogue tenants' reconcilers work as expected.
+ """
+ env = neon_env_builder.init_configs()
+ env.start()
+
+ tenant_id = TenantId.generate()
+
+ env.storage_controller.allowed_errors.extend(
+ [
+ # We will intentionally cause reconcile errors
+ ".*Reconcile error.*",
+ # Message from using a scheduling policy
+ ".*Scheduling is disabled by policy.*",
+ ".*Skipping reconcile for policy.*",
+ # Message from a node being offline
+ ".*Call to node .* management API .* failed",
+ ]
+ )
+
+ # Stop pageserver so that reconcile cannot complete
+ env.pageserver.stop()
+
+ env.storage_controller.tenant_create(tenant_id, placement_policy="Detached")
+
+ # Try attaching it: we should see reconciles failing
+ env.storage_controller.tenant_policy_update(
+ tenant_id,
+ {
+ "placement": {"Attached": 0},
+ },
+ )
+
+ def reconcile_errors() -> int:
+ return int(
+ env.storage_controller.get_metric_value(
+ "storage_controller_reconcile_complete_total", filter={"status": "error"}
+ )
+ or 0
+ )
+
+ def reconcile_ok() -> int:
+ return int(
+ env.storage_controller.get_metric_value(
+ "storage_controller_reconcile_complete_total", filter={"status": "ok"}
+ )
+ or 0
+ )
+
+ def assert_errors_gt(n) -> int:
+ e = reconcile_errors()
+ assert e > n
+ return e
+
+ errs = wait_until(10, 1, lambda: assert_errors_gt(0))
+
+ # Try reconciling again, it should fail again
+ with pytest.raises(StorageControllerApiException):
+ env.storage_controller.reconcile_all()
+ errs = wait_until(10, 1, lambda: assert_errors_gt(errs))
+
+ # Configure the tenant to disable reconciles
+ env.storage_controller.tenant_policy_update(
+ tenant_id,
+ {
+ "scheduling": "Stop",
+ },
+ )
+
+ # Try reconciling again, it should not cause an error (silently skip)
+ env.storage_controller.reconcile_all()
+ assert reconcile_errors() == errs
+
+ # Start the pageserver and re-enable reconciles
+ env.pageserver.start()
+ env.storage_controller.tenant_policy_update(
+ tenant_id,
+ {
+ "scheduling": "Active",
+ },
+ )
+
+ def assert_ok_gt(n) -> int:
+ o = reconcile_ok()
+ assert o > n
+ return o
+
+ # We should see a successful reconciliation
+ wait_until(10, 1, lambda: assert_ok_gt(0))
+
+ # And indeed the tenant should be attached
+ assert len(env.pageserver.http_client().tenant_list_locations()["tenant_shards"]) == 1
diff --git a/vendor/postgres-v14 b/vendor/postgres-v14
index 748643b468..a7b4c66156 160000
--- a/vendor/postgres-v14
+++ b/vendor/postgres-v14
@@ -1 +1 @@
-Subproject commit 748643b4683e9fe3b105011a6ba8a687d032cd65
+Subproject commit a7b4c66156bce00afa60e5592d4284ba9e40b4cf
diff --git a/vendor/postgres-v15 b/vendor/postgres-v15
index e7651e79c0..64b8c7bccc 160000
--- a/vendor/postgres-v15
+++ b/vendor/postgres-v15
@@ -1 +1 @@
-Subproject commit e7651e79c0c27fbddc3c724f5b9553222c28e395
+Subproject commit 64b8c7bccc6b77e04795e2d4cf6ad82dc8d987ed
diff --git a/vendor/revisions.json b/vendor/revisions.json
index 3c1b866137..75dc095168 100644
--- a/vendor/revisions.json
+++ b/vendor/revisions.json
@@ -1,5 +1,5 @@
{
"postgres-v16": "3946b2e2ea71d07af092099cb5bcae76a69b90d6",
- "postgres-v15": "e7651e79c0c27fbddc3c724f5b9553222c28e395",
- "postgres-v14": "748643b4683e9fe3b105011a6ba8a687d032cd65"
+ "postgres-v15": "64b8c7bccc6b77e04795e2d4cf6ad82dc8d987ed",
+ "postgres-v14": "a7b4c66156bce00afa60e5592d4284ba9e40b4cf"
}
diff --git a/vm-image-spec.yaml b/vm-image-spec.yaml
index 5b93088303..c760744491 100644
--- a/vm-image-spec.yaml
+++ b/vm-image-spec.yaml
@@ -187,6 +187,14 @@ files:
query: |
select sum(pg_database_size(datname)) as total from pg_database;
+ - metric_name: lfc_approximate_working_set_size
+ type: gauge
+ help: 'Approximate working set size in pages of 8192 bytes'
+ key_labels:
+ values: [approximate_working_set_size]
+ query: |
+ select neon.approximate_working_set_size(false) as approximate_working_set_size;
+
build: |
# Build cgroup-tools
#