diff --git a/pageserver/ctl/src/draw_timeline_dir.rs b/pageserver/ctl/src/draw_timeline_dir.rs
index 4dff8af1fc..389519c65a 100644
--- a/pageserver/ctl/src/draw_timeline_dir.rs
+++ b/pageserver/ctl/src/draw_timeline_dir.rs
@@ -52,7 +52,6 @@
 use anyhow::{Context, Result};
 use pageserver::repository::Key;
-use pageserver::METADATA_FILE_NAME;
 use std::cmp::Ordering;
 use std::io::{self, BufRead};
 use std::path::PathBuf;
@@ -159,10 +158,6 @@ pub fn main() -> Result<()> {
         let line = PathBuf::from_str(&line).unwrap();
         let filename = line.file_name().unwrap();
         let filename = filename.to_str().unwrap();
-        if filename == METADATA_FILE_NAME {
-            // Don't try and parse "metadata" like a key-lsn range
-            continue;
-        }
         let (key_range, lsn_range) = parse_filename(filename);
         files.push(Layer {
             filename: filename.to_owned(),
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 49f8a41b37..c0099aa704 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -383,7 +383,7 @@ fn start_pageserver(
     let shutdown_pageserver = tokio_util::sync::CancellationToken::new();
 
     // Set up remote storage client
-    let remote_storage = Some(create_remote_storage_client(conf)?);
+    let remote_storage = create_remote_storage_client(conf)?;
 
     // Set up deletion queue
     let (deletion_queue, deletion_workers) = DeletionQueue::new(
@@ -516,16 +516,12 @@ fn start_pageserver(
         }
     });
 
-    let secondary_controller = if let Some(remote_storage) = &remote_storage {
-        secondary::spawn_tasks(
-            tenant_manager.clone(),
-            remote_storage.clone(),
-            background_jobs_barrier.clone(),
-            shutdown_pageserver.clone(),
-        )
-    } else {
-        secondary::null_controller()
-    };
+    let secondary_controller = secondary::spawn_tasks(
+        tenant_manager.clone(),
+        remote_storage.clone(),
+        background_jobs_barrier.clone(),
+        shutdown_pageserver.clone(),
+    );
 
     // shared state between the disk-usage backed eviction background task and the http endpoint
     // that allows triggering disk-usage based eviction manually. note that the http endpoint
@@ -533,15 +529,13 @@ fn start_pageserver(
     // been configured.
     let disk_usage_eviction_state: Arc<disk_usage_eviction_task::State> = Arc::default();
 
-    if let Some(remote_storage) = &remote_storage {
-        launch_disk_usage_global_eviction_task(
-            conf,
-            remote_storage.clone(),
-            disk_usage_eviction_state.clone(),
-            tenant_manager.clone(),
-            background_jobs_barrier.clone(),
-        )?;
-    }
+    launch_disk_usage_global_eviction_task(
+        conf,
+        remote_storage.clone(),
+        disk_usage_eviction_state.clone(),
+        tenant_manager.clone(),
+        background_jobs_barrier.clone(),
+    )?;
 
     // Start up the service to handle HTTP mgmt API request. We created the
     // listener earlier already.
@@ -693,14 +687,7 @@ fn start_pageserver(
         // Right now that tree doesn't reach very far, and `task_mgr` is used instead.
         // The plan is to change that over time.
         shutdown_pageserver.take();
-        let bg_remote_storage = remote_storage.clone();
-        let bg_deletion_queue = deletion_queue.clone();
-        pageserver::shutdown_pageserver(
-            &tenant_manager,
-            bg_remote_storage.map(|_| bg_deletion_queue),
-            0,
-        )
-        .await;
+        pageserver::shutdown_pageserver(&tenant_manager, deletion_queue.clone(), 0).await;
         unreachable!()
     })
}
diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs
index c937309d83..8790a9b0a8 100644
--- a/pageserver/src/deletion_queue.rs
+++ b/pageserver/src/deletion_queue.rs
@@ -632,7 +632,6 @@ impl DeletionQueue {
     ///
-    /// If remote_storage is None, then the returned workers will also be None.
     pub fn new<C>(
-        remote_storage: Option<GenericRemoteStorage>,
+        remote_storage: GenericRemoteStorage,
         control_plane_client: Option<C>,
         conf: &'static PageServerConf,
     ) -> (Self, Option<DeletionQueueWorkers<C>>)
@@ -658,23 +658,6 @@ impl DeletionQueue {
         // longer to flush after Tenants have all been torn down.
         let cancel = CancellationToken::new();
 
-        let remote_storage = match remote_storage {
-            None => {
-                return (
-                    Self {
-                        client: DeletionQueueClient {
-                            tx,
-                            executor_tx,
-                            lsn_table: lsn_table.clone(),
-                        },
-                        cancel,
-                    },
-                    None,
-                )
-            }
-            Some(r) => r,
-        };
-
         (
             Self {
                 client: DeletionQueueClient {
@@ -765,7 +748,7 @@ mod test {
     /// Simulate a pageserver restart by destroying and recreating the deletion queue
     async fn restart(&mut self) {
         let (deletion_queue, workers) = DeletionQueue::new(
-            Some(self.storage.clone()),
+            self.storage.clone(),
             Some(self.mock_control_plane.clone()),
             self.harness.conf,
         );
@@ -875,7 +858,7 @@ mod test {
         let mock_control_plane = MockControlPlane::new();
 
         let (deletion_queue, worker) = DeletionQueue::new(
-            Some(storage.clone()),
+            storage.clone(),
             Some(mock_control_plane.clone()),
             harness.conf,
         );
diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs
index 2370561756..0a98d32f02 100644
--- a/pageserver/src/http/routes.rs
+++ b/pageserver/src/http/routes.rs
@@ -104,7 +104,7 @@ pub struct State {
     tenant_manager: Arc<TenantManager>,
     auth: Option<Arc<SwappableJwtAuth>>,
     allowlist_routes: Vec<Uri>,
-    remote_storage: Option<GenericRemoteStorage>,
+    remote_storage: GenericRemoteStorage,
     broker_client: storage_broker::BrokerClientChannel,
     disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
     deletion_queue_client: DeletionQueueClient,
@@ -118,7 +118,7 @@ impl State {
         conf: &'static PageServerConf,
         tenant_manager: Arc<TenantManager>,
         auth: Option<Arc<SwappableJwtAuth>>,
-        remote_storage: Option<GenericRemoteStorage>,
+        remote_storage: GenericRemoteStorage,
         broker_client: storage_broker::BrokerClientChannel,
         disk_usage_eviction_state: Arc<disk_usage_eviction_task::State>,
         deletion_queue_client: DeletionQueueClient,
@@ -813,12 +813,6 @@ async fn tenant_attach_handler(
     let generation = get_request_generation(state, maybe_body.as_ref().and_then(|r| r.generation))?;
 
-    if state.remote_storage.is_none() {
-        return Err(ApiError::BadRequest(anyhow!(
-            "attach_tenant is not possible because pageserver was configured without remote storage"
-        )));
-    }
-
     let tenant_shard_id = TenantShardId::unsharded(tenant_id);
     let shard_params = ShardParameters::default();
     let location_conf = LocationConf::attached_single(tenant_conf, generation, &shard_params);
@@ -1643,12 +1637,6 @@ async fn tenant_time_travel_remote_storage_handler(
         )));
     }
 
-    let Some(storage) = state.remote_storage.as_ref() else {
-        return Err(ApiError::InternalServerError(anyhow::anyhow!(
-            "remote storage not configured, cannot run time travel"
-        )));
-    };
-
     if timestamp > done_if_after {
         return Err(ApiError::BadRequest(anyhow!(
             "The done_if_after timestamp comes before the timestamp to recover to"
         )));
     }
@@ -1658,7 +1646,7 @@ async fn tenant_time_travel_remote_storage_handler(
     tracing::info!("Issuing time travel request internally. timestamp={timestamp_raw}, done_if_after={done_if_after_raw}");
 
     remote_timeline_client::upload::time_travel_recover_tenant(
-        storage,
+        &state.remote_storage,
         &tenant_shard_id,
         timestamp,
         done_if_after,
@@ -1903,11 +1891,6 @@ async fn deletion_queue_flush(
 ) -> Result<Response<Body>, ApiError> {
     let state = get_state(&r);
 
-    if state.remote_storage.is_none() {
-        // Nothing to do if remote storage is disabled.
-        return json_response(StatusCode::OK, ());
-    }
-
     let execute = parse_query_param(&r, "execute")?.unwrap_or(false);
 
     let flush = async {
@@ -2072,18 +2055,11 @@ async fn disk_usage_eviction_run(
     };
 
     let state = get_state(&r);
-
-    let Some(storage) = state.remote_storage.as_ref() else {
-        return Err(ApiError::InternalServerError(anyhow::anyhow!(
-            "remote storage not configured, cannot run eviction iteration"
-        )));
-    };
-
     let eviction_state = state.disk_usage_eviction_state.clone();
 
     let res = crate::disk_usage_eviction_task::disk_usage_eviction_task_iteration_impl(
         &eviction_state,
-        storage,
+        &state.remote_storage,
         usage,
         &state.tenant_manager,
         config.eviction_order,
@@ -2120,29 +2096,23 @@ async fn tenant_scan_remote_handler(
     let state = get_state(&request);
     let tenant_id: TenantId = parse_request_param(&request, "tenant_id")?;
 
-    let Some(remote_storage) = state.remote_storage.as_ref() else {
-        return Err(ApiError::BadRequest(anyhow::anyhow!(
-            "Remote storage not configured"
-        )));
-    };
-
     let mut response = TenantScanRemoteStorageResponse::default();
 
     let (shards, _other_keys) =
-        list_remote_tenant_shards(remote_storage, tenant_id, cancel.clone())
+        list_remote_tenant_shards(&state.remote_storage, tenant_id, cancel.clone())
             .await
             .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
 
     for tenant_shard_id in shards {
         let (timeline_ids, _other_keys) =
-            list_remote_timelines(remote_storage, tenant_shard_id, cancel.clone())
+            list_remote_timelines(&state.remote_storage, tenant_shard_id, cancel.clone())
                 .await
                 .map_err(|e| ApiError::InternalServerError(anyhow::anyhow!(e)))?;
 
         let mut generation = Generation::none();
         for timeline_id in timeline_ids {
             match download_index_part(
-                remote_storage,
+                &state.remote_storage,
                 &tenant_shard_id,
                 &timeline_id,
                 Generation::MAX,
diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs
index 930700e50c..c69fb8c83b 100644
--- a/pageserver/src/lib.rs
+++ b/pageserver/src/lib.rs
@@ -57,7 +57,7 @@ pub use crate::metrics::preinitialize_metrics;
 #[tracing::instrument(skip_all, fields(%exit_code))]
 pub async fn shutdown_pageserver(
     tenant_manager: &TenantManager,
-    deletion_queue: Option<DeletionQueue>,
+    mut deletion_queue: DeletionQueue,
     exit_code: i32,
 ) {
     use std::time::Duration;
@@ -89,9 +89,7 @@ pub async fn shutdown_pageserver(
         .await;
 
     // Best effort to persist any outstanding deletions, to avoid leaking objects
-    if let Some(mut deletion_queue) = deletion_queue {
-        deletion_queue.shutdown(Duration::from_secs(5)).await;
-    }
+    deletion_queue.shutdown(Duration::from_secs(5)).await;
 
     // Shut down the HTTP endpoint last, so that you can still check the server's
     // status while it's shutting down.
@@ -114,10 +112,6 @@ pub async fn shutdown_pageserver(
     std::process::exit(exit_code);
 }
 
-/// The name of the metadata file pageserver creates per timeline.
-/// Full path: `tenants/<tenant_id>/timelines/<timeline_id>/metadata`.
-pub const METADATA_FILE_NAME: &str = "metadata";
-
 /// Per-tenant configuration file.
 /// Full path: `tenants/<tenant_id>/config`.
 pub(crate) const TENANT_CONFIG_NAME: &str = "config";
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 80d354d79e..026cbc107c 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -190,7 +190,7 @@ pub const TENANT_DELETED_MARKER_FILE_NAME: &str = "deleted";
 #[derive(Clone)]
 pub struct TenantSharedResources {
     pub broker_client: storage_broker::BrokerClientChannel,
-    pub remote_storage: Option<GenericRemoteStorage>,
+    pub remote_storage: GenericRemoteStorage,
     pub deletion_queue_client: DeletionQueueClient,
 }
 
@@ -292,7 +292,7 @@ pub struct Tenant {
     walredo_mgr: Option<Arc<WalRedoManager>>,
 
     // provides access to timeline data sitting in the remote storage
-    pub(crate) remote_storage: Option<GenericRemoteStorage>,
+    pub(crate) remote_storage: GenericRemoteStorage,
 
     // Access to global deletion queue for when this tenant wants to schedule a deletion
     deletion_queue_client: DeletionQueueClient,
@@ -551,21 +551,22 @@ impl Tenant {
         );
 
         if let Some(index_part) = index_part.as_ref() {
-            timeline
-                .remote_client
-                .as_ref()
-                .unwrap()
-                .init_upload_queue(index_part)?;
-        } else if self.remote_storage.is_some() {
+            timeline.remote_client.init_upload_queue(index_part)?;
+        } else {
             // No data on the remote storage, but we have local metadata file. We can end up
             // here with timeline_create being interrupted before finishing index part upload.
             // By doing what we do here, the index part upload is retried.
             // If control plane retries timeline creation in the meantime, the mgmt API handler
             // for timeline creation will coalesce on the upload we queue here.
+            // FIXME: this branch should be dead code as we no longer write local metadata.
-            let rtc = timeline.remote_client.as_ref().unwrap();
-            rtc.init_upload_queue_for_empty_remote(&metadata)?;
-            rtc.schedule_index_upload_for_full_metadata_update(&metadata)?;
+
+            timeline
+                .remote_client
+                .init_upload_queue_for_empty_remote(&metadata)?;
+            timeline
+                .remote_client
+                .schedule_index_upload_for_full_metadata_update(&metadata)?;
         }
 
         timeline
@@ -777,14 +778,14 @@ impl Tenant {
             AttachType::Normal
         };
 
-        let preload = match (&mode, &remote_storage) {
-            (SpawnMode::Create, _) => {
+        let preload = match &mode {
+            SpawnMode::Create => {
                 None
             },
-            (SpawnMode::Eager | SpawnMode::Lazy, Some(remote_storage)) => {
+            SpawnMode::Eager | SpawnMode::Lazy => {
                 let _preload_timer = TENANT.preload.start_timer();
                 let res = tenant_clone
-                    .preload(remote_storage, task_mgr::shutdown_token())
+                    .preload(&remote_storage, task_mgr::shutdown_token())
                     .await;
                 match res {
                     Ok(p) => Some(p),
@@ -794,10 +795,7 @@ impl Tenant {
                     }
                 }
             }
-            (_, None) => {
-                let _preload_timer = TENANT.preload.start_timer();
-                None
-            }
+        };
 
         // Remote preload is complete.
@@ -1021,7 +1019,7 @@ impl Tenant {
                 index_part,
                 remote_metadata,
                 TimelineResources {
-                    remote_client: Some(remote_client),
+                    remote_client,
                     deletion_queue_client: self.deletion_queue_client.clone(),
                     timeline_get_throttle: self.timeline_get_throttle.clone(),
                 },
@@ -1047,7 +1045,7 @@ impl Tenant {
                     Arc::clone(self),
                     timeline_id,
                     &index_part.metadata,
-                    Some(remote_timeline_client),
+                    remote_timeline_client,
                     self.deletion_queue_client.clone(),
                 )
                 .instrument(tracing::info_span!("timeline_delete", %timeline_id))
@@ -1139,9 +1137,7 @@ impl Tenant {
         let mut size = 0;
 
         for timeline in self.list_timelines() {
-            if let Some(remote_client) = &timeline.remote_client {
-                size += remote_client.get_remote_physical_size();
-            }
+            size += timeline.remote_client.get_remote_physical_size();
         }
 
         size
@@ -1191,6 +1187,7 @@ impl Tenant {
     pub fn create_broken_tenant(
         conf: &'static PageServerConf,
         tenant_shard_id: TenantShardId,
+        remote_storage: GenericRemoteStorage,
         reason: String,
     ) -> Arc<Tenant> {
         Arc::new(Tenant::new(
@@ -1205,7 +1202,7 @@ impl Tenant {
             ShardIdentity::broken(tenant_shard_id.shard_number, tenant_shard_id.shard_count),
             None,
             tenant_shard_id,
-            None,
+            remote_storage,
             DeletionQueueClient::broken(),
         ))
     }
@@ -1398,13 +1395,7 @@ impl Tenant {
         tline.freeze_and_flush().await.context("freeze_and_flush")?;
 
         // Make sure the freeze_and_flush reaches remote storage.
-        tline
-            .remote_client
-            .as_ref()
-            .unwrap()
-            .wait_completion()
-            .await
-            .unwrap();
+        tline.remote_client.wait_completion().await.unwrap();
 
         let tl = uninit_tl.finish_creation()?;
         // The non-test code would call tl.activate() here.
@@ -1470,20 +1461,19 @@ impl Tenant {
             return Err(CreateTimelineError::Conflict);
         }
 
-        if let Some(remote_client) = existing.remote_client.as_ref() {
-            // Wait for uploads to complete, so that when we return Ok, the timeline
-            // is known to be durable on remote storage. Just like we do at the end of
-            // this function, after we have created the timeline ourselves.
-            //
-            // We only really care that the initial version of `index_part.json` has
-            // been uploaded. That's enough to remember that the timeline
-            // exists. However, there is no function to wait specifically for that so
-            // we just wait for all in-progress uploads to finish.
-            remote_client
-                .wait_completion()
-                .await
-                .context("wait for timeline uploads to complete")?;
-        }
+        // Wait for uploads to complete, so that when we return Ok, the timeline
+        // is known to be durable on remote storage. Just like we do at the end of
+        // this function, after we have created the timeline ourselves.
+        //
+        // We only really care that the initial version of `index_part.json` has
+        // been uploaded. That's enough to remember that the timeline
+        // exists. However, there is no function to wait specifically for that so
+        // we just wait for all in-progress uploads to finish.
+        existing
+            .remote_client
+            .wait_completion()
+            .await
+            .context("wait for timeline uploads to complete")?;
 
         return Ok(existing);
     }
@@ -1559,14 +1549,14 @@ impl Tenant {
         // the timeline is visible in [`Self::timelines`], but it is _not_ durable yet. We must
         // not send a success to the caller until it is. The same applies to handling retries,
         // see the handling of [`TimelineExclusionError::AlreadyExists`] above.
-        if let Some(remote_client) = loaded_timeline.remote_client.as_ref() {
-            let kind = ancestor_timeline_id
-                .map(|_| "branched")
-                .unwrap_or("bootstrapped");
-            remote_client.wait_completion().await.with_context(|| {
-                format!("wait for {} timeline initial uploads to complete", kind)
-            })?;
-        }
+        let kind = ancestor_timeline_id
+            .map(|_| "branched")
+            .unwrap_or("bootstrapped");
+        loaded_timeline
+            .remote_client
+            .wait_completion()
+            .await
+            .with_context(|| format!("wait for {} timeline initial uploads to complete", kind))?;
 
         loaded_timeline.activate(self.clone(), broker_client, None, ctx);
 
@@ -2161,32 +2151,26 @@ impl Tenant {
     ) -> anyhow::Result<()> {
         let timelines = self.timelines.lock().unwrap().clone();
         for timeline in timelines.values() {
-            let Some(tl_client) = &timeline.remote_client else {
-                anyhow::bail!("Remote storage is mandatory");
-            };
-
-            let Some(remote_storage) = &self.remote_storage else {
-                anyhow::bail!("Remote storage is mandatory");
-            };
-
             // We do not block timeline creation/deletion during splits inside the pageserver: it is up to higher levels
             // to ensure that they do not start a split if currently in the process of doing these.
 
             // Upload an index from the parent: this is partly to provide freshness for the
             // child tenants that will copy it, and partly for general ease-of-debugging: there will
             // always be a parent shard index in the same generation as we wrote the child shard index.
-            tl_client.schedule_index_upload_for_file_changes()?;
-            tl_client.wait_completion().await?;
+            timeline
+                .remote_client
+                .schedule_index_upload_for_file_changes()?;
+            timeline.remote_client.wait_completion().await?;
 
             // Shut down the timeline's remote client: this means that the indices we write
             // for child shards will not be invalidated by the parent shard deleting layers.
-            tl_client.shutdown().await;
+            timeline.remote_client.shutdown().await;
 
             // Download methods can still be used after shutdown, as they don't flow through the remote client's
            // queue. In principle the RemoteTimelineClient could provide this without downloading it, but this
            // operation is rare, so it's simpler to just download it (and robustly guarantees that the index
            // we use here really is the remotely persistent one).
-            let result = tl_client
+            let result = timeline.remote_client
                .download_index_file(&self.cancel)
                .instrument(info_span!("download_index_file", tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), timeline_id=%timeline.timeline_id))
                .await?;
@@ -2199,7 +2183,7 @@ impl Tenant {
 
             for child_shard in child_shards {
                 upload_index_part(
-                    remote_storage,
+                    &self.remote_storage,
                     child_shard,
                     &timeline.timeline_id,
                     self.generation,
@@ -2475,7 +2459,7 @@ impl Tenant {
         shard_identity: ShardIdentity,
         walredo_mgr: Option<Arc<WalRedoManager>>,
         tenant_shard_id: TenantShardId,
-        remote_storage: Option<GenericRemoteStorage>,
+        remote_storage: GenericRemoteStorage,
         deletion_queue_client: DeletionQueueClient,
     ) -> Tenant {
         let (state, mut rx) = watch::channel(state);
@@ -3119,11 +3103,10 @@ impl Tenant {
         // We still need to upload its metadata eagerly: if other nodes `attach` the tenant and miss this timeline, their GC
         // could get incorrect information and remove more layers than needed.
         // See also https://github.com/neondatabase/neon/issues/3865
-        if let Some(remote_client) = new_timeline.remote_client.as_ref() {
-            remote_client
-                .schedule_index_upload_for_full_metadata_update(&metadata)
-                .context("branch initial metadata upload")?;
-        }
+        new_timeline
+            .remote_client
+            .schedule_index_upload_for_full_metadata_update(&metadata)
+            .context("branch initial metadata upload")?;
 
         Ok(new_timeline)
     }
@@ -3155,11 +3138,6 @@ impl Tenant {
         pgdata_path: &Utf8PathBuf,
         timeline_id: &TimelineId,
     ) -> anyhow::Result<()> {
-        let Some(storage) = &self.remote_storage else {
-            // No remote storage? No upload.
-            return Ok(());
-        };
-
         let temp_path = timelines_path.join(format!(
             "{INITDB_PATH}.upload-{timeline_id}.{TEMP_FILE_SUFFIX}"
         ));
@@ -3183,7 +3161,7 @@ impl Tenant {
         backoff::retry(
             || async {
                 self::remote_timeline_client::upload_initdb_dir(
-                    storage,
+                    &self.remote_storage,
                     &self.tenant_shard_id.tenant_id,
                     timeline_id,
                     pgdata_zstd.try_clone().await?,
@@ -3240,9 +3218,6 @@ impl Tenant {
             }
         }
         if let Some(existing_initdb_timeline_id) = load_existing_initdb {
-            let Some(storage) = &self.remote_storage else {
-                bail!("no storage configured but load_existing_initdb set to {existing_initdb_timeline_id}");
-            };
             if existing_initdb_timeline_id != timeline_id {
                 let source_path = &remote_initdb_archive_path(
                     &self.tenant_shard_id.tenant_id,
@@ -3252,7 +3227,7 @@ impl Tenant {
                     &remote_initdb_archive_path(&self.tenant_shard_id.tenant_id, &timeline_id);
 
                 // if this fails, it will get retried by retried control plane requests
-                storage
+                self.remote_storage
                     .copy_object(source_path, dest_path, &self.cancel)
                     .await
                     .context("copy initdb tar")?;
@@ -3260,7 +3235,7 @@ impl Tenant {
                 let (initdb_tar_zst_path, initdb_tar_zst) =
                     self::remote_timeline_client::download_initdb_tar_zst(
                         self.conf,
-                        storage,
+                        &self.remote_storage,
                         &self.tenant_shard_id,
                         &existing_initdb_timeline_id,
                         &self.cancel,
@@ -3355,20 +3330,14 @@ impl Tenant {
 
     /// Call this before constructing a timeline, to build its required structures
     fn build_timeline_resources(&self, timeline_id: TimelineId) -> TimelineResources {
-        let remote_client = if let Some(remote_storage) = self.remote_storage.as_ref() {
-            let remote_client = RemoteTimelineClient::new(
-                remote_storage.clone(),
-                self.deletion_queue_client.clone(),
-                self.conf,
-                self.tenant_shard_id,
-                timeline_id,
-                self.generation,
-            );
-            Some(remote_client)
-        } else {
-            None
-        };
-
+        let remote_client = RemoteTimelineClient::new(
+            self.remote_storage.clone(),
+            self.deletion_queue_client.clone(),
+            self.conf,
+            self.tenant_shard_id,
+            timeline_id,
+            self.generation,
+        );
         TimelineResources {
             remote_client,
             deletion_queue_client: self.deletion_queue_client.clone(),
@@ -3392,9 +3361,9 @@ impl Tenant {
         let tenant_shard_id = self.tenant_shard_id;
         let resources = self.build_timeline_resources(new_timeline_id);
-        if let Some(remote_client) = &resources.remote_client {
-            remote_client.init_upload_queue_for_empty_remote(new_metadata)?;
-        }
+        resources
+            .remote_client
+            .init_upload_queue_for_empty_remote(new_metadata)?;
 
         let timeline_struct = self
             .create_timeline_struct(
@@ -3562,9 +3531,7 @@ impl Tenant {
             tracing::info!(timeline_id=%timeline.timeline_id, "Flushing...");
             timeline.freeze_and_flush().await?;
             tracing::info!(timeline_id=%timeline.timeline_id, "Waiting for uploads...");
-            if let Some(client) = &timeline.remote_client {
-                client.wait_completion().await?;
-            }
+            timeline.remote_client.wait_completion().await?;
 
             Ok(())
         }
@@ -3878,7 +3845,7 @@ pub(crate) mod harness {
             ShardIdentity::unsharded(),
             Some(walredo_mgr),
             self.tenant_shard_id,
-            Some(self.remote_storage.clone()),
+            self.remote_storage.clone(),
             self.deletion_queue.new_client(),
         ));
diff --git a/pageserver/src/tenant/delete.rs b/pageserver/src/tenant/delete.rs
index 2e5259bfe2..3173a33dad 100644
--- a/pageserver/src/tenant/delete.rs
+++ b/pageserver/src/tenant/delete.rs
@@ -181,25 +181,23 @@ async fn ensure_timelines_dir_empty(timelines_path: &Utf8Path) -> Result<(), Del
 
 async fn remove_tenant_remote_delete_mark(
     conf: &PageServerConf,
-    remote_storage: Option<&GenericRemoteStorage>,
+    remote_storage: &GenericRemoteStorage,
     tenant_shard_id: &TenantShardId,
     cancel: &CancellationToken,
 ) -> Result<(), DeleteTenantError> {
-    if let Some(remote_storage) = remote_storage {
-        let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
-        backoff::retry(
-            || async { remote_storage.delete(&path, cancel).await },
-            TimeoutOrCancel::caused_by_cancel,
-            FAILED_UPLOAD_WARN_THRESHOLD,
-            FAILED_REMOTE_OP_RETRIES,
-            "remove_tenant_remote_delete_mark",
-            cancel,
-        )
-        .await
-        .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
-        .and_then(|x| x)
-        .context("remove_tenant_remote_delete_mark")?;
-    }
+    let path = remote_tenant_delete_mark_path(conf, tenant_shard_id)?;
+    backoff::retry(
+        || async { remote_storage.delete(&path, cancel).await },
+        TimeoutOrCancel::caused_by_cancel,
+        FAILED_UPLOAD_WARN_THRESHOLD,
+        FAILED_REMOTE_OP_RETRIES,
+        "remove_tenant_remote_delete_mark",
+        cancel,
+    )
+    .await
+    .ok_or_else(|| anyhow::Error::new(TimeoutOrCancel::Cancel))
+    .and_then(|x| x)
+    .context("remove_tenant_remote_delete_mark")?;
     Ok(())
 }
 
@@ -297,7 +295,7 @@ impl DeleteTenantFlow {
     #[instrument(skip_all)]
     pub(crate) async fn run(
         conf: &'static PageServerConf,
-        remote_storage: Option<GenericRemoteStorage>,
+        remote_storage: GenericRemoteStorage,
         tenants: &'static std::sync::RwLock<TenantsMap>,
         tenant: Arc<Tenant>,
         cancel: &CancellationToken,
@@ -308,9 +306,7 @@ impl DeleteTenantFlow {
 
         let mut guard = Self::prepare(&tenant).await?;
 
-        if let Err(e) =
-            Self::run_inner(&mut guard, conf, remote_storage.as_ref(), &tenant, cancel).await
-        {
+        if let Err(e) = Self::run_inner(&mut guard, conf, &remote_storage, &tenant, cancel).await {
             tenant.set_broken(format!("{e:#}")).await;
             return Err(e);
         }
@@ -327,7 +323,7 @@ impl DeleteTenantFlow {
     async fn run_inner(
         guard: &mut OwnedMutexGuard<Self>,
         conf: &'static PageServerConf,
-        remote_storage: Option<&GenericRemoteStorage>,
+        remote_storage: &GenericRemoteStorage,
         tenant: &Tenant,
         cancel: &CancellationToken,
     ) -> Result<(), DeleteTenantError> {
@@ -339,14 +335,9 @@ impl DeleteTenantFlow {
             ))?
         });
 
-        // IDEA: implement detach as delete without remote storage. Then they would use the same lock (deletion_progress) so wont contend.
-        // Though sounds scary, different mark name?
-        // Detach currently uses remove_dir_all so in case of a crash we can end up in a weird state.
-        if let Some(remote_storage) = &remote_storage {
-            create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id, cancel)
-                .await
-                .context("remote_mark")?
-        }
+        create_remote_delete_mark(conf, remote_storage, &tenant.tenant_shard_id, cancel)
+            .await
+            .context("remote_mark")?;
 
         fail::fail_point!("tenant-delete-before-create-local-mark", |_| {
             Err(anyhow::anyhow!(
@@ -483,7 +474,7 @@ impl DeleteTenantFlow {
     fn schedule_background(
         guard: OwnedMutexGuard<Self>,
         conf: &'static PageServerConf,
-        remote_storage: Option<GenericRemoteStorage>,
+        remote_storage: GenericRemoteStorage,
         tenants: &'static std::sync::RwLock<TenantsMap>,
         tenant: Arc<Tenant>,
     ) {
@@ -512,7 +503,7 @@ impl DeleteTenantFlow {
     async fn background(
         mut guard: OwnedMutexGuard<Self>,
         conf: &PageServerConf,
-        remote_storage: Option<GenericRemoteStorage>,
+        remote_storage: GenericRemoteStorage,
         tenants: &'static std::sync::RwLock<TenantsMap>,
         tenant: &Arc<Tenant>,
     ) -> Result<(), DeleteTenantError> {
@@ -551,7 +542,7 @@ impl DeleteTenantFlow {
 
         remove_tenant_remote_delete_mark(
             conf,
-            remote_storage.as_ref(),
+            &remote_storage,
            &tenant.tenant_shard_id,
            &task_mgr::shutdown_token(),
         )
diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs
index 7a3e36bf02..5abda7b64e 100644
--- a/pageserver/src/tenant/mgr.rs
+++ b/pageserver/src/tenant/mgr.rs
@@ -47,7 +47,7 @@ use crate::tenant::span::debug_assert_current_span_has_tenant_id;
 use crate::tenant::storage_layer::inmemory_layer;
 use crate::tenant::timeline::ShutdownMode;
 use crate::tenant::{AttachedTenantConf, SpawnMode, Tenant, TenantState};
-use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, METADATA_FILE_NAME, TEMP_FILE_SUFFIX};
+use crate::{InitializationOrder, IGNORED_TENANT_FILE_NAME, TEMP_FILE_SUFFIX};
 
 use utils::crashsafe::path_with_suffix_extension;
 use utils::fs_ext::PathExt;
@@ -391,22 +391,17 @@ async fn init_load_generations(
     // deletion list entries may still be valid.  We provide that by pushing a recovery operation into
     // the queue.  Sequential processing of the queue ensures that recovery is done before any new tenant deletions
     // are processed, even though we don't block on recovery completing here.
-    //
-    // Must only do this if remote storage is enabled, otherwise deletion queue
-    // is not running and channel push will fail.
-    if resources.remote_storage.is_some() {
-        let attached_tenants = generations
-            .iter()
-            .flat_map(|(id, start_mode)| {
-                match start_mode {
-                    TenantStartupMode::Attached((_mode, generation)) => Some(generation),
-                    TenantStartupMode::Secondary => None,
-                }
-                .map(|gen| (*id, *gen))
-            })
-            .collect();
-        resources.deletion_queue_client.recover(attached_tenants)?;
-    }
+    let attached_tenants = generations
+        .iter()
+        .flat_map(|(id, start_mode)| {
+            match start_mode {
+                TenantStartupMode::Attached((_mode, generation)) => Some(generation),
+                TenantStartupMode::Secondary => None,
+            }
+            .map(|gen| (*id, *gen))
+        })
+        .collect();
+    resources.deletion_queue_client.recover(attached_tenants)?;
 
     Ok(Some(generations))
 }
@@ -460,53 +455,6 @@ fn load_tenant_config(
         }
     };
 
-    // Clean up legacy `metadata` files.
-    // Doing it here because every single tenant directory is visited here.
-    // In any later code, there's different treatment of tenant dirs
-    // ... depending on whether the tenant is in re-attach response or not
-    // ... depending on whether the tenant is ignored or not
-    assert_eq!(
-        &conf.tenant_path(&tenant_shard_id),
-        &tenant_dir_path,
-        "later use of conf....path() methods would be dubious"
-    );
-    let timelines: Vec<TimelineId> = match conf.timelines_path(&tenant_shard_id).read_dir_utf8() {
-        Ok(iter) => {
-            let mut timelines = Vec::new();
-            for res in iter {
-                let p = res?;
-                let Some(timeline_id) = p.file_name().parse::<TimelineId>().ok() else {
-                    // skip any entries that aren't TimelineId, such as
-                    // - *.___temp dirs
-                    // - unfinished initdb uploads (test_non_uploaded_root_timeline_is_deleted_after_restart)
-                    continue;
-                };
-                timelines.push(timeline_id);
-            }
-            timelines
-        }
-        Err(e) if e.kind() == std::io::ErrorKind::NotFound => vec![],
-        Err(e) => return Err(anyhow::anyhow!(e)),
-    };
-    for timeline_id in timelines {
-        let timeline_path = &conf.timeline_path(&tenant_shard_id, &timeline_id);
-        let metadata_path = timeline_path.join(METADATA_FILE_NAME);
-        match std::fs::remove_file(&metadata_path) {
-            Ok(()) => {
-                crashsafe::fsync(timeline_path)
-                    .context("fsync timeline dir after removing legacy metadata file")?;
-                info!("removed legacy metadata file at {metadata_path}");
-            }
-            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
-                // something removed the file earlier, or it was never there
-                // We don't care, this software version doesn't write it again, so, we're good.
-            }
-            Err(e) => {
-                anyhow::bail!("remove legacy metadata file: {e}: {metadata_path}");
-            }
-        }
-    }
-
     let tenant_ignore_mark_file = tenant_dir_path.join(IGNORED_TENANT_FILE_NAME);
     if tenant_ignore_mark_file.exists() {
         info!("Found an ignore mark file {tenant_ignore_mark_file:?}, skipping the tenant");
@@ -611,6 +559,7 @@ pub async fn init_tenant_mgr(
                     TenantSlot::Attached(Tenant::create_broken_tenant(
                         conf,
                         tenant_shard_id,
+                        resources.remote_storage.clone(),
                         format!("{}", e),
                     )),
                 );
@@ -803,6 +752,7 @@ fn tenant_spawn(
         "Cannot load tenant, ignore mark found at {tenant_ignore_mark:?}"
     );
 
+    let remote_storage = resources.remote_storage.clone();
     let tenant = match Tenant::spawn(
         conf,
         tenant_shard_id,
@@ -817,7 +767,7 @@ fn tenant_spawn(
         Ok(tenant) => tenant,
         Err(e) => {
             error!("Failed to spawn tenant {tenant_shard_id}, reason: {e:#}");
-            Tenant::create_broken_tenant(conf, tenant_shard_id, format!("{e:#}"))
+            Tenant::create_broken_tenant(conf, tenant_shard_id, remote_storage, format!("{e:#}"))
         }
     };
 
@@ -2276,7 +2226,7 @@ pub(crate) async fn load_tenant(
     tenant_id: TenantId,
     generation: Generation,
     broker_client: storage_broker::BrokerClientChannel,
-    remote_storage: Option<GenericRemoteStorage>,
+    remote_storage: GenericRemoteStorage,
     deletion_queue_client: DeletionQueueClient,
     ctx: &RequestContext,
 ) -> Result<(), TenantMapInsertError> {
@@ -2937,7 +2887,7 @@ pub(crate) async fn immediate_gc(
     }
 
     let timeline = tenant.get_timeline(timeline_id, false).ok();
-    let rtc = timeline.as_ref().and_then(|x| x.remote_client.as_ref());
+    let rtc = timeline.as_ref().map(|x| &x.remote_client);
 
     if let Some(rtc) = rtc {
         // layer drops schedule actions on remote timeline client to actually do the
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 630ade5c13..c5462dac43 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -2137,7 +2137,7 @@ mod tests {
             tenant_ctx: _tenant_ctx,
         } = test_setup;
 
-        let client = timeline.remote_client.as_ref().unwrap();
+        let client = &timeline.remote_client;
 
         // Download back the index.json, and check that the list of files is correct
         let initial_index_part = match client
@@ -2328,7 +2328,7 @@ mod tests {
             timeline,
             ..
         } = TestSetup::new("metrics").await.unwrap();
-        let client = timeline.remote_client.as_ref().unwrap();
+        let client = &timeline.remote_client;
 
         let layer_file_name_1: LayerName = "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap();
         let local_path = local_layer_path(
diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs
index c28e041fa2..46a3d7e81f 100644
--- a/pageserver/src/tenant/secondary/downloader.rs
+++ b/pageserver/src/tenant/secondary/downloader.rs
@@ -26,7 +26,7 @@ use crate::{
         tasks::{warn_when_period_overrun, BackgroundLoopKind},
     },
     virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile},
-    METADATA_FILE_NAME, TEMP_FILE_SUFFIX,
+    TEMP_FILE_SUFFIX,
 };
 
 use super::{
@@ -1074,11 +1074,7 @@ async fn init_timeline_state(
             .fatal_err(&format!("Read metadata on {}", file_path));
 
         let file_name = file_path.file_name().expect("created it from the dentry");
-        if file_name == METADATA_FILE_NAME {
-            // Secondary mode doesn't use local metadata files, but they might have been left behind by an attached tenant.
-            warn!(path=?dentry.path(), "found legacy metadata file, these should have been removed in load_tenant_config");
-            continue;
-        } else if crate::is_temporary(&file_path)
+        if crate::is_temporary(&file_path)
            || is_temp_download_file(&file_path)
            || is_ephemeral_file(file_name)
        {
diff --git a/pageserver/src/tenant/storage_layer/layer.rs b/pageserver/src/tenant/storage_layer/layer.rs
index b6f7702247..e8c712c4c6 100644
--- a/pageserver/src/tenant/storage_layer/layer.rs
+++ b/pageserver/src/tenant/storage_layer/layer.rs
@@ -585,9 +585,6 @@ struct LayerInner {
     /// [`Timeline::gate`] at the same time.
     timeline: Weak<Timeline>,
 
-    /// Cached knowledge of [`Timeline::remote_client`] being `Some`.
-    have_remote_client: bool,
-
     access_stats: LayerAccessStats,
 
     /// This custom OnceCell is backed by std mutex, but only held for short time periods.
@@ -732,23 +729,23 @@ impl Drop for LayerInner {
                 if removed {
                     timeline.metrics.resident_physical_size_sub(file_size);
                 }
-                if let Some(remote_client) = timeline.remote_client.as_ref() {
-                    let res = remote_client.schedule_deletion_of_unlinked(vec![(file_name, meta)]);
+                let res = timeline
+                    .remote_client
+                    .schedule_deletion_of_unlinked(vec![(file_name, meta)]);
 
-                    if let Err(e) = res {
-                        // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
-                        // demonstrating this deadlock (without spawn_blocking): stop will drop
-                        // queued items, which will have ResidentLayer's, and those drops would try
-                        // to re-entrantly lock the RemoteTimelineClient inner state.
-                        if !timeline.is_active() {
-                            tracing::info!("scheduling deletion on drop failed: {e:#}");
-                        } else {
-                            tracing::warn!("scheduling deletion on drop failed: {e:#}");
-                        }
-                        LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
-                    } else {
-                        LAYER_IMPL_METRICS.inc_completed_deletes();
-                    }
-                }
+                if let Err(e) = res {
+                    // test_timeline_deletion_with_files_stuck_in_upload_queue is good at
+                    // demonstrating this deadlock (without spawn_blocking): stop will drop
+                    // queued items, which will have ResidentLayer's, and those drops would try
+                    // to re-entrantly lock the RemoteTimelineClient inner state.
+                    if !timeline.is_active() {
+                        tracing::info!("scheduling deletion on drop failed: {e:#}");
+                    } else {
+                        tracing::warn!("scheduling deletion on drop failed: {e:#}");
+                    }
+                    LAYER_IMPL_METRICS.inc_deletes_failed(DeleteFailed::DeleteSchedulingFailed);
+                } else {
+                    LAYER_IMPL_METRICS.inc_completed_deletes();
+                }
             });
         }
@@ -786,7 +783,6 @@ impl LayerInner {
             path: local_path,
             desc,
             timeline: Arc::downgrade(timeline),
-            have_remote_client: timeline.remote_client.is_some(),
             access_stats,
             wanted_deleted: AtomicBool::new(false),
             inner,
@@ -815,8 +811,6 @@ impl LayerInner {
     /// in a new attempt to evict OR join the previously started attempt.
     #[tracing::instrument(level = tracing::Level::DEBUG, skip_all, ret, err(level = tracing::Level::DEBUG), fields(layer=%self))]
     pub(crate) async fn evict_and_wait(&self, timeout: Duration) -> Result<(), EvictionError> {
-        assert!(self.have_remote_client);
-
         let mut rx = self.status.as_ref().unwrap().subscribe();
 
         {
@@ -973,10 +967,6 @@ impl LayerInner {
             return Err(DownloadError::NotFile(ft));
         }
 
-        if timeline.remote_client.as_ref().is_none() {
-            return Err(DownloadError::NoRemoteStorage);
-        }
-
         if let Some(ctx) = ctx {
             self.check_expected_download(ctx)?;
         }
@@ -1113,12 +1103,8 @@ impl LayerInner {
         permit: heavier_once_cell::InitPermit,
         ctx: &RequestContext,
     ) -> anyhow::Result<Arc<DownloadedLayer>> {
-        let client = timeline
+        let result = timeline
             .remote_client
-            .as_ref()
-            .expect("checked before download_init_and_wait");
-
-        let result = client
             .download_layer_file(
                 &self.desc.layer_name(),
                 &self.metadata(),
@@ -1293,20 +1279,10 @@ impl LayerInner {
     /// `DownloadedLayer` is being dropped, so it calls this method.
     fn on_downloaded_layer_drop(self: Arc<LayerInner>, only_version: usize) {
-        let can_evict = self.have_remote_client;
-
         // we cannot know without inspecting LayerInner::inner if we should evict or not, even
         // though here it is very likely
         let span = tracing::info_span!(parent: None, "layer_evict", tenant_id = %self.desc.tenant_shard_id.tenant_id, shard_id = %self.desc.tenant_shard_id.shard_slug(), timeline_id = %self.desc.timeline_id, layer=%self, version=%only_version);
 
-        if !can_evict {
-            // it would be nice to assert this case out, but we are in drop
-            span.in_scope(|| {
-                tracing::error!("bug in struct Layer: ResidentOrWantedEvicted has been downgraded while we have no remote storage");
-            });
-            return;
-        }
-
         // NOTE: this scope *must* never call `self.inner.get` because evict_and_wait might
         // drop while the `self.inner` is being locked, leading to a deadlock.
@@ -1578,8 +1554,6 @@ pub(crate) enum EvictionError {
 pub(crate) enum DownloadError {
     #[error("timeline has already shutdown")]
     TimelineShutdown,
-    #[error("no remote storage configured")]
-    NoRemoteStorage,
     #[error("context denies downloading")]
     ContextAndConfigReallyDeniesDownloads,
     #[error("downloading is really required but not allowed by this method")]
diff --git a/pageserver/src/tenant/storage_layer/layer/tests.rs b/pageserver/src/tenant/storage_layer/layer/tests.rs
index 52f62faa8d..fa9142d5e9 100644
--- a/pageserver/src/tenant/storage_layer/layer/tests.rs
+++ b/pageserver/src/tenant/storage_layer/layer/tests.rs
@@ -145,7 +145,7 @@ async fn smoke_test() {
         .await
         .expect("the local layer file still exists");
 
-    let rtc = timeline.remote_client.as_ref().unwrap();
+    let rtc = &timeline.remote_client;
 
     {
         let layers = &[layer];
@@ -761,13 +761,7 @@ async fn eviction_cancellation_on_drop() {
     timeline.freeze_and_flush().await.unwrap();
 
     // wait for the upload to complete so our Arc::strong_count assertion holds
-    timeline
-        .remote_client
-        .as_ref()
-        .unwrap()
-        .wait_completion()
-        .await
-        .unwrap();
+    timeline.remote_client.wait_completion().await.unwrap();
 
     let (evicted_layer, not_evicted) = {
         let mut layers = {
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index ca34b4fadc..df9bc9b35b 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -200,7 +200,7 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
 /// The outward-facing resources required to build a Timeline
 pub struct TimelineResources {
-    pub remote_client: Option<RemoteTimelineClient>,
+    pub remote_client: RemoteTimelineClient,
     pub deletion_queue_client: DeletionQueueClient,
     pub timeline_get_throttle: Arc<
         crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
@@ -272,7 +272,7 @@ pub struct Timeline {
 
     /// Remote storage client.
     /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
-    pub remote_client: Option<Arc<RemoteTimelineClient>>,
+    pub remote_client: Arc<RemoteTimelineClient>,
 
     // What page versions do we hold in the repository? If we get a
     // request > last_record_lsn, we need to wait until we receive all
@@ -1375,22 +1375,14 @@ impl Timeline {
     /// not validated with control plane yet.
     /// See [`Self::get_remote_consistent_lsn_visible`].
     pub(crate) fn get_remote_consistent_lsn_projected(&self) -> Option<Lsn> {
-        if let Some(remote_client) = &self.remote_client {
-            remote_client.remote_consistent_lsn_projected()
-        } else {
-            None
-        }
+        self.remote_client.remote_consistent_lsn_projected()
     }
 
     /// remote_consistent_lsn which the tenant is guaranteed not to go backward from,
     /// i.e. a value of remote_consistent_lsn_projected which has undergone
     /// generation validation in the deletion queue.
     pub(crate) fn get_remote_consistent_lsn_visible(&self) -> Option<Lsn> {
-        if let Some(remote_client) = &self.remote_client {
-            remote_client.remote_consistent_lsn_visible()
-        } else {
-            None
-        }
+        self.remote_client.remote_consistent_lsn_visible()
     }
 
     /// The sum of the file size of all historic layers in the layer map.
@@ -1760,16 +1752,14 @@ impl Timeline {
         match self.freeze_and_flush().await {
             Ok(_) => {
                 // drain the upload queue
-                if let Some(client) = self.remote_client.as_ref() {
-                    // if we did not wait for completion here, it might be our shutdown process
-                    // didn't wait for remote uploads to complete at all, as new tasks can forever
-                    // be spawned.
-                    //
-                    // what is problematic is the shutting down of RemoteTimelineClient, because
-                    // obviously it does not make sense to stop while we wait for it, but what
-                    // about corner cases like s3 suddenly hanging up?
-                    client.shutdown().await;
-                }
+                // if we did not wait for completion here, it might be our shutdown process
+                // didn't wait for remote uploads to complete at all, as new tasks can forever
+                // be spawned.
+                //
+                // what is problematic is the shutting down of RemoteTimelineClient, because
+                // obviously it does not make sense to stop while we wait for it, but what
+                // about corner cases like s3 suddenly hanging up?
+                self.remote_client.shutdown().await;
             }
             Err(e) => {
                 // Non-fatal. Shutdown is infallible. Failures to flush just mean that
@@ -1785,18 +1775,16 @@ impl Timeline {
 
         // Transition the remote_client into a state where it's only useful for timeline deletion.
         // (The deletion use case is why we can't just hook up remote_client to Self::cancel).)
-        if let Some(remote_client) = self.remote_client.as_ref() {
-            remote_client.stop();
-            // As documented in remote_client.stop()'s doc comment, it's our responsibility
-            // to shut down the upload queue tasks.
-            // TODO: fix that, task management should be encapsulated inside remote_client.
-            task_mgr::shutdown_tasks(
-                Some(TaskKind::RemoteUploadTask),
-                Some(self.tenant_shard_id),
-                Some(self.timeline_id),
-            )
-            .await;
-        }
+        self.remote_client.stop();
+        // As documented in remote_client.stop()'s doc comment, it's our responsibility
+        // to shut down the upload queue tasks.
+        // TODO: fix that, task management should be encapsulated inside remote_client.
+        task_mgr::shutdown_tasks(
+            Some(TaskKind::RemoteUploadTask),
+            Some(self.tenant_shard_id),
+            Some(self.timeline_id),
+        )
+        .await;
 
         // TODO: work toward making this a no-op. See this function's doc comment for more context.
         tracing::debug!("Waiting for tasks...");
@@ -1922,10 +1910,6 @@ impl Timeline {
             return Ok(None);
         };
 
-        if self.remote_client.is_none() {
-            return Ok(Some(false));
-        }
-
         layer.download().await?;
 
         Ok(Some(true))
@@ -2190,7 +2174,7 @@ impl Timeline {
             walredo_mgr,
             walreceiver: Mutex::new(None),
 
-            remote_client: resources.remote_client.map(Arc::new),
+            remote_client: Arc::new(resources.remote_client),
 
             // initialize in-memory 'last_record_lsn' from 'disk_consistent_lsn'.
             last_record_lsn: SeqWait::new(RecordLsn {
@@ -2437,10 +2421,6 @@ impl Timeline {
                     discovered_layers.push((layer_file_name, local_path, file_size));
                     continue;
                 }
-                Discovered::Metadata => {
-                    warn!("found legacy metadata file, these should have been removed in load_tenant_config");
-                    continue;
-                }
                 Discovered::IgnoredBackup => {
                     continue;
                 }
@@ -2487,12 +2467,10 @@ impl Timeline {
                     if local.metadata.file_size() == remote.file_size() {
                         // Use the local file, but take the remote metadata so that we pick up
                         // the correct generation.
-                        UseLocal(
-                            LocalLayerFileMetadata {
-                                metadata: remote,
-                                local_path: local.local_path
-                            }
-                        )
+                        UseLocal(LocalLayerFileMetadata {
+                            metadata: remote,
+                            local_path: local.local_path,
+                        })
                     } else {
                         init::cleanup_local_file_for_remote(&local, &remote)?;
                         UseRemote { local, remote }
@@ -2501,7 +2479,11 @@ impl Timeline {
                 Ok(decision) => decision,
                 Err(DismissedLayer::Future { local }) => {
                     if let Some(local) = local {
-                        init::cleanup_future_layer(&local.local_path, &name, disk_consistent_lsn)?;
+                        init::cleanup_future_layer(
+                            &local.local_path,
+                            &name,
+                            disk_consistent_lsn,
+                        )?;
                     }
                     needs_cleanup.push(name);
                     continue;
@@ -2523,7 +2505,8 @@ impl Timeline {
             let layer = match decision {
                 UseLocal(local) => {
                     total_physical_size += local.metadata.file_size();
-                    Layer::for_resident(conf, &this, local.local_path, name, local.metadata).drop_eviction_guard()
+                    Layer::for_resident(conf, &this, local.local_path, name, local.metadata)
+                        .drop_eviction_guard()
                 }
                 Evicted(remote) | UseRemote { remote, .. } => {
                     Layer::for_evicted(conf, &this, name, remote)
@@ -2543,36 +2526,36 @@ impl Timeline {
 
         guard.initialize_local_layers(loaded_layers, disk_consistent_lsn + 1);
 
-        if let Some(rtc) = self.remote_client.as_ref() {
-            rtc.schedule_layer_file_deletion(&needs_cleanup)?;
-            rtc.schedule_index_upload_for_file_changes()?;
-            // This barrier orders above DELETEs before any later operations.
-            // This is critical because code executing after the barrier might
-            // create again objects with the same key that we just scheduled for deletion.
-            // For example, if we just scheduled deletion of an image layer "from the future",
-            // later compaction might run again and re-create the same image layer.
-            // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
-            // "same" here means same key range and LSN.
-            //
-            // Without a barrier between above DELETEs and the re-creation's PUTs,
-            // the upload queue may execute the PUT first, then the DELETE.
-            // In our example, we will end up with an IndexPart referencing a non-existent object.
-            //
-            // 1. a future image layer is created and uploaded
-            // 2. ps restart
-            // 3. the future layer from (1) is deleted during load layer map
-            // 4. image layer is re-created and uploaded
-            // 5. deletion queue would like to delete (1) but actually deletes (4)
-            // 6. delete by name works as expected, but it now deletes the wrong (later) version
-            //
-            // See https://github.com/neondatabase/neon/issues/5878
-            //
-            // NB: generation numbers naturally protect against this because they disambiguate
-            // (1) and (4)
-            rtc.schedule_barrier()?;
-            // Tenant::create_timeline will wait for these uploads to happen before returning, or
-            // on retry.
-        }
+        self.remote_client
+            .schedule_layer_file_deletion(&needs_cleanup)?;
+        self.remote_client
+            .schedule_index_upload_for_file_changes()?;
+        // This barrier orders above DELETEs before any later operations.
+        // This is critical because code executing after the barrier might
+        // create again objects with the same key that we just scheduled for deletion.
+        // For example, if we just scheduled deletion of an image layer "from the future",
+        // later compaction might run again and re-create the same image layer.
+        // "from the future" here means an image layer whose LSN is > IndexPart::disk_consistent_lsn.
+        // "same" here means same key range and LSN.
+        //
+        // Without a barrier between above DELETEs and the re-creation's PUTs,
+        // the upload queue may execute the PUT first, then the DELETE.
+        // In our example, we will end up with an IndexPart referencing a non-existent object.
+        //
+        // 1. a future image layer is created and uploaded
+        // 2. ps restart
+        // 3. the future layer from (1) is deleted during load layer map
+        // 4. image layer is re-created and uploaded
+        // 5. deletion queue would like to delete (1) but actually deletes (4)
+        // 6. delete by name works as expected, but it now deletes the wrong (later) version
+        //
+        // See https://github.com/neondatabase/neon/issues/5878
+        //
+        // NB: generation numbers naturally protect against this because they disambiguate
+        // (1) and (4)
+        self.remote_client.schedule_barrier()?;
+        // Tenant::create_timeline will wait for these uploads to happen before returning, or
+        // on retry.
 
         info!(
             "loaded layer map with {} layers at {}, total physical size: {}",
@@ -3025,9 +3008,6 @@ impl Timeline {
     /// should treat this as a cue to simply skip doing any heatmap uploading
     /// for this timeline.
     pub(crate) async fn generate_heatmap(&self) -> Option<HeatMapTimeline> {
-        // no point in heatmaps without remote client
-        let _remote_client = self.remote_client.as_ref()?;
-
         if !self.is_active() {
             return None;
         }
@@ -3055,10 +3035,7 @@ impl Timeline {
         // branchpoint in the value in IndexPart::lineage
         self.ancestor_lsn == lsn
             || (self.ancestor_lsn == Lsn::INVALID
-                && self
-                    .remote_client
-                    .as_ref()
-                    .is_some_and(|rtc| rtc.is_previous_ancestor_lsn(lsn)))
+                && self.remote_client.is_previous_ancestor_lsn(lsn))
     }
 }
 
@@ -3978,29 +3955,23 @@ impl Timeline {
             x.unwrap()
         ));
 
-        if let Some(remote_client) = &self.remote_client {
-            for layer in layers_to_upload {
-                remote_client.schedule_layer_file_upload(layer)?;
-            }
-            remote_client.schedule_index_upload_for_metadata_update(&update)?;
+        for layer in layers_to_upload {
+            self.remote_client.schedule_layer_file_upload(layer)?;
         }
+        self.remote_client
+            .schedule_index_upload_for_metadata_update(&update)?;
 
         Ok(())
     }
 
     pub(crate) async fn preserve_initdb_archive(&self) -> anyhow::Result<()> {
-        if let Some(remote_client) = &self.remote_client {
-            remote_client
-                .preserve_initdb_archive(
-                    &self.tenant_shard_id.tenant_id,
-                    &self.timeline_id,
-                    &self.cancel,
-                )
-                .await?;
-        } else {
-            bail!("No remote storage configured, but was asked to backup the initdb archive for {} / {}", self.tenant_shard_id.tenant_id, self.timeline_id);
-        }
-        Ok(())
+        self.remote_client
+            .preserve_initdb_archive(
+                &self.tenant_shard_id.tenant_id,
+                &self.timeline_id,
+                &self.cancel,
+            )
+            .await
     }
 
     // Write out the given frozen in-memory layer as a new L0 delta file. This L0 file will not be tracked
@@ -4361,12 +4332,7 @@ impl Timeline {
             return;
         }
 
-        if self
-            .remote_client
-            .as_ref()
-            .map(|c| c.is_deleting())
-            .unwrap_or(false)
-        {
+        if self.remote_client.is_deleting() {
             // The timeline was created in a deletion-resume state, we don't expect logical size to be populated
             return;
         }
@@ -4534,9 +4500,8 @@ impl Timeline {
         // deletion will happen later, the layer file manager calls garbage_collect_on_drop
         guard.finish_compact_l0(&remove_layers, &insert_layers, &self.metrics);
 
-        if let Some(remote_client) = self.remote_client.as_ref() {
-            remote_client.schedule_compaction_update(&remove_layers, new_deltas)?;
-        }
+        self.remote_client
+            .schedule_compaction_update(&remove_layers, new_deltas)?;
 
         drop_wlock(guard);
 
@@ -4554,9 +4519,8 @@ impl Timeline {
 
         let upload_layers: Vec<_> = replace_layers.into_iter().map(|r| r.1).collect();
 
-        if let Some(remote_client) = self.remote_client.as_ref() {
-            remote_client.schedule_compaction_update(&drop_layers, &upload_layers)?;
-        }
+        self.remote_client
+            .schedule_compaction_update(&drop_layers, &upload_layers)?;
 
         Ok(())
     }
@@ -4566,16 +4530,14 @@ impl Timeline {
         self: &Arc<Self>,
         new_images: impl IntoIterator<Item = ResidentLayer>,
     ) -> anyhow::Result<()> {
-        let Some(remote_client) = &self.remote_client else {
-            return Ok(());
-        };
         for layer in new_images {
-            remote_client.schedule_layer_file_upload(layer)?;
+            self.remote_client.schedule_layer_file_upload(layer)?;
         }
         // should any new image layer have been created, not uploading index_part will
         // result in a mismatch between remote_physical_size and layermap calculated
         // size, which will fail some tests, but should not be an issue otherwise.
-        remote_client.schedule_index_upload_for_file_changes()?;
+        self.remote_client
+            .schedule_index_upload_for_file_changes()?;
         Ok(())
     }
 
@@ -4861,9 +4823,7 @@ impl Timeline {
 
         result.layers_removed = gc_layers.len() as u64;
 
-        if let Some(remote_client) = self.remote_client.as_ref() {
-            remote_client.schedule_gc_update(&gc_layers)?;
-        }
+        self.remote_client.schedule_gc_update(&gc_layers)?;
 
         guard.finish_gc_timeline(&gc_layers);
 
diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs
index 4226bf431e..ed48b4c9cb 100644
--- a/pageserver/src/tenant/timeline/compaction.rs
+++ b/pageserver/src/tenant/timeline/compaction.rs
@@ -295,13 +295,11 @@ impl Timeline {
         // Update the LayerMap so that readers will use the new layers, and enqueue it for writing to remote storage
         self.rewrite_layers(replace_layers, drop_layers).await?;
 
-        if let Some(remote_client) = self.remote_client.as_ref() {
-            // We wait for all uploads to complete before finishing this compaction stage. This is not
-            // necessary for correctness, but it simplifies testing, and avoids proceeding with another
-            // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
-            // load.
-            remote_client.wait_completion().await?;
-        }
+        // We wait for all uploads to complete before finishing this compaction stage. This is not
+        // necessary for correctness, but it simplifies testing, and avoids proceeding with another
+        // Timeline's compaction while this timeline's uploads may be generating lots of disk I/O
+        // load.
+        self.remote_client.wait_completion().await?;
 
         Ok(())
     }
diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs
index d8701be170..901f5149b3 100644
--- a/pageserver/src/tenant/timeline/delete.rs
+++ b/pageserver/src/tenant/timeline/delete.rs
@@ -26,19 +26,21 @@ use super::{Timeline, TimelineResources};
 /// during attach or pageserver restart.
 /// See comment in persist_index_part_with_deleted_flag.
 async fn set_deleted_in_remote_index(timeline: &Timeline) -> Result<(), DeleteTimelineError> {
-    if let Some(remote_client) = timeline.remote_client.as_ref() {
-        match remote_client.persist_index_part_with_deleted_flag().await {
-            // If we (now, or already) marked it successfully as deleted, we can proceed
-            Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
-            // Bail out otherwise
-            //
-            // AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents
-            // two tasks from performing the deletion at the same time. The first task
-            // that starts deletion should run it to completion.
-            Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
-            | Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
-                return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
-            }
+    match timeline
+        .remote_client
+        .persist_index_part_with_deleted_flag()
+        .await
+    {
+        // If we (now, or already) marked it successfully as deleted, we can proceed
+        Ok(()) | Err(PersistIndexPartWithDeletedFlagError::AlreadyDeleted(_)) => (),
+        // Bail out otherwise
+        //
+        // AlreadyInProgress shouldn't happen, because the 'delete_lock' prevents
+        // two tasks from performing the deletion at the same time. The first task
+        // that starts deletion should run it to completion.
+        Err(e @ PersistIndexPartWithDeletedFlagError::AlreadyInProgress(_))
+        | Err(e @ PersistIndexPartWithDeletedFlagError::Other(_)) => {
+            return Err(DeleteTimelineError::Other(anyhow::anyhow!(e)));
         }
     }
     Ok(())
@@ -117,11 +119,11 @@ pub(super) async fn delete_local_timeline_directory(
 
 /// Removes remote layers and an index file after them.
 async fn delete_remote_layers_and_index(timeline: &Timeline) -> anyhow::Result<()> {
-    if let Some(remote_client) = &timeline.remote_client {
-        remote_client.delete_all().await.context("delete_all")?
-    };
-
-    Ok(())
+    timeline
+        .remote_client
+        .delete_all()
+        .await
+        .context("delete_all")
 }
 
 // This function removes remaining traces of a timeline on disk.
@@ -260,7 +262,7 @@ impl DeleteTimelineFlow {
         tenant: Arc<Tenant>,
         timeline_id: TimelineId,
         local_metadata: &TimelineMetadata,
-        remote_client: Option<RemoteTimelineClient>,
+        remote_client: RemoteTimelineClient,
         deletion_queue_client: DeletionQueueClient,
     ) -> anyhow::Result<()> {
         // Note: here we even skip populating layer map. Timeline is essentially uninitialized.
diff --git a/pageserver/src/tenant/timeline/detach_ancestor.rs b/pageserver/src/tenant/timeline/detach_ancestor.rs
index 9471ba860f..7f59758c87 100644
--- a/pageserver/src/tenant/timeline/detach_ancestor.rs
+++ b/pageserver/src/tenant/timeline/detach_ancestor.rs
@@ -70,10 +70,6 @@ pub(super) async fn prepare(
 ) -> Result<(completion::Completion, PreparedTimelineDetach), Error> {
     use Error::*;
 
-    if detached.remote_client.as_ref().is_none() {
-        unimplemented!("no new code for running without remote storage");
-    }
-
     let Some((ancestor, ancestor_lsn)) = detached
         .ancestor_timeline
         .as_ref()
@@ -315,8 +311,6 @@ async fn upload_rewritten_layer(
     // FIXME: better shuttingdown error
     target
         .remote_client
-        .as_ref()
-        .unwrap()
         .upload_layer_file(&copied, cancel)
         .await
         .map_err(UploadRewritten)?;
@@ -406,8 +400,6 @@ async fn remote_copy(
     // FIXME: better shuttingdown error
     adoptee
         .remote_client
-        .as_ref()
-        .unwrap()
         .copy_timeline_layer(adopted, &owned, cancel)
         .await
         .map(move |()| owned)
@@ -421,11 +413,6 @@ pub(super) async fn complete(
     prepared: PreparedTimelineDetach,
     _ctx: &RequestContext,
 ) -> Result<Vec<TimelineId>, anyhow::Error> {
-    let rtc = detached
-        .remote_client
-        .as_ref()
-        .expect("has to have a remote timeline client for timeline ancestor detach");
-
     let PreparedTimelineDetach { layers } = prepared;
 
     let ancestor = detached
@@ -442,11 +429,13 @@ pub(super) async fn complete(
     //
     // this is not perfect, but it avoids us a retry happening after a compaction or gc on restart
     // which could give us a completely wrong layer combination.
-    rtc.schedule_adding_existing_layers_to_index_detach_and_wait(
-        &layers,
-        (ancestor.timeline_id, ancestor_lsn),
-    )
-    .await?;
+    detached
+        .remote_client
+        .schedule_adding_existing_layers_to_index_detach_and_wait(
+            &layers,
+            (ancestor.timeline_id, ancestor_lsn),
+        )
+        .await?;
 
     let mut tasks = tokio::task::JoinSet::new();
 
@@ -491,8 +480,6 @@ pub(super) async fn complete(
         async move {
             let res = timeline
                 .remote_client
-                .as_ref()
-                .expect("reparented has to have remote client because detached has one")
                 .schedule_reparenting_and_wait(&new_parent)
                 .await;
 
diff --git a/pageserver/src/tenant/timeline/eviction_task.rs b/pageserver/src/tenant/timeline/eviction_task.rs
index 3567761b9a..8a8c38d0ce 100644
--- a/pageserver/src/tenant/timeline/eviction_task.rs
+++ b/pageserver/src/tenant/timeline/eviction_task.rs
@@ -23,7 +23,7 @@ use std::{
 use pageserver_api::models::{EvictionPolicy, EvictionPolicyLayerAccessThreshold};
 use tokio::time::Instant;
 use tokio_util::sync::CancellationToken;
-use tracing::{debug, error, info, info_span, instrument, warn, Instrument};
+use tracing::{debug, info, info_span, instrument, warn, Instrument};
 
 use crate::{
     context::{DownloadBehavior, RequestContext},
@@ -211,11 +211,6 @@ impl Timeline {
         // So, we just need to deal with this.
-        if self.remote_client.is_none() {
-            error!("no remote storage configured, cannot evict layers");
-            return ControlFlow::Continue(());
-        }
-
         let mut js = tokio::task::JoinSet::new();
         {
             let guard = self.layers.read().await;
diff --git a/pageserver/src/tenant/timeline/init.rs b/pageserver/src/tenant/timeline/init.rs
index 66aa765015..feadc79e5e 100644
--- a/pageserver/src/tenant/timeline/init.rs
+++ b/pageserver/src/tenant/timeline/init.rs
@@ -9,7 +9,6 @@ use crate::{
         storage_layer::LayerName,
         Generation,
     },
-    METADATA_FILE_NAME,
 };
 use anyhow::Context;
 use camino::{Utf8Path, Utf8PathBuf};
@@ -27,8 +26,6 @@ pub(super) enum Discovered {
     Temporary(String),
     /// Temporary on-demand download files, should be removed
     TemporaryDownload(String),
-    /// "metadata" file we persist locally and include in `index_part.json`
-    Metadata,
     /// Backup file from previously future layers
     IgnoredBackup,
     /// Unrecognized, warn about these
@@ -49,9 +46,7 @@ pub(super) fn scan_timeline_dir(path: &Utf8Path) -> anyhow::Result<Vec<Discovere
-            if file_name == METADATA_FILE_NAME {
-                Discovered::Metadata
-            } else if file_name.ends_with(".old") {
+            if file_name.ends_with(".old") {
                 // ignore these
                 Discovered::IgnoredBackup
             } else if remote_timeline_client::is_temp_download_file(direntry.path()) {
diff --git a/test_runner/regress/test_broken_timeline.py b/test_runner/regress/test_broken_timeline.py
index 7d4e101189..61afd820ca 100644
--- a/test_runner/regress/test_broken_timeline.py
+++ b/test_runner/regress/test_broken_timeline.py
@@ -56,14 +56,8 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
     (tenant0, timeline0, pg0) = tenant_timelines[0]
     log.info(f"Timeline {tenant0}/{timeline0} is left intact")
 
-    (tenant1, timeline1, pg1) = tenant_timelines[1]
-    metadata_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/metadata"
-    with open(metadata_path, "w") as f:
-        f.write("overwritten with garbage!")
-    log.info(f"Timeline {tenant1}/{timeline1} got its metadata spoiled")
-
-    (tenant2, timeline2, pg2) = tenant_timelines[2]
-    timeline_path = f"{env.pageserver.workdir}/tenants/{tenant2}/timelines/{timeline2}/"
+    (tenant1, timeline1, pg1) = tenant_timelines[2]
+    timeline_path = f"{env.pageserver.workdir}/tenants/{tenant1}/timelines/{timeline1}/"
     for filename in os.listdir(timeline_path):
         if filename.startswith("00000"):
             # Looks like a layer file. Corrupt it
@@ -72,7 +66,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
             with open(p, "wb") as f:
                 f.truncate(0)
                 f.truncate(size)
-    log.info(f"Timeline {tenant2}/{timeline2} got its local layer files spoiled")
+    log.info(f"Timeline {tenant1}/{timeline1} got its local layer files spoiled")
 
     env.pageserver.start()
 
@@ -80,19 +74,15 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
     pg0.start()
     assert pg0.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
 
-    # Tenant with corrupt local metadata works: remote storage is authoritative for metadata
-    pg1.start()
-    assert pg1.safe_psql("SELECT COUNT(*) FROM t")[0][0] == 100
-
     # Second timeline will fail during basebackup, because the local layer file is corrupt.
     # It will fail when we try to read (and reconstruct) a page from it, ergo the error message.
     # (We don't check layer file contents on startup, when loading the timeline)
     #
     # This will change when we implement checksums for layers
     with pytest.raises(Exception, match=f"{reconstruct_function_name} for layer ") as err:
-        pg2.start()
+        pg1.start()
     log.info(
-        f"As expected, compute startup failed for timeline {tenant2}/{timeline2} with corrupt layers: {err}"
+        f"As expected, compute startup failed for timeline {tenant1}/{timeline1} with corrupt layers: {err}"
     )