review cleanup

This commit is contained in:
Dmitry Rodionov
2022-06-27 13:41:19 +03:00
committed by Dmitry Rodionov
parent e1e24336b7
commit d9d4ef12c3
4 changed files with 18 additions and 35 deletions

View File

@@ -282,7 +282,7 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
drop(index_accessor);
// download index parts for every tenant timeline
let remote_timelines = match try_download_tenant_index(state, tenant_id).await {
let remote_timelines = match gather_tenant_timelines_index_parts(state, tenant_id).await {
Ok(Some(remote_timelines)) => remote_timelines,
Ok(None) => return Err(ApiError::NotFound("Unknown remote tenant".to_string())),
Err(e) => {
@@ -323,19 +323,23 @@ async fn tenant_attach_handler(request: Request<Body>) -> Result<Response<Body>,
json_response(StatusCode::ACCEPTED, ())
}
async fn try_download_tenant_index(
/// Note: is expensive from s3 access perspective,
/// for details see comment to `storage_sync::gather_tenant_timelines_index_parts`
async fn gather_tenant_timelines_index_parts(
state: &State,
tenant_id: ZTenantId,
) -> anyhow::Result<Option<Vec<(ZTimelineId, RemoteTimeline)>>> {
let index_parts = match state.remote_storage.as_ref() {
Some(GenericRemoteStorage::Local(local_storage)) => {
storage_sync::download_tenant_index_parts(state.conf, local_storage, tenant_id).await
storage_sync::gather_tenant_timelines_index_parts(state.conf, local_storage, tenant_id)
.await
}
// FIXME here s3 storage contains its own limits, that are separate from sync storage thread ones
// because it is a different instance. We can move this limit to some global static
// or use one instance everywhere.
Some(GenericRemoteStorage::S3(s3_storage)) => {
storage_sync::download_tenant_index_parts(state.conf, s3_storage, tenant_id).await
storage_sync::gather_tenant_timelines_index_parts(state.conf, s3_storage, tenant_id)
.await
}
None => return Ok(None),
}

View File

@@ -191,7 +191,7 @@ use metrics::{
use utils::zid::{ZTenantId, ZTenantTimelineId, ZTimelineId};
use self::download::download_index_parts;
pub use self::download::download_tenant_index_parts;
pub use self::download::gather_tenant_timelines_index_parts;
pub use self::download::TEMP_DOWNLOAD_EXTENSION;
lazy_static! {

View File

@@ -71,7 +71,10 @@ where
index_parts
}
pub async fn download_tenant_index_parts<P, S>(
/// Note: The function is rather expensive from s3 access point of view, it will execute ceil(N/1000) + N requests.
/// At least one request to obtain a list of tenant timelines (more requests is there are more than 1000 timelines).
/// And then will attempt to download all index files that belong to these timelines.
pub async fn gather_tenant_timelines_index_parts<P, S>(
conf: &'static PageServerConf,
storage: &S,
tenant_id: ZTenantId,

View File

@@ -401,31 +401,7 @@ pub fn get_local_timeline_with_load(
}
pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow::Result<()> {
// shutdown the timeline threads (this shuts down the walreceiver)
// FIXME it does not shut down wal receiver
// Things needed to be done
// *. check no ancestors
// *. remove from repo map
// *. remove from global tenant timelines map
// -- no new connections can see the timeline
// *. shutdown threads
// *. join walreceiver (any flushing thread?)
// *. delete files while ensuring that no gc or compaction is in progress
// 7. should we checkpoint before detach? That can be harmful during relocation,
// because it will upload to s3 something that other pageserver didnt see
// TODO put falpoints at every step. Iterate over failpoints
// in detach test and check that timeline is either attached or detached
// verify with a try to start a compute
// TODO adjust remote_index
// what is harder, write whole tenant detach correctly, or fix the timeline based one.
// TODO bail on active page_service threads?
// TODO what about inprogress downloads or uploads?
// can it be idempotent?
// FAILPOINTS: broken repo.detach_timeline
// broken wal_receiver
// broken rmdir
// shutdown the timeline tasks (this shuts down the walreceiver)
let (sender, receiver) = std::sync::mpsc::channel::<()>();
tenants_state::try_send_timeline_update(LocalTimelineUpdate::Detach {
@@ -433,12 +409,12 @@ pub fn delete_timeline(tenant_id: ZTenantId, timeline_id: ZTimelineId) -> anyhow
join_confirmation_sender: sender,
});
info!("waiting for wal receiver to shutdown");
debug!("waiting for wal receiver to shutdown");
let _ = receiver.recv();
info!("wal receiver shutdown confirmed");
info!("waiting for threads to shutdown");
debug!("wal receiver shutdown confirmed");
debug!("waiting for threads to shutdown");
thread_mgr::shutdown_threads(None, None, Some(timeline_id));
info!("thread shutdown completed");
debug!("thread shutdown completed");
match tenants_state::write_tenants().get_mut(&tenant_id) {
Some(tenant) => {
tenant