mirror of
https://github.com/neondatabase/neon.git
synced 2026-07-03 12:10:36 +00:00
DNM dirty hacks
This commit is contained in:
@@ -612,6 +612,8 @@ async fn handle_tenant_create(mut req: Request<Body>) -> Result<Response<Body>,
|
||||
})?;
|
||||
}
|
||||
|
||||
locked.save().await.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(
|
||||
StatusCode::OK,
|
||||
TenantCreateResponse {
|
||||
@@ -842,6 +844,17 @@ async fn handle_tenant_shard_split(mut req: Request<Body>) -> Result<Response<Bo
|
||||
))
|
||||
})?;
|
||||
|
||||
tracing::info!(
|
||||
"Split {} into {}",
|
||||
tenant_shard_id,
|
||||
response
|
||||
.new_shards
|
||||
.iter()
|
||||
.map(|s| format!("{:?}", s))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
);
|
||||
|
||||
replacements.insert(*tenant_shard_id, response.new_shards);
|
||||
}
|
||||
|
||||
@@ -863,6 +876,8 @@ async fn handle_tenant_shard_split(mut req: Request<Body>) -> Result<Response<Bo
|
||||
)
|
||||
};
|
||||
|
||||
locked.tenants.remove(&replaced);
|
||||
|
||||
for child in children {
|
||||
let mut child_shard = shard_ident;
|
||||
child_shard.number = child.shard_number;
|
||||
@@ -902,6 +917,8 @@ async fn handle_tenant_shard_split(mut req: Request<Body>) -> Result<Response<Bo
|
||||
}
|
||||
}
|
||||
|
||||
locked.save().await.map_err(ApiError::InternalServerError)?;
|
||||
|
||||
json_response(StatusCode::OK, response)
|
||||
}
|
||||
|
||||
|
||||
@@ -544,6 +544,10 @@ async fn handle_tenant(
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
for shard in attachment_service.tenant_locate(tenant_id).await?.shards {
|
||||
println!(
|
||||
"Getting status for {} from {}",
|
||||
shard.shard_id, shard.node_id
|
||||
);
|
||||
let pageserver =
|
||||
PageServerNode::from_env(env, env.get_pageserver_conf(shard.node_id)?);
|
||||
|
||||
@@ -554,8 +558,10 @@ async fn handle_tenant(
|
||||
.tenant_info
|
||||
.current_physical_size
|
||||
.unwrap();
|
||||
|
||||
println!("add_row",);
|
||||
shard_table.add_row([
|
||||
format!("{}", shard.shard_id.shard_number.0),
|
||||
format!("{}", shard.shard_id),
|
||||
format!("{}", shard.node_id.0),
|
||||
format!("{}", size),
|
||||
]);
|
||||
@@ -585,9 +591,19 @@ async fn handle_tenant(
|
||||
let shard_count: u8 = matches.get_one::<u8>("shard-count").cloned().unwrap_or(0);
|
||||
|
||||
let attachment_service = AttachmentService::from_env(env);
|
||||
attachment_service
|
||||
let result = attachment_service
|
||||
.tenant_split(tenant_id, shard_count)
|
||||
.await?;
|
||||
println!(
|
||||
"Split tenant {} into shards {}",
|
||||
tenant_id,
|
||||
result
|
||||
.new_shards
|
||||
.iter()
|
||||
.map(|s| format!("{:?}", s))
|
||||
.collect::<Vec<_>>()
|
||||
.join(",")
|
||||
);
|
||||
}
|
||||
|
||||
Some((sub_name, _)) => bail!("Unexpected tenant subcommand '{}'", sub_name),
|
||||
|
||||
@@ -152,7 +152,7 @@ pub async fn migrate_tenant(
|
||||
|
||||
let cplane = ComputeControlPlane::load(env.clone())?;
|
||||
for (endpoint_name, endpoint) in &cplane.endpoints {
|
||||
if endpoint.tenant_id == tenant_shard_id.tenant_id {
|
||||
if endpoint.tenant_id == tenant_shard_id.tenant_id && endpoint.status() == "running" {
|
||||
println!(
|
||||
"🔁 Reconfiguring endpoint {} to use pageserver {}",
|
||||
endpoint_name, dest_ps.conf.id
|
||||
@@ -178,19 +178,24 @@ pub async fn migrate_tenant(
|
||||
continue;
|
||||
}
|
||||
|
||||
// Downgrade to a secondary location
|
||||
let secondary_conf = build_location_config(
|
||||
LocationConfigMode::Secondary,
|
||||
None,
|
||||
Some(LocationConfigSecondary { warm: true }),
|
||||
);
|
||||
// // Downgrade to a secondary location
|
||||
// let secondary_conf = build_location_config(
|
||||
// LocationConfigMode::Secondary,
|
||||
// None,
|
||||
// Some(LocationConfigSecondary { warm: true }),
|
||||
// );
|
||||
|
||||
println!(
|
||||
"💤 Switching to secondary mode on pageserver {}",
|
||||
other_ps.conf.id
|
||||
);
|
||||
// println!(
|
||||
// "💤 Switching to secondary mode on pageserver {}",
|
||||
// other_ps.conf.id
|
||||
// );
|
||||
// other_ps
|
||||
// .location_config(tenant_shard_id, secondary_conf, None)
|
||||
// .await?;
|
||||
let detached_conf = build_location_config(LocationConfigMode::Detached, None, None);
|
||||
println!("💤 Detaching on pageserver {}", other_ps.conf.id);
|
||||
other_ps
|
||||
.location_config(tenant_shard_id, secondary_conf, None)
|
||||
.location_config(tenant_shard_id, detached_conf, None)
|
||||
.await?;
|
||||
}
|
||||
|
||||
|
||||
@@ -405,13 +405,20 @@ impl PageServerHandler {
|
||||
// shards (e.g. during splitting when the compute is not yet aware of the split), the tenant
|
||||
// that we look up here may not be the one that serves all the actual requests: we will double
|
||||
// check the mapping of key->shard later before calling into Timeline for getpage requests.
|
||||
let tenant = mgr::get_active_tenant_with_timeout(
|
||||
let tenant = match mgr::get_active_tenant_with_timeout(
|
||||
tenant_id,
|
||||
ShardSelector::First,
|
||||
ACTIVE_TENANT_TIMEOUT,
|
||||
&task_mgr::shutdown_token(),
|
||||
)
|
||||
.await?;
|
||||
.await
|
||||
{
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
tracing::warn!("Error at start of handle_pagerequests: {}", e);
|
||||
return Err(e.into());
|
||||
}
|
||||
};
|
||||
|
||||
// Make request tracer if needed
|
||||
let mut tracer = if tenant.get_trace_read_requests() {
|
||||
@@ -426,9 +433,18 @@ impl PageServerHandler {
|
||||
};
|
||||
|
||||
// Check that the timeline exists
|
||||
let timeline = tenant
|
||||
.get_timeline(timeline_id, true)
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
let timeline = match tenant.get_timeline(timeline_id, true) {
|
||||
Ok(t) => t,
|
||||
Err(e) => {
|
||||
tracing::warn!("Error getting timeline: {}", e);
|
||||
return Err(QueryError::Other(anyhow::anyhow!(e)));
|
||||
}
|
||||
};
|
||||
|
||||
tracing::info!(
|
||||
"handle_pagerequests: got timeline {}",
|
||||
timeline.tenant_shard_id
|
||||
);
|
||||
|
||||
// Avoid starting new requests if the timeline has already started shutting down,
|
||||
// and block timeline shutdown until this request is complete, or drops out due
|
||||
@@ -815,6 +831,10 @@ impl PageServerHandler {
|
||||
|
||||
let key = rel_block_to_key(req.rel, req.blkno);
|
||||
let page = if timeline.get_shard_identity().is_key_local(&key) {
|
||||
tracing::debug!(
|
||||
"handle_get_page_at_lsn: using shard {}",
|
||||
timeline.tenant_shard_id
|
||||
);
|
||||
timeline
|
||||
.get_rel_page_at_lsn(req.rel, req.blkno, lsn, req.latest, ctx)
|
||||
.await?
|
||||
@@ -851,6 +871,11 @@ impl PageServerHandler {
|
||||
Err(e) => return Err(e.into()),
|
||||
};
|
||||
|
||||
tracing::debug!(
|
||||
"handle_get_page_at_lsn: using shard {}",
|
||||
timeline.tenant_shard_id
|
||||
);
|
||||
|
||||
// Take a GateGuard for the duration of this request. If we were using our main Timeline object,
|
||||
// the GateGuard was already held over the whole connection.
|
||||
let _timeline_guard = timeline.gate.enter().map_err(|_| QueryError::Shutdown)?;
|
||||
|
||||
@@ -1533,6 +1533,7 @@ impl Tenant {
|
||||
})?;
|
||||
|
||||
if active_only && !timeline.is_active() {
|
||||
tracing::warn!("Timeline {} is not active", timeline.timeline_id);
|
||||
Err(GetTimelineError::NotActive {
|
||||
tenant_id: self.tenant_shard_id.tenant_id,
|
||||
timeline_id,
|
||||
|
||||
@@ -903,10 +903,15 @@ impl Timeline {
|
||||
background_jobs_can_start: Option<&completion::Barrier>,
|
||||
ctx: &RequestContext,
|
||||
) {
|
||||
tracing::info!("activate 1");
|
||||
self.spawn_initial_logical_size_computation_task(ctx);
|
||||
tracing::info!("activate 2");
|
||||
self.launch_wal_receiver(ctx, broker_client);
|
||||
tracing::info!("activate 3");
|
||||
self.set_state(TimelineState::Active);
|
||||
tracing::info!("activate 4");
|
||||
self.launch_eviction_task(background_jobs_can_start);
|
||||
tracing::info!("activate 5");
|
||||
}
|
||||
|
||||
/// Graceful shutdown, may do a lot of I/O as we flush any open layers to disk and then
|
||||
|
||||
Reference in New Issue
Block a user