Mirror of https://github.com/neondatabase/neon.git, synced 2026-01-15 17:32:56 +00:00
Move to different function, clears up things a little
@@ -3272,30 +3272,12 @@ impl Service {
        Ok(())
    }

    pub(crate) async fn tenant_timeline_create(
    async fn tenant_timeline_create_pageservers(
        &self,
        tenant_id: TenantId,
        mut create_req: TimelineCreateRequest,
    ) -> Result<TimelineCreateResponseStorcon, ApiError> {
        let safekeepers = create_req.safekeepers.unwrap_or_default();
        tracing::info!(
            %safekeepers,
            "Creating timeline {}/{}",
            tenant_id,
            create_req.new_timeline_id,
        );

        let _tenant_lock = trace_shared_lock(
            &self.tenant_op_locks,
            tenant_id,
            TenantOperations::TimelineCreate,
        )
        .await;
        failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
        let timeline_id = create_req.new_timeline_id;
        let create_mode = create_req.mode.clone();

        let timeline_info = self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
    ) -> Result<TimelineInfo, ApiError> {
        Ok(self.tenant_remote_mutation(tenant_id, move |mut targets| async move {
            if targets.0.is_empty() {
                return Err(ApiError::NotFound(
                    anyhow::anyhow!("Tenant not found").into(),
@@ -3411,178 +3393,207 @@ impl Service {
            }

            Ok(timeline_info)
        }).await??;
        let on_sk_fut = async {
            if safekeepers {
                let pg_version = timeline_info.pg_version;
                let start_lsn = match create_mode {
                    models::TimelineCreateRequestMode::Bootstrap { .. } => {
                        timeline_info.last_record_lsn
                    }
                    models::TimelineCreateRequestMode::Branch { .. } => {
                        timeline_info.last_record_lsn
                    }
                    models::TimelineCreateRequestMode::ImportPgdata { .. } => {
                        // Can't do return Err because of async block, must do ? plus unreachable!()
                        Err(ApiError::InternalServerError(anyhow!(
                            "import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
                        )))?;
                        unreachable!()
                    }
                };

                // Choose initial set of safekeepers respecting affinity
                let sks = self.safekeepers_for_new_timeline().await?;
                let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
                // Add timeline to db
                let timeline_persist = TimelinePersistence {
                    tenant_id: tenant_id.to_string(),
                    timeline_id: timeline_id.to_string(),
                    generation: 0,
                    sk_set: sks_persistence.clone(),
                    new_sk_set: sks_persistence.clone(),
                    cplane_notified_generation: 0,
                    status: String::from(TimelineStatus::Creating),
                };
                self.persistence.insert_timeline(timeline_persist).await?;
                // reconcile: create timeline on safekeepers
                // If quorum is reached, return if we are outside of a specified timeout
                let sk_persistences = self
                    .persistence
                    .list_safekeepers()
                    .await?
                    .into_iter()
                    .map(|p| (p.id, p))
                    .collect::<HashMap<_, _>>();
                let jwt = self.config.jwt_token.clone().map(SecretString::from);
                let mut joinset = JoinSet::new();

                let mut members = Vec::new();
                for sk in sks.iter() {
                    let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else {
                        // Can't do return Err because of async block, must do ? plus unreachable!()
                        Err(ApiError::InternalServerError(anyhow!(
                            "couldn't find persisted entry for safekeeper with id {sk}"
                        )))?;
                        unreachable!()
                    };
                    members.push(SafekeeperId {
                        id: NodeId(sk_p.id as u64),
                        host: sk_p.host.clone(),
                        pg_port: sk_p.port as u16,
                    });
                }
                let mut mconf = safekeeper_api::membership::Configuration::empty();
                mconf.members = MemberSet::new(members).map_err(ApiError::InternalServerError)?;

                let req = safekeeper_api::models::TimelineCreateRequest {
                    commit_lsn: None,
                    mconf,
                    pg_version,
                    start_lsn,
                    system_id: None,
                    tenant_id,
                    timeline_id,
                    wal_seg_size: None,
                };
                for sk in sks.iter() {
                    // Unwrap is fine as we already would have returned error above
                    let sk_p = sk_persistences.get(&(sk.0 as i64)).unwrap();
                    let sk_clone = *sk;
                    let base_url = sk_p.base_url();
                    let jwt = jwt.clone();
                    let req = req.clone();
                    joinset.spawn(async move {
                        let client = SafekeeperClient::new(sk_clone, base_url, jwt);
                        // TODO: logging on error, retries
                        client.create_timeline(req).await.map_err(|e| {
                            ApiError::InternalServerError(
                                anyhow::Error::new(e)
                                    .context("error creating timeline on safekeeper"),
                            )
                        })
                    });
                }
                // After we have built the joinset, we now wait for the tasks to complete,
                // but with a specified timeout to make sure we return swiftly, either with
                // a failure or success.
                const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
                let reconcile_deadline =
                    tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;

                // Treat the first two tasks to finish differently, mostly when they timeout,
                // because then we won't have a successful quorum.
                // For the third task, we don't rely on it succeeding, and we need this to support
                // continuing operations even if one safekeeper is down.
                let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async {
                    (
                        joinset.join_next().await.unwrap(),
                        joinset.join_next().await.unwrap(),
                    )
                })
                .await;
                let mut reconcile_results = Vec::new();
                match timeout_or_quorum {
                    Ok((Ok(res_1), Ok(res_2))) => {
                        reconcile_results.push(res_1);
                        reconcile_results.push(res_2);
                    }
                    Ok((Err(_), Ok(_)) | (_, Err(_))) => {
                        Err(ApiError::InternalServerError(anyhow!(
                            "task was cancelled while reconciling timeline creation"
                        )))?;
                        unreachable!()
                    }
                    Err(_) => {
                        Err(ApiError::InternalServerError(anyhow!(
                            "couldn't reconcile timeline creation on safekeepers within timeout"
                        )))?;
                        unreachable!()
                    }
                }
                let timeout_or_last = tokio::time::timeout_at(
                    reconcile_deadline,
                    joinset.join_next().map(Option::unwrap),
                )
                .await;
                if let Ok(Ok(res)) = timeout_or_last {
                    reconcile_results.push(res);
                } else {
                    // No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
                    // TODO: maybe log?
                }
                // check now if quorum was reached in reconcile_results
                let successful = reconcile_results
                    .into_iter()
                    .filter_map(|res| res.ok())
                    .collect::<Vec<_>>();
                tracing::info!(
                    "Got {} successful results from reconciliation",
                    successful.len()
                );
                if successful.len() < 2 {
                    // Failure
                } else if successful.len() == 3 {
                    // Success, state of timeline is
                } else if successful.len() == 2 {
                } else {
                    unreachable!(
                        "unexpected amount of successful reconciliations {}",
                        successful.len()
                    );
                }
                // TODO
                // TODO update database state from "creating" to "created" or something

                // notify cplane about creation
                // TODO (this should probably be in a function so that the reconciler can use it too)
                Ok::<_, ApiError>((Some(0), Some(sks)))
            } else {
                Ok((None, None))
        }).await??)
    }
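    // A minimal, self-contained sketch (hypothetical function and error message, not part of
    // this commit) of the `Err(..)?` + `unreachable!()` bail-out used in the match arms above:
    // the `?` propagates the error to the caller, and `unreachable!()` gives the arm a value of
    // whatever type the surrounding `match` still has to produce.
    fn start_lsn_for_sketch(mode: &str) -> Result<u64, anyhow::Error> {
        let start_lsn = match mode {
            "bootstrap" | "branch" => 1234,
            _ => {
                Err(anyhow::anyhow!("mode does not specify a start LSN"))?;
                unreachable!()
            }
        };
        Ok(start_lsn)
    }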
    async fn tenant_timeline_create_safekeepers(
        &self,
        tenant_id: TenantId,
        timeline_info: &TimelineInfo,
        create_mode: models::TimelineCreateRequestMode,
    ) -> Result<(u32, Vec<NodeId>), ApiError> {
        let timeline_id = timeline_info.timeline_id;
        let pg_version = timeline_info.pg_version;
        let start_lsn = match create_mode {
            models::TimelineCreateRequestMode::Bootstrap { .. } => timeline_info.last_record_lsn,
            models::TimelineCreateRequestMode::Branch { .. } => timeline_info.last_record_lsn,
            models::TimelineCreateRequestMode::ImportPgdata { .. } => {
                // Can't do return Err because of async block, must do ? plus unreachable!()
                Err(ApiError::InternalServerError(anyhow!(
                    "import pgdata doesn't specify the start lsn, aborting creation on safekeepers"
                )))?;
                unreachable!()
            }
        };
        let on_sk_res = on_sk_fut.await;
        let (safekeepers_generation, safekeepers) = on_sk_res?;

        // Choose initial set of safekeepers respecting affinity
        let sks = self.safekeepers_for_new_timeline().await?;
        let sks_persistence = sks.iter().map(|sk| sk.0 as i64).collect::<Vec<_>>();
        // Add timeline to db
        let timeline_persist = TimelinePersistence {
            tenant_id: tenant_id.to_string(),
            timeline_id: timeline_id.to_string(),
            generation: 0,
            sk_set: sks_persistence.clone(),
            new_sk_set: sks_persistence.clone(),
            cplane_notified_generation: 0,
            status: String::from(TimelineStatus::Creating),
        };
        self.persistence.insert_timeline(timeline_persist).await?;
        // reconcile: create timeline on safekeepers
        // If quorum is reached, return if we are outside of a specified timeout
        let sk_persistences = self
            .persistence
            .list_safekeepers()
            .await?
            .into_iter()
            .map(|p| (p.id, p))
            .collect::<HashMap<_, _>>();
        let jwt = self.config.jwt_token.clone().map(SecretString::from);
        let mut joinset = JoinSet::new();

        let mut members = Vec::new();
        for sk in sks.iter() {
            let Some(sk_p) = sk_persistences.get(&(sk.0 as i64)) else {
                // Can't do return Err because of async block, must do ? plus unreachable!()
                Err(ApiError::InternalServerError(anyhow!(
                    "couldn't find persisted entry for safekeeper with id {sk}"
                )))?;
                unreachable!()
            };
            members.push(SafekeeperId {
                id: NodeId(sk_p.id as u64),
                host: sk_p.host.clone(),
                pg_port: sk_p.port as u16,
            });
        }
        let mut mconf = safekeeper_api::membership::Configuration::empty();
        mconf.members = MemberSet::new(members).map_err(ApiError::InternalServerError)?;

        let req = safekeeper_api::models::TimelineCreateRequest {
            commit_lsn: None,
            mconf,
            pg_version,
            start_lsn,
            system_id: None,
            tenant_id,
            timeline_id,
            wal_seg_size: None,
        };
        for sk in sks.iter() {
            // Unwrap is fine as we already would have returned error above
            let sk_p = sk_persistences.get(&(sk.0 as i64)).unwrap();
            let sk_clone = *sk;
            let base_url = sk_p.base_url();
            let jwt = jwt.clone();
            let req = req.clone();
            joinset.spawn(async move {
                let client = SafekeeperClient::new(sk_clone, base_url, jwt);
                // TODO: logging on error, retries
                client.create_timeline(req).await.map_err(|e| {
                    ApiError::InternalServerError(
                        anyhow::Error::new(e).context("error creating timeline on safekeeper"),
                    )
                })
            });
        }
        // After we have built the joinset, we now wait for the tasks to complete,
        // but with a specified timeout to make sure we return swiftly, either with
        // a failure or success.
        const SK_CREATE_TIMELINE_RECONCILE_TIMEOUT: Duration = Duration::from_secs(30);
        let reconcile_deadline = tokio::time::Instant::now() + SK_CREATE_TIMELINE_RECONCILE_TIMEOUT;

        // Treat the first two tasks to finish differently, mostly when they timeout,
        // because then we won't have a successful quorum.
        // For the third task, we don't rely on it succeeding, and we need this to support
        // continuing operations even if one safekeeper is down.
        let timeout_or_quorum = tokio::time::timeout_at(reconcile_deadline, async {
            (
                joinset.join_next().await.unwrap(),
                joinset.join_next().await.unwrap(),
            )
        })
        .await;
        let mut reconcile_results = Vec::new();
        match timeout_or_quorum {
            Ok((Ok(res_1), Ok(res_2))) => {
                reconcile_results.push(res_1);
                reconcile_results.push(res_2);
            }
            Ok((Err(_), Ok(_)) | (_, Err(_))) => {
                Err(ApiError::InternalServerError(anyhow!(
                    "task was cancelled while reconciling timeline creation"
                )))?;
                unreachable!()
            }
            Err(_) => {
                Err(ApiError::InternalServerError(anyhow!(
                    "couldn't reconcile timeline creation on safekeepers within timeout"
                )))?;
                unreachable!()
            }
        }
        let timeout_or_last =
            tokio::time::timeout_at(reconcile_deadline, joinset.join_next().map(Option::unwrap))
                .await;
        if let Ok(Ok(res)) = timeout_or_last {
            reconcile_results.push(res);
        } else {
            // No error if cancelled or timed out: we already have feedback from a quorum of safekeepers
            // TODO: maybe log?
        }
        // check now if quorum was reached in reconcile_results
        let successful = reconcile_results
            .into_iter()
            .filter_map(|res| res.ok())
            .collect::<Vec<_>>();
        tracing::info!(
            "Got {} successful results from reconciliation",
            successful.len()
        );
        if successful.len() < 2 {
            // Failure
        } else if successful.len() == 3 {
            // Success, state of timeline is
        } else if successful.len() == 2 {
        } else {
            unreachable!(
                "unexpected amount of successful reconciliations {}",
                successful.len()
            );
        }
        // TODO
        // TODO update database state from "creating" to "created" or something

        // notify cplane about creation
        // TODO (this should probably be in a function so that the reconciler can use it too)
        Ok((0, sks))
    }
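    // A self-contained sketch (toy tasks and a String error type, not part of this commit) of
    // the deadline-bounded quorum wait performed above: spawn one task per safekeeper, require
    // the first two completions before the deadline, and give the third task whatever time is
    // left on a best-effort basis.
    async fn quorum_wait_sketch() -> Result<usize, String> {
        use std::time::Duration;
        use tokio::task::JoinSet;

        let deadline = tokio::time::Instant::now() + Duration::from_secs(30);

        // Stand-ins for the three per-safekeeper `create_timeline` calls.
        let mut joinset: JoinSet<Result<u64, String>> = JoinSet::new();
        for id in 0..3u64 {
            joinset.spawn(async move { Ok(id) });
        }

        let mut results = Vec::new();

        // The first two completions form the quorum, so hitting the deadline here is an error.
        match tokio::time::timeout_at(deadline, async {
            (
                joinset.join_next().await.unwrap(),
                joinset.join_next().await.unwrap(),
            )
        })
        .await
        {
            Ok((Ok(a), Ok(b))) => {
                results.push(a);
                results.push(b);
            }
            Ok(_) => return Err("task was cancelled".to_string()),
            Err(_) => return Err("quorum not reached before deadline".to_string()),
        }

        // The third completion is best effort: a timeout or cancellation is not an error here.
        if let Ok(Ok(res)) =
            tokio::time::timeout_at(deadline, async { joinset.join_next().await.unwrap() }).await
        {
            results.push(res);
        }

        // Count how many of the gathered results actually succeeded.
        Ok(results.into_iter().filter(|r| r.is_ok()).count())
    }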

    pub(crate) async fn tenant_timeline_create(
        &self,
        tenant_id: TenantId,
        create_req: TimelineCreateRequest,
    ) -> Result<TimelineCreateResponseStorcon, ApiError> {
        let safekeepers = create_req.safekeepers.unwrap_or_default();
        tracing::info!(
            %safekeepers,
            "Creating timeline {}/{}",
            tenant_id,
            create_req.new_timeline_id,
        );

        let _tenant_lock = trace_shared_lock(
            &self.tenant_op_locks,
            tenant_id,
            TenantOperations::TimelineCreate,
        )
        .await;
        failpoint_support::sleep_millis_async!("tenant-create-timeline-shared-lock");
        let create_mode = create_req.mode.clone();

        let timeline_info = self
            .tenant_timeline_create_pageservers(tenant_id, create_req)
            .await?;

        let (safekeepers_generation, safekeepers) = if safekeepers {
            let res = self
                .tenant_timeline_create_safekeepers(tenant_id, &timeline_info, create_mode)
                .await?;
            (Some(res.0), Some(res.1))
        } else {
            (None, None)
        };

        Ok(TimelineCreateResponseStorcon {
            timeline_info,
            safekeepers_generation,