mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-14 19:50:38 +00:00
Compare commits
3 Commits
wp-mref
...
problame/b
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
053c3cd31a | ||
|
|
c2ee2952e1 | ||
|
|
cad723b458 |
@@ -239,15 +239,12 @@ impl Persistence {
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
|
||||
self.mutating_transaction(|locked| {
|
||||
let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
|
||||
anyhow::bail!("Tried to increment generation of unknown shard");
|
||||
};
|
||||
shard.generation_pageserver = None;
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
pub(crate) async fn detach(
|
||||
&self,
|
||||
tenant_shard_id: TenantShardId,
|
||||
) -> Option<TenantShardPersistence> {
|
||||
self.mutating_transaction(|locked| locked.tenants.remove(&tenant_shard_id))
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn re_attach(
|
||||
|
||||
@@ -389,7 +389,7 @@ impl Reconciler {
|
||||
// Nothing to do
|
||||
tracing::info!("Observed configuration already correct.")
|
||||
}
|
||||
_ => {
|
||||
observed_conf => {
|
||||
// In all cases other than a matching observed configuration, we will
|
||||
// reconcile this location. This includes locations with different configurations, as well
|
||||
// as locations with unknown (None) observed state.
|
||||
@@ -399,6 +399,7 @@ impl Reconciler {
|
||||
.await?;
|
||||
wanted_conf.generation = self.generation.into();
|
||||
tracing::info!("Observed configuration requires update.");
|
||||
tracing::debug!(?wanted_conf, ?observed_conf, "observed configuration");
|
||||
self.location_config(node_id, wanted_conf, None).await?;
|
||||
if let Err(e) = self
|
||||
.compute_hook
|
||||
|
||||
@@ -329,94 +329,110 @@ impl Service {
|
||||
&self,
|
||||
attach_req: AttachHookRequest,
|
||||
) -> anyhow::Result<AttachHookResponse> {
|
||||
#[derive(Debug)]
|
||||
enum Mode {
|
||||
Insert { new: bool, node_id: NodeId },
|
||||
Detach,
|
||||
}
|
||||
|
||||
// This is a test hook. To enable using it on tenants that were created directly with
|
||||
// the pageserver API (not via this service), we will auto-create any missing tenant
|
||||
// shards with default state.
|
||||
let insert = {
|
||||
let tenant_shard_id = attach_req.tenant_shard_id;
|
||||
let mode = {
|
||||
let locked = self.inner.write().unwrap();
|
||||
!locked.tenants.contains_key(&attach_req.tenant_shard_id)
|
||||
if let Some(node_id) = attach_req.node_id {
|
||||
Mode::Insert {
|
||||
new: !locked.tenants.contains_key(&attach_req.tenant_shard_id),
|
||||
node_id,
|
||||
}
|
||||
} else {
|
||||
Mode::Detach
|
||||
}
|
||||
};
|
||||
|
||||
if insert {
|
||||
let tsp = TenantShardPersistence {
|
||||
tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
|
||||
shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
|
||||
shard_count: attach_req.tenant_shard_id.shard_count.0 as i32,
|
||||
shard_stripe_size: 0,
|
||||
generation: 0,
|
||||
generation_pageserver: None,
|
||||
placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
|
||||
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
|
||||
};
|
||||
tracing::info!(?mode, "attach-hook start");
|
||||
match mode {
|
||||
Mode::Insert { new, node_id } => {
|
||||
if new {
|
||||
let tsp = TenantShardPersistence {
|
||||
tenant_id: tenant_shard_id.tenant_id.to_string(),
|
||||
shard_number: tenant_shard_id.shard_number.0 as i32,
|
||||
shard_count: tenant_shard_id.shard_count.0 as i32,
|
||||
shard_stripe_size: 0,
|
||||
generation: 0,
|
||||
generation_pageserver: None,
|
||||
placement_policy: serde_json::to_string(&PlacementPolicy::default())
|
||||
.unwrap(),
|
||||
config: serde_json::to_string(&TenantConfig::default()).unwrap(),
|
||||
};
|
||||
|
||||
self.persistence.insert_tenant_shards(vec![tsp]).await?;
|
||||
self.persistence.insert_tenant_shards(vec![tsp]).await?;
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked.tenants.insert(
|
||||
attach_req.tenant_shard_id,
|
||||
TenantState::new(
|
||||
attach_req.tenant_shard_id,
|
||||
ShardIdentity::unsharded(),
|
||||
PlacementPolicy::Single,
|
||||
),
|
||||
);
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
locked.tenants.insert(
|
||||
tenant_shard_id,
|
||||
TenantState::new(
|
||||
tenant_shard_id,
|
||||
ShardIdentity::unsharded(),
|
||||
PlacementPolicy::Single,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
let new_generation = self
|
||||
.persistence
|
||||
.increment_generation(tenant_shard_id, node_id)
|
||||
.await?;
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let tenant_state = locked
|
||||
.tenants
|
||||
.get_mut(&tenant_shard_id)
|
||||
.expect("Checked for existence above");
|
||||
tenant_state.generation = new_generation;
|
||||
tenant_state.intent.attached = Some(node_id);
|
||||
|
||||
tracing::info!(
|
||||
"attach_hook: tenant {} set generation {:?}, pageserver {}",
|
||||
tenant_shard_id,
|
||||
tenant_state.generation,
|
||||
node_id,
|
||||
);
|
||||
|
||||
Ok(AttachHookResponse {
|
||||
gen: tenant_state.generation.into(),
|
||||
})
|
||||
}
|
||||
Mode::Detach => {
|
||||
let res = { self.persistence.detach(tenant_shard_id).await };
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let tenant_state = locked.tenants.remove(&tenant_shard_id);
|
||||
match res {
|
||||
Some(detached) => {
|
||||
tracing::info!(
|
||||
tenant_id = %tenant_shard_id,
|
||||
ps_id = ?detached.generation_pageserver,
|
||||
generation = ?detached.generation,
|
||||
"dropping",
|
||||
);
|
||||
assert!(tenant_state.is_some(), "persistence state said it existed");
|
||||
}
|
||||
None => {
|
||||
tracing::info!(
|
||||
tenant_id = %tenant_shard_id,
|
||||
"no-op: tenant already has no pageserver");
|
||||
assert!(
|
||||
tenant_state.is_none(),
|
||||
"persistence state said it already doesn't exist"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(AttachHookResponse { gen: None })
|
||||
}
|
||||
}
|
||||
|
||||
let new_generation = if let Some(req_node_id) = attach_req.node_id {
|
||||
Some(
|
||||
self.persistence
|
||||
.increment_generation(attach_req.tenant_shard_id, req_node_id)
|
||||
.await?,
|
||||
)
|
||||
} else {
|
||||
self.persistence.detach(attach_req.tenant_shard_id).await?;
|
||||
None
|
||||
};
|
||||
|
||||
let mut locked = self.inner.write().unwrap();
|
||||
let tenant_state = locked
|
||||
.tenants
|
||||
.get_mut(&attach_req.tenant_shard_id)
|
||||
.expect("Checked for existence above");
|
||||
|
||||
if let Some(new_generation) = new_generation {
|
||||
tenant_state.generation = new_generation;
|
||||
}
|
||||
|
||||
if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_shard_id,
|
||||
ps_id = %attaching_pageserver,
|
||||
generation = ?tenant_state.generation,
|
||||
"issuing",
|
||||
);
|
||||
} else if let Some(ps_id) = tenant_state.intent.attached {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_shard_id,
|
||||
%ps_id,
|
||||
generation = ?tenant_state.generation,
|
||||
"dropping",
|
||||
);
|
||||
} else {
|
||||
tracing::info!(
|
||||
tenant_id = %attach_req.tenant_shard_id,
|
||||
"no-op: tenant already has no pageserver");
|
||||
}
|
||||
tenant_state.intent.attached = attach_req.node_id;
|
||||
|
||||
tracing::info!(
|
||||
"attach_hook: tenant {} set generation {:?}, pageserver {}",
|
||||
attach_req.tenant_shard_id,
|
||||
tenant_state.generation,
|
||||
// TODO: this is an odd number of 0xf's
|
||||
attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
|
||||
);
|
||||
|
||||
Ok(AttachHookResponse {
|
||||
gen: attach_req
|
||||
.node_id
|
||||
.map(|_| tenant_state.generation.into().unwrap()),
|
||||
})
|
||||
}
|
||||
|
||||
pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
|
||||
|
||||
@@ -79,7 +79,7 @@ pub(crate) struct IntentState {
|
||||
pub(crate) secondary: Vec<NodeId>,
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
#[derive(Default, Clone, Debug)]
|
||||
pub(crate) struct ObservedState {
|
||||
pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
|
||||
}
|
||||
@@ -93,7 +93,7 @@ pub(crate) struct ObservedState {
|
||||
/// what it is (e.g. we failed partway through configuring it)
|
||||
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
|
||||
/// and that configuration will still be present unless something external interfered.
|
||||
#[derive(Clone)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub(crate) struct ObservedStateLocation {
|
||||
/// If None, it means we do not know the status of this shard's location on this node, but
|
||||
/// we know that we might have some state on this node.
|
||||
|
||||
@@ -953,6 +953,10 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
|
||||
|
||||
endpoint.stop()
|
||||
env.pageserver.tenant_detach(tenant_id)
|
||||
env.pageserver.allowed_errors.append(
|
||||
# tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
|
||||
".*Dropped remote consistent LSN updates.*",
|
||||
)
|
||||
env.pageserver.stop()
|
||||
env.pageserver.start(
|
||||
extra_env_vars={
|
||||
|
||||
Reference in New Issue
Block a user