Merge pull request #11051 from neondatabase/vlad/release-8005-and-cherry-picks

Storage release 2025-02-28
This commit is contained in:
Vlad Lazar
2025-02-28 19:40:33 +00:00
committed by GitHub
6 changed files with 43 additions and 22 deletions

View File

@@ -165,6 +165,8 @@ pub struct NeonStorageControllerConf {
#[serde(with = "humantime_serde")]
pub long_reconcile_threshold: Option<Duration>,
pub load_safekeepers: bool,
}
impl NeonStorageControllerConf {
@@ -188,6 +190,7 @@ impl Default for NeonStorageControllerConf {
max_secondary_lag_bytes: None,
heartbeat_interval: Self::DEFAULT_HEARTBEAT_INTERVAL,
long_reconcile_threshold: None,
load_safekeepers: true,
}
}
}

View File

@@ -537,6 +537,10 @@ impl StorageController {
args.push("--start-as-candidate".to_string());
}
if self.config.load_safekeepers {
args.push("--load-safekeepers".to_string());
}
if let Some(private_key) = &self.private_key {
let claims = Claims::new(None, Scope::PageServerApi);
let jwt_token =

View File

@@ -473,21 +473,15 @@ async fn wait_for_active_tenant(
}
let mut update_rx = tenant.subscribe_for_state_updates();
loop {
tokio::select! {
_ = cancel.cancelled() => return ControlFlow::Break(()),
result = update_rx.changed() => if result.is_err() {
tokio::select! {
result = update_rx.wait_for(|s| s == &TenantState::Active) => {
if result.is_err() {
return ControlFlow::Break(());
}
}
match &*update_rx.borrow() {
TenantState::Active => {
debug!("Tenant state changed to active, continuing the task loop");
return ControlFlow::Continue(());
}
state => debug!("Not running the task loop, tenant is not active: {state:?}"),
}
debug!("Tenant state changed to active, continuing the task loop");
ControlFlow::Continue(())
},
_ = cancel.cancelled() => ControlFlow::Break(()),
}
}

View File

@@ -138,6 +138,10 @@ struct Cli {
// Flag to use https for requests to pageserver API.
#[arg(long, default_value = "false")]
use_https_pageserver_api: bool,
/// Whether to load safekeeprs from the database and heartbeat them
#[arg(long, default_value = "false")]
load_safekeepers: bool,
}
enum StrictMode {
@@ -350,6 +354,7 @@ async fn async_main() -> anyhow::Result<()> {
start_as_candidate: args.start_as_candidate,
http_service_port: args.listen.port() as i32,
use_https_pageserver_api: args.use_https_pageserver_api,
load_safekeepers: args.load_safekeepers,
};
// Validate that we can connect to the database

View File

@@ -389,6 +389,8 @@ pub struct Config {
pub long_reconcile_threshold: Duration,
pub use_https_pageserver_api: bool,
pub load_safekeepers: bool,
}
impl From<DatabaseError> for ApiError {
@@ -1405,15 +1407,20 @@ impl Service {
.set(nodes.len() as i64);
tracing::info!("Loading safekeepers from database...");
let safekeepers = persistence
.list_safekeepers()
.await?
.into_iter()
.map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new()))
.collect::<Vec<_>>();
let safekeepers = if config.load_safekeepers {
persistence
.list_safekeepers()
.await?
.into_iter()
.map(|skp| Safekeeper::from_persistence(skp, CancellationToken::new()))
.collect::<Vec<_>>()
} else {
tracing::info!("Skipping safekeeper loading");
Default::default()
};
let safekeepers: HashMap<NodeId, Safekeeper> =
safekeepers.into_iter().map(|n| (n.get_id(), n)).collect();
tracing::info!("Loaded {} safekeepers from database.", safekeepers.len());
tracing::info!("Loading shards from database...");
let mut tenant_shard_persistence = persistence.load_active_tenant_shards().await?;
@@ -8022,7 +8029,8 @@ impl Service {
) -> Result<(), DatabaseError> {
let node_id = NodeId(record.id as u64);
self.persistence.safekeeper_upsert(record.clone()).await?;
{
if self.config.load_safekeepers {
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
match safekeepers.entry(node_id) {
@@ -8054,7 +8062,7 @@ impl Service {
.await?;
let node_id = NodeId(id as u64);
// After the change has been persisted successfully, update the in-memory state
{
if self.config.load_safekeepers {
let mut locked = self.inner.write().unwrap();
let mut safekeepers = (*locked.safekeepers).clone();
let sk = safekeepers

View File

@@ -1128,6 +1128,13 @@ class NeonEnv:
if self.storage_controller_config is not None:
cfg["storage_controller"] = self.storage_controller_config
# Disable new storcon flag in compat tests
if config.test_may_use_compatibility_snapshot_binaries:
if "storage_controller" in cfg:
cfg["storage_controller"]["load_safekeepers"] = False
else:
cfg["storage_controller"] = {"load_safekeepers": False}
# Create config for pageserver
http_auth_type = "NeonJWT" if config.auth_enabled else "Trust"
pg_auth_type = "NeonJWT" if config.auth_enabled else "Trust"