Compare commits


3 Commits

Christian Schwarz
053c3cd31a  fixup: tests need more allowed_errors  (2024-01-24 10:03:36 +01:00)

Christian Schwarz
c2ee2952e1  make clippy happy  (2024-01-24 09:30:01 +01:00)

Christian Schwarz
cad723b458  fix(attachment_service): /attach-hook detachment doesn't survive restart  (2024-01-24 09:28:11 +01:00)

Before this PR, when we'd use `attachment_service/attach-hook` +
`page_server_mgmt_api/v1/tenant/:tenant/detach` to detach a tenant shard, the
tenant was still in attachments.json with `generation_pageserver=None`.

When we'd then restart attachment_service, the tenant shard id would again be
returned from `/re-attach`.

Admittedly, I haven't fully understood the reconciliation logic yet,
but this patch seems to do what I need it to do.

This problem is blocking https://github.com/neondatabase/neon/pull/6214
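To make the failure concrete, here is a toy model of the two detach strategies (plain maps and invented ShardRow/re_attach names, not the real attachment_service types): clearing generation_pageserver leaves a row behind that a "restart" resurrects, while removing the row does not.

use std::collections::HashMap;

#[derive(Debug)]
struct ShardRow {
    generation: u32,
    generation_pageserver: Option<u64>, // pageserver the generation was issued to
}

// Old behavior: keep the persisted row, only clear the pageserver.
fn detach_old(tenants: &mut HashMap<&'static str, ShardRow>, id: &str) {
    if let Some(row) = tenants.get_mut(id) {
        row.generation_pageserver = None;
    }
}

// New behavior: drop the row entirely, returning what was persisted.
fn detach_new(tenants: &mut HashMap<&'static str, ShardRow>, id: &str) -> Option<ShardRow> {
    tenants.remove(id)
}

// Stand-in for the restart path: every row that survived is handed back out.
fn re_attach(tenants: &HashMap<&'static str, ShardRow>) -> Vec<&'static str> {
    tenants.keys().copied().collect()
}

fn main() {
    let mut tenants = HashMap::from([(
        "tenant-a",
        ShardRow { generation: 1, generation_pageserver: Some(42) },
    )]);

    detach_old(&mut tenants, "tenant-a");
    // Bug: the "detached" shard still comes back after a restart.
    assert_eq!(re_attach(&tenants), vec!["tenant-a"]);

    let dropped = detach_new(&mut tenants, "tenant-a");
    println!("dropped row: {dropped:?}");
    // Fixed: nothing survives to be re-attached.
    assert!(re_attach(&tenants).is_empty());
}
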
5 changed files with 109 additions and 91 deletions

File 1 of 5

@@ -239,15 +239,12 @@ impl Persistence {
         .await
     }
 
-    pub(crate) async fn detach(&self, tenant_shard_id: TenantShardId) -> anyhow::Result<()> {
-        self.mutating_transaction(|locked| {
-            let Some(shard) = locked.tenants.get_mut(&tenant_shard_id) else {
-                anyhow::bail!("Tried to increment generation of unknown shard");
-            };
-            shard.generation_pageserver = None;
-            Ok(())
-        })
-        .await
+    pub(crate) async fn detach(
+        &self,
+        tenant_shard_id: TenantShardId,
+    ) -> Option<TenantShardPersistence> {
+        self.mutating_transaction(|locked| locked.tenants.remove(&tenant_shard_id))
+            .await
     }
 
     pub(crate) async fn re_attach(
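Besides surviving restarts, the new signature changes the caller's contract: instead of bailing on an unknown shard, detach reports via Option whether a row was actually removed, so a repeated detach becomes a visible no-op. A minimal sketch of that contract with stub types (not the real Persistence, whose map access goes through mutating_transaction):

use std::collections::HashMap;

#[derive(Debug)]
struct TenantShardPersistence {
    generation: i32,
}

struct Persistence {
    tenants: HashMap<String, TenantShardPersistence>,
}

impl Persistence {
    // Same shape as the new detach(): remove the row and hand it back.
    fn detach(&mut self, id: &str) -> Option<TenantShardPersistence> {
        self.tenants.remove(id)
    }
}

fn main() {
    let mut p = Persistence {
        tenants: HashMap::from([(
            "t1".to_string(),
            TenantShardPersistence { generation: 3 },
        )]),
    };
    match p.detach("t1") {
        Some(row) => println!("dropped persisted shard: {row:?}"),
        None => println!("no-op: shard was not persisted"),
    }
    // The caller can now distinguish "dropped" from "already gone".
    assert!(p.detach("t1").is_none());
}
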

File 2 of 5

@@ -389,7 +389,7 @@ impl Reconciler {
                     // Nothing to do
                     tracing::info!("Observed configuration already correct.")
                 }
-                _ => {
+                observed_conf => {
                     // In all cases other than a matching observed configuration, we will
                     // reconcile this location. This includes locations with different configurations, as well
                     // as locations with unknown (None) observed state.
@@ -399,6 +399,7 @@
                         .await?;
                     wanted_conf.generation = self.generation.into();
                     tracing::info!("Observed configuration requires update.");
+                    tracing::debug!(?wanted_conf, ?observed_conf, "observed configuration");
                     self.location_config(node_id, wanted_conf, None).await?;
                     if let Err(e) = self
                         .compute_hook
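The only functional change in the first hunk is the arm pattern: in Rust, a bare identifier in a match arm binds the value that `_` would have discarded, which is what makes observed_conf available to the new tracing::debug! line. A standalone illustration:

fn main() {
    let observed: Option<&str> = Some("stale-config");

    match observed {
        Some("up-to-date") => println!("nothing to do"),
        // An identifier pattern matches everything, like `_`, but also
        // binds the value so the catch-all arm can still inspect it.
        observed_conf => println!("needs reconciliation: {observed_conf:?}"),
    }
}
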

File 3 of 5

@@ -329,94 +329,110 @@ impl Service {
         &self,
         attach_req: AttachHookRequest,
     ) -> anyhow::Result<AttachHookResponse> {
+        #[derive(Debug)]
+        enum Mode {
+            Insert { new: bool, node_id: NodeId },
+            Detach,
+        }
+
         // This is a test hook. To enable using it on tenants that were created directly with
         // the pageserver API (not via this service), we will auto-create any missing tenant
         // shards with default state.
-        let insert = {
-            let locked = self.inner.write().unwrap();
-            !locked.tenants.contains_key(&attach_req.tenant_shard_id)
-        };
-        if insert {
-            let tsp = TenantShardPersistence {
-                tenant_id: attach_req.tenant_shard_id.tenant_id.to_string(),
-                shard_number: attach_req.tenant_shard_id.shard_number.0 as i32,
-                shard_count: attach_req.tenant_shard_id.shard_count.0 as i32,
-                shard_stripe_size: 0,
-                generation: 0,
-                generation_pageserver: None,
-                placement_policy: serde_json::to_string(&PlacementPolicy::default()).unwrap(),
-                config: serde_json::to_string(&TenantConfig::default()).unwrap(),
-            };
-
-            self.persistence.insert_tenant_shards(vec![tsp]).await?;
-
-            let mut locked = self.inner.write().unwrap();
-            locked.tenants.insert(
-                attach_req.tenant_shard_id,
-                TenantState::new(
-                    attach_req.tenant_shard_id,
-                    ShardIdentity::unsharded(),
-                    PlacementPolicy::Single,
-                ),
-            );
-        }
-
-        let new_generation = if let Some(req_node_id) = attach_req.node_id {
-            Some(
-                self.persistence
-                    .increment_generation(attach_req.tenant_shard_id, req_node_id)
-                    .await?,
-            )
-        } else {
-            self.persistence.detach(attach_req.tenant_shard_id).await?;
-            None
-        };
-
-        let mut locked = self.inner.write().unwrap();
-        let tenant_state = locked
-            .tenants
-            .get_mut(&attach_req.tenant_shard_id)
-            .expect("Checked for existence above");
-
-        if let Some(new_generation) = new_generation {
-            tenant_state.generation = new_generation;
-        }
-
-        if let Some(attaching_pageserver) = attach_req.node_id.as_ref() {
-            tracing::info!(
-                tenant_id = %attach_req.tenant_shard_id,
-                ps_id = %attaching_pageserver,
-                generation = ?tenant_state.generation,
-                "issuing",
-            );
-        } else if let Some(ps_id) = tenant_state.intent.attached {
-            tracing::info!(
-                tenant_id = %attach_req.tenant_shard_id,
-                %ps_id,
-                generation = ?tenant_state.generation,
-                "dropping",
-            );
-        } else {
-            tracing::info!(
-                tenant_id = %attach_req.tenant_shard_id,
-                "no-op: tenant already has no pageserver");
-        }
-        tenant_state.intent.attached = attach_req.node_id;
-        tracing::info!(
-            "attach_hook: tenant {} set generation {:?}, pageserver {}",
-            attach_req.tenant_shard_id,
-            tenant_state.generation,
-            // TODO: this is an odd number of 0xf's
-            attach_req.node_id.unwrap_or(utils::id::NodeId(0xfffffff))
-        );
-
-        Ok(AttachHookResponse {
-            gen: attach_req
-                .node_id
-                .map(|_| tenant_state.generation.into().unwrap()),
-        })
+        let tenant_shard_id = attach_req.tenant_shard_id;
+        let mode = {
+            let locked = self.inner.write().unwrap();
+            if let Some(node_id) = attach_req.node_id {
+                Mode::Insert {
+                    new: !locked.tenants.contains_key(&attach_req.tenant_shard_id),
+                    node_id,
+                }
+            } else {
+                Mode::Detach
+            }
+        };
+
+        tracing::info!(?mode, "attach-hook start");
+
+        match mode {
+            Mode::Insert { new, node_id } => {
+                if new {
+                    let tsp = TenantShardPersistence {
+                        tenant_id: tenant_shard_id.tenant_id.to_string(),
+                        shard_number: tenant_shard_id.shard_number.0 as i32,
+                        shard_count: tenant_shard_id.shard_count.0 as i32,
+                        shard_stripe_size: 0,
+                        generation: 0,
+                        generation_pageserver: None,
+                        placement_policy: serde_json::to_string(&PlacementPolicy::default())
+                            .unwrap(),
+                        config: serde_json::to_string(&TenantConfig::default()).unwrap(),
+                    };
+
+                    self.persistence.insert_tenant_shards(vec![tsp]).await?;
+
+                    let mut locked = self.inner.write().unwrap();
+                    locked.tenants.insert(
+                        tenant_shard_id,
+                        TenantState::new(
+                            tenant_shard_id,
+                            ShardIdentity::unsharded(),
+                            PlacementPolicy::Single,
+                        ),
+                    );
+                }
+
+                let new_generation = self
+                    .persistence
+                    .increment_generation(tenant_shard_id, node_id)
+                    .await?;
+
+                let mut locked = self.inner.write().unwrap();
+                let tenant_state = locked
+                    .tenants
+                    .get_mut(&tenant_shard_id)
+                    .expect("Checked for existence above");
+                tenant_state.generation = new_generation;
+                tenant_state.intent.attached = Some(node_id);
+                tracing::info!(
+                    "attach_hook: tenant {} set generation {:?}, pageserver {}",
+                    tenant_shard_id,
+                    tenant_state.generation,
+                    node_id,
+                );
+                Ok(AttachHookResponse {
+                    gen: tenant_state.generation.into(),
+                })
+            }
+            Mode::Detach => {
+                let res = { self.persistence.detach(tenant_shard_id).await };
+                let mut locked = self.inner.write().unwrap();
+                let tenant_state = locked.tenants.remove(&tenant_shard_id);
+                match res {
+                    Some(detached) => {
+                        tracing::info!(
+                            tenant_id = %tenant_shard_id,
+                            ps_id = ?detached.generation_pageserver,
+                            generation = ?detached.generation,
+                            "dropping",
+                        );
+                        assert!(tenant_state.is_some(), "persistence state said it existed");
+                    }
+                    None => {
+                        tracing::info!(
+                            tenant_id = %tenant_shard_id,
+                            "no-op: tenant already has no pageserver");
+                        assert!(
+                            tenant_state.is_none(),
+                            "persistence state said it already doesn't exist"
+                        );
+                    }
+                }
+                Ok(AttachHookResponse { gen: None })
+            }
+        }
     }
 
     pub(crate) fn inspect(&self, inspect_req: InspectRequest) -> InspectResponse {
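The rework first classifies the request into an explicit Mode (logged up front), so the attach/detach split is decided in one place: a node_id in the request means attach, possibly inserting a default shard; no node_id means detach. A standalone sketch of just that decision, with toy types in place of the real AttachHookRequest state:

use std::collections::HashSet;

type NodeId = u64;
type TenantShardId = u32;

#[derive(Debug, PartialEq)]
enum Mode {
    Insert { new: bool, node_id: NodeId },
    Detach,
}

// Mirrors the shape of the decision in attach_hook: the presence of a
// node_id picks the mode, and `new` records whether the tenant shard
// was previously unknown to the in-memory state.
fn decide(known: &HashSet<TenantShardId>, shard: TenantShardId, node_id: Option<NodeId>) -> Mode {
    match node_id {
        Some(node_id) => Mode::Insert {
            new: !known.contains(&shard),
            node_id,
        },
        None => Mode::Detach,
    }
}

fn main() {
    let known: HashSet<TenantShardId> = HashSet::from([7]);
    assert_eq!(decide(&known, 7, Some(1)), Mode::Insert { new: false, node_id: 1 });
    assert_eq!(decide(&known, 8, Some(1)), Mode::Insert { new: true, node_id: 1 });
    assert_eq!(decide(&known, 7, None), Mode::Detach);
    println!("mode decision behaves as expected");
}
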

File 4 of 5

@@ -79,7 +79,7 @@ pub(crate) struct IntentState {
     pub(crate) secondary: Vec<NodeId>,
 }
 
-#[derive(Default, Clone)]
+#[derive(Default, Clone, Debug)]
 pub(crate) struct ObservedState {
     pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
 }
@@ -93,7 +93,7 @@ pub(crate) struct ObservedState {
 /// what it is (e.g. we failed partway through configuring it)
 /// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
 /// and that configuration will still be present unless something external interfered.
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub(crate) struct ObservedStateLocation {
     /// If None, it means we do not know the status of this shard's location on this node, but
     /// we know that we might have some state on this node.
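These Debug derives are what let the reconciler's new tracing::debug!(?wanted_conf, ?observed_conf, ...) line compile: tracing's `?field` sigil records a field using its Debug implementation. A minimal sketch (assuming the tracing and tracing-subscriber crates; ObservedState here is a stub, not the real struct):

// Assumed Cargo dependencies: tracing = "0.1", tracing-subscriber = "0.3"
use std::collections::HashMap;

#[derive(Default, Clone, Debug)] // without Debug, `?observed` below would not compile
struct ObservedState {
    locations: HashMap<u64, String>,
}

fn main() {
    // Print DEBUG-level events to stdout.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .init();

    let observed = ObservedState::default();
    // The `?` sigil captures the field via its Debug impl.
    tracing::debug!(?observed, "observed configuration");
}
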

File 5 of 5

@@ -953,6 +953,10 @@ def test_timeline_logical_size_task_priority(neon_env_builder: NeonEnvBuilder):
     endpoint.stop()
     env.pageserver.tenant_detach(tenant_id)
 
+    env.pageserver.allowed_errors.append(
+        # tenant detach causes this because the underlying attach-hook removes the tenant from attachment_service entirely
+        ".*Dropped remote consistent LSN updates.*",
+    )
+
     env.pageserver.stop()
     env.pageserver.start(
         extra_env_vars={