From 10e4da399737f26a3584ab8822e701e382e2dd43 Mon Sep 17 00:00:00 2001 From: Kirill Bulatov Date: Mon, 2 May 2022 10:46:13 +0300 Subject: [PATCH] Rework timeline batching --- pageserver/src/http/routes.rs | 15 +- pageserver/src/layered_repository.rs | 115 ++-- pageserver/src/storage_sync.rs | 884 +++++++-------------------- pageserver/src/storage_sync/index.rs | 4 +- 4 files changed, 292 insertions(+), 726 deletions(-) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 8940efbda0..0104df826e 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -267,7 +267,7 @@ async fn timeline_attach_handler(request: Request) -> Result { tokio::fs::create_dir_all(state.conf.timeline_path(&timeline_id, &tenant_id)) .await @@ -300,11 +300,11 @@ async fn timeline_attach_handler(request: Request) -> Result anyhow::Result> { - let shard = match state.remote_storage.as_ref() { + let index_part = match state.remote_storage.as_ref() { Some(GenericRemoteStorage::Local(local_storage)) => { storage_sync::download_index_part(state.conf, local_storage, sync_id).await } @@ -313,18 +313,15 @@ async fn try_download_shard_data( } None => return Ok(None), } - .with_context(|| format!("Failed to download index shard for timeline {}", sync_id))?; + .with_context(|| format!("Failed to download index part for timeline {sync_id}"))?; let timeline_path = state .conf .timeline_path(&sync_id.timeline_id, &sync_id.tenant_id); - RemoteTimeline::from_index_part(&timeline_path, shard) + RemoteTimeline::from_index_part(&timeline_path, index_part) .map(Some) .with_context(|| { - format!( - "Failed to convert index shard into remote timeline for timeline {}", - sync_id - ) + format!("Failed to convert index part into remote timeline for timeline {sync_id}") }) } diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index da2699b15d..039bf8d1ed 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -455,7 +455,7 @@ enum LayeredTimelineEntry { impl LayeredTimelineEntry { fn timeline_id(&self) -> ZTimelineId { match self { - LayeredTimelineEntry::Loaded(timeline) => timeline.timelineid, + LayeredTimelineEntry::Loaded(timeline) => timeline.timeline_id, LayeredTimelineEntry::Unloaded { id, .. 
} => *id, } } @@ -615,21 +615,17 @@ impl LayeredRepository { fn load_local_timeline( &self, - timelineid: ZTimelineId, + timeline_id: ZTimelineId, timelines: &mut HashMap, ) -> anyhow::Result> { - let metadata = load_metadata(self.conf, timelineid, self.tenant_id) + let metadata = load_metadata(self.conf, timeline_id, self.tenant_id) .context("failed to load metadata")?; let disk_consistent_lsn = metadata.disk_consistent_lsn(); let ancestor = metadata .ancestor_timeline() .map(|ancestor_timeline_id| { - trace!( - "loading {}'s ancestor {}", - timelineid, - &ancestor_timeline_id - ); + trace!("loading {timeline_id}'s ancestor {}", &ancestor_timeline_id); self.get_timeline_load_internal(ancestor_timeline_id, timelines) }) .transpose() @@ -643,7 +639,7 @@ impl LayeredRepository { Arc::clone(&self.tenant_conf), metadata, ancestor, - timelineid, + timeline_id, self.tenant_id, Arc::clone(&self.walredo_mgr), self.upload_layers, @@ -902,8 +898,8 @@ pub struct LayeredTimeline { conf: &'static PageServerConf, tenant_conf: Arc>, - tenantid: ZTenantId, - timelineid: ZTimelineId, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, layers: RwLock, @@ -1177,50 +1173,50 @@ impl LayeredTimeline { tenant_conf: Arc>, metadata: TimelineMetadata, ancestor: Option, - timelineid: ZTimelineId, - tenantid: ZTenantId, + timeline_id: ZTimelineId, + tenant_id: ZTenantId, walredo_mgr: Arc, upload_layers: bool, ) -> LayeredTimeline { let reconstruct_time_histo = RECONSTRUCT_TIME - .get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()]) + .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()]) .unwrap(); let materialized_page_cache_hit_counter = MATERIALIZED_PAGE_CACHE_HIT - .get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()]) + .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()]) .unwrap(); let flush_time_histo = STORAGE_TIME .get_metric_with_label_values(&[ "layer flush", - &tenantid.to_string(), - &timelineid.to_string(), + &tenant_id.to_string(), + &timeline_id.to_string(), ]) .unwrap(); let compact_time_histo = STORAGE_TIME .get_metric_with_label_values(&[ "compact", - &tenantid.to_string(), - &timelineid.to_string(), + &tenant_id.to_string(), + &timeline_id.to_string(), ]) .unwrap(); let create_images_time_histo = STORAGE_TIME .get_metric_with_label_values(&[ "create images", - &tenantid.to_string(), - &timelineid.to_string(), + &tenant_id.to_string(), + &timeline_id.to_string(), ]) .unwrap(); let last_record_gauge = LAST_RECORD_LSN - .get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()]) + .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()]) .unwrap(); let wait_lsn_time_histo = WAIT_LSN_TIME - .get_metric_with_label_values(&[&tenantid.to_string(), &timelineid.to_string()]) + .get_metric_with_label_values(&[&tenant_id.to_string(), &timeline_id.to_string()]) .unwrap(); LayeredTimeline { conf, tenant_conf, - timelineid, - tenantid, + timeline_id, + tenant_id, layers: RwLock::new(LayerMap::default()), walredo_mgr, @@ -1272,7 +1268,7 @@ impl LayeredTimeline { // Scan timeline directory and create ImageFileName and DeltaFilename // structs representing all files on disk - let timeline_path = self.conf.timeline_path(&self.timelineid, &self.tenantid); + let timeline_path = self.conf.timeline_path(&self.timeline_id, &self.tenant_id); for direntry in fs::read_dir(timeline_path)? 
{ let direntry = direntry?; @@ -1284,7 +1280,7 @@ impl LayeredTimeline { if imgfilename.lsn > disk_consistent_lsn { warn!( "found future image layer {} on timeline {} disk_consistent_lsn is {}", - imgfilename, self.timelineid, disk_consistent_lsn + imgfilename, self.timeline_id, disk_consistent_lsn ); rename_to_backup(direntry.path())?; @@ -1292,7 +1288,7 @@ impl LayeredTimeline { } let layer = - ImageLayer::new(self.conf, self.timelineid, self.tenantid, &imgfilename); + ImageLayer::new(self.conf, self.timeline_id, self.tenant_id, &imgfilename); trace!("found layer {}", layer.filename().display()); layers.insert_historic(Arc::new(layer)); @@ -1307,7 +1303,7 @@ impl LayeredTimeline { if deltafilename.lsn_range.end > disk_consistent_lsn + 1 { warn!( "found future delta layer {} on timeline {} disk_consistent_lsn is {}", - deltafilename, self.timelineid, disk_consistent_lsn + deltafilename, self.timeline_id, disk_consistent_lsn ); rename_to_backup(direntry.path())?; @@ -1315,7 +1311,7 @@ impl LayeredTimeline { } let layer = - DeltaLayer::new(self.conf, self.timelineid, self.tenantid, &deltafilename); + DeltaLayer::new(self.conf, self.timeline_id, self.tenant_id, &deltafilename); trace!("found layer {}", layer.filename().display()); layers.insert_historic(Arc::new(layer)); @@ -1497,7 +1493,7 @@ impl LayeredTimeline { // FIXME: It's pointless to check the cache for things that are not 8kB pages. // We should look at the key to determine if it's a cacheable object let (lsn, read_guard) = - cache.lookup_materialized_page(self.tenantid, self.timelineid, key, lsn)?; + cache.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)?; let img = Bytes::from(read_guard.to_vec()); Some((lsn, img)) } @@ -1509,7 +1505,7 @@ impl LayeredTimeline { .with_context(|| { format!( "Ancestor is missing. Timeline id: {} Ancestor id {:?}", - self.timelineid, + self.timeline_id, self.get_ancestor_timeline_id(), ) })? @@ -1517,7 +1513,7 @@ impl LayeredTimeline { .with_context(|| { format!( "Ancestor timeline is not is not loaded. Timeline id: {} Ancestor id {:?}", - self.timelineid, + self.timeline_id, self.get_ancestor_timeline_id(), ) })?; @@ -1554,12 +1550,12 @@ impl LayeredTimeline { trace!( "creating layer for write at {}/{} for record at {}", - self.timelineid, + self.timeline_id, start_lsn, lsn ); let new_layer = - InMemoryLayer::create(self.conf, self.timelineid, self.tenantid, start_lsn)?; + InMemoryLayer::create(self.conf, self.timeline_id, self.tenant_id, start_lsn)?; let layer_rc = Arc::new(new_layer); layers.open_layer = Some(Arc::clone(&layer_rc)); @@ -1633,8 +1629,8 @@ impl LayeredTimeline { let self_clone = Arc::clone(self); thread_mgr::spawn( thread_mgr::ThreadKind::LayerFlushThread, - Some(self.tenantid), - Some(self.timelineid), + Some(self.tenant_id), + Some(self.timeline_id), "layer flush thread", false, move || self_clone.flush_frozen_layers(false), @@ -1703,7 +1699,7 @@ impl LayeredTimeline { // them all in parallel. 
par_fsync::par_fsync(&[ new_delta_path.clone(), - self.conf.timeline_path(&self.timelineid, &self.tenantid), + self.conf.timeline_path(&self.timeline_id, &self.tenant_id), ])?; fail_point!("checkpoint-before-sync"); @@ -1775,8 +1771,8 @@ impl LayeredTimeline { LayeredRepository::save_metadata( self.conf, - self.timelineid, - self.tenantid, + self.timeline_id, + self.tenant_id, &metadata, false, )?; @@ -1786,8 +1782,8 @@ impl LayeredTimeline { if self.upload_layers.load(atomic::Ordering::Relaxed) { storage_sync::schedule_layer_upload( - self.tenantid, - self.timelineid, + self.tenant_id, + self.timeline_id, HashSet::from([new_delta_path]), Some(metadata), ); @@ -1840,7 +1836,8 @@ impl LayeredTimeline { let target_file_size = self.get_checkpoint_distance(); // Define partitioning schema if needed - if let Ok(pgdir) = tenant_mgr::get_local_timeline_with_load(self.tenantid, self.timelineid) + if let Ok(pgdir) = + tenant_mgr::get_local_timeline_with_load(self.tenant_id, self.timeline_id) { let (partitioning, lsn) = pgdir.repartition( self.get_last_record_lsn(), @@ -1858,8 +1855,8 @@ impl LayeredTimeline { } if self.upload_layers.load(atomic::Ordering::Relaxed) { storage_sync::schedule_layer_upload( - self.tenantid, - self.timelineid, + self.tenant_id, + self.timeline_id, layer_paths_to_upload, None, ); @@ -1909,7 +1906,7 @@ impl LayeredTimeline { let img_range = partition.ranges.first().unwrap().start..partition.ranges.last().unwrap().end; let mut image_layer_writer = - ImageLayerWriter::new(self.conf, self.timelineid, self.tenantid, &img_range, lsn)?; + ImageLayerWriter::new(self.conf, self.timeline_id, self.tenant_id, &img_range, lsn)?; for range in &partition.ranges { let mut key = range.start; @@ -1932,7 +1929,7 @@ impl LayeredTimeline { // and fsync them all in parallel. par_fsync::par_fsync(&[ image_layer.path(), - self.conf.timeline_path(&self.timelineid, &self.tenantid), + self.conf.timeline_path(&self.timeline_id, &self.tenant_id), ])?; // FIXME: Do we need to do something to upload it to remote storage here? @@ -2008,8 +2005,8 @@ impl LayeredTimeline { if writer.is_none() { writer = Some(DeltaLayerWriter::new( self.conf, - self.timelineid, - self.tenantid, + self.timeline_id, + self.tenant_id, key, lsn_range.clone(), )?); @@ -2027,7 +2024,7 @@ impl LayeredTimeline { let mut layer_paths: Vec = new_layers.iter().map(|l| l.path()).collect(); // also sync the directory - layer_paths.push(self.conf.timeline_path(&self.timelineid, &self.tenantid)); + layer_paths.push(self.conf.timeline_path(&self.timeline_id, &self.tenant_id)); // Fsync all the layer files and directory using multiple threads to // minimize latency. @@ -2057,14 +2054,14 @@ impl LayeredTimeline { if self.upload_layers.load(atomic::Ordering::Relaxed) { storage_sync::schedule_layer_upload( - self.tenantid, - self.timelineid, + self.tenant_id, + self.timeline_id, new_layer_paths, None, ); storage_sync::schedule_layer_delete( - self.tenantid, - self.timelineid, + self.tenant_id, + self.timeline_id, layer_paths_do_delete, ); } @@ -2121,7 +2118,7 @@ impl LayeredTimeline { let cutoff = gc_info.cutoff; let pitr = gc_info.pitr; - let _enter = info_span!("garbage collection", timeline = %self.timelineid, tenant = %self.tenantid, cutoff = %cutoff).entered(); + let _enter = info_span!("garbage collection", timeline = %self.timeline_id, tenant = %self.tenant_id, cutoff = %cutoff).entered(); // We need to ensure that no one branches at a point before latest_gc_cutoff_lsn. // See branch_timeline() for details. 
@@ -2254,8 +2251,8 @@ impl LayeredTimeline {
         if self.upload_layers.load(atomic::Ordering::Relaxed) {
             storage_sync::schedule_layer_delete(
-                self.tenantid,
-                self.timelineid,
+                self.tenant_id,
+                self.timeline_id,
                 layer_paths_to_delete,
             );
         }
@@ -2323,8 +2320,8 @@ impl LayeredTimeline {
         if img.len() == page_cache::PAGE_SZ {
             let cache = page_cache::get();
             cache.memorize_materialized_page(
-                self.tenantid,
-                self.timelineid,
+                self.tenant_id,
+                self.timeline_id,
                 key,
                 last_rec_lsn,
                 &img,
diff --git a/pageserver/src/storage_sync.rs b/pageserver/src/storage_sync.rs
index bcc18e8ce4..b6091015b9 100644
--- a/pageserver/src/storage_sync.rs
+++ b/pageserver/src/storage_sync.rs
@@ -92,12 +92,12 @@
 //! A queue is implemented in the [`sync_queue`] module as a pair of sender and receiver channels, to block on zero tasks instead of checking the queue.
 //! The pair's shared buffer of a fixed size serves as an implicit queue, holding [`SyncTask`] for local files upload/download operations.
 //!
-//! The queue gets emptied by a single thread with the loop, that polls the tasks in batches of deduplicated tasks (size configurable).
-//! A task from the batch corresponds to a single timeline, with its files to sync merged together.
-//! Every batch task and layer file in the task is processed concurrently, which is possible due to incremental nature of the timelines:
-//! it's not asserted, but assumed that timeline's checkpoints only add the files locally, not removing or amending the existing ones.
-//! Only GC removes local timeline files, the GC support is not added to sync currently,
-//! yet downloading extra files is not critically bad at this stage, GC can remove those again.
+//! The queue gets emptied by a single thread running a loop that polls the tasks in batches of deduplicated tasks.
+//! A task from the batch corresponds to a single timeline, with its files to sync merged together: since only one sync loop step is active at a time,
+//! timeline uploads and downloads can happen concurrently, in no particular order, due to the incremental nature of the timeline layers.
+//! Deletion happens only after a successful upload, otherwise the compaction output might make the timeline inconsistent until both tasks are fully processed without errors.
+//! Upload and download update the remote data (in-memory index and S3 JSON index part file) only after every layer is successfully synchronized, while the deletion task
+//! does the opposite: it requires the remote data to be updated first, so that the blob files become invisible to the pageserver before they are removed.
 //!
 //! During the loop startup, an initial [`RemoteTimelineIndex`] state is constructed via downloading and merging the index data for all timelines,
 //! present locally.
@@ -119,7 +119,7 @@
 //! Among other tasks, the index is used to prevent invalid uploads and non-existing downloads on demand, refer to [`index`] for more details.
 //!
 //! Index construction is currently the only place where the storage sync can return an [`Err`] to the user.
-//! New sync tasks are accepted via [`schedule_timeline_checkpoint_upload`] and [`schedule_timeline_download`] functions,
+//! New sync tasks are accepted via [`schedule_layer_upload`], [`schedule_layer_download`] and [`schedule_layer_delete`] functions,
 //! disregarding of the corresponding loop startup.
 //! It's up to the caller to avoid synchronizations if the loop is disabled: otherwise, the sync tasks will be ignored.
 //!
After the initial state is loaded into memory and the loop starts, any further [`Err`] results do not stop the loop, but rather @@ -449,7 +449,7 @@ fn collect_timeline_files( /// mpsc approach was picked to allow blocking the sync loop if no tasks are present, to avoid meaningless spinning. mod sync_queue { use std::{ - collections::{hash_map, HashMap, HashSet}, + collections::{HashMap, HashSet}, num::NonZeroUsize, ops::ControlFlow, sync::atomic::{AtomicUsize, Ordering}, @@ -460,7 +460,7 @@ mod sync_queue { use tokio::sync::mpsc::{error::TryRecvError, UnboundedReceiver, UnboundedSender}; use tracing::{debug, warn}; - use super::SyncTask; + use super::{SyncTask, SyncTaskBatch}; use utils::zid::ZTenantTimelineId; static SENDER: OnceCell> = OnceCell::new(); @@ -512,10 +512,10 @@ mod sync_queue { /// Not blocking, can return fewer tasks if the queue does not contain enough. /// Batch tasks are split by timelines, with all related tasks merged into one (download/upload) /// or two (download and upload, if both were found in the queue during batch construction). - pub async fn next_task_batch( + pub(super) async fn next_task_batch( receiver: &mut UnboundedReceiver<(ZTenantTimelineId, SyncTask)>, max_timelines_to_sync: NonZeroUsize, - ) -> ControlFlow<(), HashMap> { + ) -> ControlFlow<(), HashMap> { // request the first task in blocking fashion to do less meaningless work let (first_sync_id, first_task) = if let Some(first_task) = next_task(receiver).await { first_task @@ -529,26 +529,21 @@ mod sync_queue { batched_timelines.insert(first_sync_id.timeline_id); let mut tasks = HashMap::new(); - tasks.insert(first_sync_id, first_task); + tasks.insert(first_sync_id, SyncTaskBatch::new(first_task)); loop { if batched_timelines.len() >= max_timelines_to_sync { - debug!("Filled a full task batch with {max_timelines_to_sync} timeline sync operations"); + debug!( + "Filled a full task batch with {} timeline sync operations", + batched_timelines.len() + ); break; } match receiver.try_recv() { Ok((sync_id, new_task)) => { LENGTH.fetch_sub(1, Ordering::Relaxed); - match tasks.entry(sync_id) { - hash_map::Entry::Occupied(o) => { - let current = o.remove(); - tasks.insert(sync_id, current.merge(new_task)); - } - hash_map::Entry::Vacant(v) => { - v.insert(new_task); - } - } + tasks.entry(sync_id).or_default().add(new_task); batched_timelines.insert(sync_id.timeline_id); } Err(TryRecvError::Disconnected) => { @@ -583,8 +578,8 @@ pub enum SyncTask { Download(SyncData), /// A certain amount of image files to download. Upload(SyncData), - /// Both upload and download layers need to be synced. - DownloadAndUpload(SyncData, SyncData), + /// Delete remote files. + Delete(SyncData>), } /// Stores the data to synd and its retries, to evict the tasks failing to frequently. 
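The batching behaviour described in the module comment above (block on the first queued task, then drain whatever else is already pending and collapse it into at most one merged entry per timeline) can be illustrated with a small, self-contained model. The sketch below uses std::sync::mpsc and toy Task/Batch types with u64 timeline ids instead of the real SyncTask/SyncTaskBatch/ZTenantTimelineId; it demonstrates only the per-timeline deduplication idea, not the pageserver implementation.

use std::collections::HashMap;
use std::sync::mpsc::Receiver;

// Toy stand-ins for SyncTask / SyncTaskBatch: layer names only, no retries or metadata.
#[derive(Debug)]
enum Task {
    Upload(Vec<String>),
    Download(Vec<String>),
}

#[derive(Debug, Default)]
struct Batch {
    upload: Vec<String>,
    download: Vec<String>,
}

impl Batch {
    fn add(&mut self, task: Task) {
        match task {
            Task::Upload(layers) => self.upload.extend(layers),
            Task::Download(layers) => self.download.extend(layers),
        }
    }
}

// Block for the first task, then drain whatever is already queued,
// keeping at most one merged batch per timeline id.
fn next_task_batch(receiver: &Receiver<(u64, Task)>, max_timelines: usize) -> HashMap<u64, Batch> {
    let mut batches: HashMap<u64, Batch> = HashMap::new();
    let (first_id, first_task) = match receiver.recv() {
        Ok(item) => item,
        Err(_) => return batches, // all senders dropped
    };
    batches.entry(first_id).or_default().add(first_task);
    while batches.len() < max_timelines {
        match receiver.try_recv() {
            Ok((timeline_id, task)) => batches.entry(timeline_id).or_default().add(task),
            Err(_) => break, // queue drained for now
        }
    }
    batches
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    tx.send((1, Task::Upload(vec!["layer_a".into()]))).unwrap();
    tx.send((1, Task::Upload(vec!["layer_b".into()]))).unwrap();
    tx.send((2, Task::Download(vec!["layer_c".into()]))).unwrap();
    // Timeline 1 ends up with one merged upload batch, timeline 2 with a download batch.
    println!("{:?}", next_task_batch(&rx, 10));
}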
@@ -609,121 +604,70 @@ impl SyncTask { Self::Upload(SyncData::new(0, upload_task)) } - /// Merges two tasks into one with the following rules: - /// - /// * Download + Download = Download with the retry counter reset and the layers to skip combined - /// * DownloadAndUpload + Download = DownloadAndUpload with Upload unchanged and the Download counterparts united by the same rules - /// * Upload + Upload = Upload with the retry counter reset and the layers to upload and the uploaded layers combined - /// * DownloadAndUpload + Upload = DownloadAndUpload with Download unchanged and the Upload counterparts united by the same rules - /// * Upload + Download = DownloadAndUpload with both tasks unchanged - /// * DownloadAndUpload + DownloadAndUpload = DownloadAndUpload with both parts united by the same rules - fn merge(mut self, other: Self) -> Self { - match (&mut self, other) { - ( - SyncTask::DownloadAndUpload(download_data, _) | SyncTask::Download(download_data), - SyncTask::Download(new_download_data), - ) - | ( - SyncTask::Download(download_data), - SyncTask::DownloadAndUpload(new_download_data, _), - ) => { - download_data - .data - .layers_to_skip - .extend(new_download_data.data.layers_to_skip.into_iter()); - download_data.retries = 0; - } - (SyncTask::Upload(upload), SyncTask::Download(new_download_data)) => { - self = SyncTask::DownloadAndUpload(new_download_data, upload.clone()); - } + fn delete(layers_to_delete: HashSet) -> Self { + Self::Delete(SyncData::new(0, layers_to_delete)) + } +} - ( - SyncTask::DownloadAndUpload(_, upload_data) | SyncTask::Upload(upload_data), - SyncTask::Upload(new_upload_data), - ) - | (SyncTask::Upload(upload_data), SyncTask::DownloadAndUpload(_, new_upload_data)) => { - upload_data - .data - .layers_to_upload - .extend(new_upload_data.data.layers_to_upload.into_iter()); - upload_data - .data - .uploaded_layers - .extend(new_upload_data.data.uploaded_layers.into_iter()); - upload_data.retries = 0; +#[derive(Debug, Default)] +struct SyncTaskBatch { + upload: Option>, + download: Option>, + delete: Option>>, +} - if new_upload_data - .data - .metadata - .as_ref() - .map(|meta| meta.disk_consistent_lsn()) - > upload_data +impl SyncTaskBatch { + fn new(task: SyncTask) -> Self { + let mut new_self = Self::default(); + new_self.add(task); + new_self + } + + fn add(&mut self, task: SyncTask) { + match task { + SyncTask::Download(new_download) => match &mut self.download { + Some(batch_download) => { + batch_download.retries = batch_download.retries.min(new_download.retries); + batch_download .data + .layers_to_skip + .extend(new_download.data.layers_to_skip.into_iter()); + } + None => self.download = Some(new_download), + }, + SyncTask::Upload(new_upload) => match &mut self.upload { + Some(batch_upload) => { + batch_upload.retries = batch_upload.retries.min(new_upload.retries); + + let batch_data = &mut batch_upload.data; + let new_data = new_upload.data; + batch_data + .layers_to_upload + .extend(new_data.layers_to_upload.into_iter()); + batch_data + .uploaded_layers + .extend(new_data.uploaded_layers.into_iter()); + if batch_data .metadata .as_ref() .map(|meta| meta.disk_consistent_lsn()) - { - upload_data.data.metadata = new_upload_data.data.metadata; + <= new_data + .metadata + .as_ref() + .map(|meta| meta.disk_consistent_lsn()) + { + batch_data.metadata = new_data.metadata; + } } - } - (SyncTask::Download(download), SyncTask::Upload(new_upload_data)) => { - self = SyncTask::DownloadAndUpload(download.clone(), new_upload_data) - } - - ( - 
SyncTask::DownloadAndUpload(download_data, upload_data), - SyncTask::DownloadAndUpload(new_download_data, new_upload_data), - ) => { - download_data - .data - .layers_to_skip - .extend(new_download_data.data.layers_to_skip.into_iter()); - download_data.retries = 0; - - upload_data - .data - .layers_to_upload - .extend(new_upload_data.data.layers_to_upload.into_iter()); - upload_data - .data - .uploaded_layers - .extend(new_upload_data.data.uploaded_layers.into_iter()); - upload_data.retries = 0; - - if new_upload_data - .data - .metadata - .as_ref() - .map(|meta| meta.disk_consistent_lsn()) - > upload_data - .data - .metadata - .as_ref() - .map(|meta| meta.disk_consistent_lsn()) - { - upload_data.data.metadata = new_upload_data.data.metadata; + None => self.upload = Some(new_upload), + }, + SyncTask::Delete(new_delete) => match &mut self.delete { + Some(batch_delete) => { + batch_delete.retries = batch_delete.retries.min(new_delete.retries); + batch_delete.data.extend(new_delete.data.into_iter()); } - } - } - - self - } - - fn name(&self) -> &'static str { - match self { - SyncTask::Download(_) => "download", - SyncTask::Upload(_) => "upload", - SyncTask::DownloadAndUpload(_, _) => "download and upload", - } - } - - fn retries(&self) -> u32 { - match self { - SyncTask::Download(data) => data.retries, - SyncTask::Upload(data) => data.retries, - SyncTask::DownloadAndUpload(download_data, upload_data) => { - download_data.retries.max(upload_data.retries) - } + None => self.delete = Some(new_delete), + }, } } } @@ -760,6 +704,7 @@ pub fn schedule_layer_upload( layers_to_upload: HashSet, metadata: Option, ) { + debug!("Scheduling layer upload for tenant {tenant_id}, timeline {timeline_id}, to upload: {layers_to_upload:?}"); if !sync_queue::push( ZTenantTimelineId { tenant_id, @@ -771,18 +716,29 @@ pub fn schedule_layer_upload( metadata, }), ) { - warn!("Could not send an upload task for tenant {tenant_id}, timeline {timeline_id}",) + warn!("Could not send an upload task for tenant {tenant_id}, timeline {timeline_id}") } else { debug!("Upload task for tenant {tenant_id}, timeline {timeline_id} sent") } } pub fn schedule_layer_delete( - _tenant_id: ZTenantId, - _timeline_id: ZTimelineId, - _layers_to_delete: HashSet, + tenant_id: ZTenantId, + timeline_id: ZTimelineId, + layers_to_delete: HashSet, ) { - // TODO kb implement later + debug!("Scheduling layer deletion for tenant {tenant_id}, timeline {timeline_id}, to delete: {layers_to_delete:?}"); + if !sync_queue::push( + ZTenantTimelineId { + tenant_id, + timeline_id, + }, + SyncTask::delete(layers_to_delete), + ) { + warn!("Could not send deletion task for tenant {tenant_id}, timeline {timeline_id}") + } else { + debug!("Deletion task for tenant {tenant_id}, timeline {timeline_id} sent") + } } /// Requests the download of the entire timeline for a given tenant. 
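The hunks that follow rework the per-timeline processing so that a batch's upload and download halves run concurrently while any deletion is sequenced strictly after them, which is what keeps a compaction's input layers in remote storage until its output layers have been uploaded. Below is a stripped-down sketch of that control flow, with dummy async stubs standing in for the real upload_timeline_data/download_timeline_data/delete_timeline_data, and tokio (with the macros feature) assumed as the runtime; the real function also consults validate_task_retries and updates the remote index, which is omitted here.

// Dummy stand-ins for the real sync functions; each would talk to remote storage.
async fn upload(layers: &[&str]) {
    println!("uploading {layers:?}");
}

async fn download(layers: &[&str]) {
    println!("downloading {layers:?}");
}

async fn delete(layers: &[&str]) {
    println!("deleting {layers:?}");
}

// One batch for one timeline: uploads and downloads proceed concurrently,
// the deletion half only starts after both have finished.
async fn process_batch(to_upload: &[&str], to_download: &[&str], to_delete: &[&str]) {
    tokio::join!(upload(to_upload), download(to_download));
    if !to_delete.is_empty() {
        delete(to_delete).await;
    }
}

#[tokio::main]
async fn main() {
    process_batch(&["new_delta"], &["missing_image"], &["compacted_delta"]).await;
}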
@@ -948,13 +904,13 @@ where let mut sync_results = batched_tasks .into_iter() - .map(|(sync_id, task)| { + .map(|(sync_id, batch)| { let storage = Arc::clone(&storage); let index = index.clone(); async move { let state_update = - process_sync_task(conf, storage, index, max_sync_errors, sync_id, task) - .instrument(info_span!("process_sync_tasks", sync_id = %sync_id)) + process_sync_task_batch(conf, storage, index, max_sync_errors, sync_id, batch) + .instrument(info_span!("process_sync_task_batch", sync_id = %sync_id)) .await; (sync_id, state_update) } @@ -978,13 +934,13 @@ where ControlFlow::Continue(new_timeline_states) } -async fn process_sync_task( +async fn process_sync_task_batch( conf: &'static PageServerConf, storage: Arc, index: RemoteIndex, max_sync_errors: NonZeroU32, sync_id: ZTenantTimelineId, - task: SyncTask, + batch: SyncTaskBatch, ) -> Option where P: Debug + Send + Sync + 'static, @@ -993,124 +949,103 @@ where let sync_start = Instant::now(); let current_remote_timeline = { index.read().await.timeline_entry(&sync_id).cloned() }; - let task = match validate_task_retries(sync_id, task, max_sync_errors) { - ControlFlow::Continue(task) => task, - ControlFlow::Break(aborted_task) => { - match aborted_task { - SyncTask::Download(_) => { - index - .write() - .await - .set_awaits_download(&sync_id, false) - .ok(); - } - SyncTask::Upload(failed_upload_data) => { - if let Err(e) = update_remote_data( - conf, - storage.as_ref(), - &index, - sync_id, - &failed_upload_data.data, - true, - ) + let upload_data = batch.upload.clone(); + let download_data = batch.download.clone(); + let ((), status_update) = tokio::join!( + async { + if let Some(upload_data) = upload_data { + match validate_task_retries(upload_data, max_sync_errors) + .instrument(info_span!("retries_validation")) .await - { - error!("Failed to update remote timeline {sync_id}: {e:?}"); + { + ControlFlow::Continue(new_upload_data) => { + upload_timeline_data( + conf, + (storage.as_ref(), &index), + current_remote_timeline.as_ref(), + sync_id, + new_upload_data, + sync_start, + "upload", + ) + .await; } - } - SyncTask::DownloadAndUpload(_, failed_upload_data) => { - index - .write() + ControlFlow::Break(failed_upload_data) => { + if let Err(e) = update_remote_data( + conf, + storage.as_ref(), + &index, + sync_id, + &failed_upload_data.data, + true, + ) .await - .set_awaits_download(&sync_id, false) - .ok(); - if let Err(e) = update_remote_data( - conf, - storage.as_ref(), - &index, - sync_id, - &failed_upload_data.data, - true, - ) - .await - { - error!("Failed to update remote timeline {sync_id}: {e:?}"); + { + error!("Failed to update remote timeline {sync_id}: {e:?}"); + } } } } - return None; } - }; - - let task_name = task.name(); - let current_task_attempt = task.retries(); - info!("Sync task '{task_name}' processing started, attempt #{current_task_attempt}"); - - if current_task_attempt > 0 { - let seconds_to_wait = 2.0_f64.powf(current_task_attempt as f64 - 1.0).min(30.0); - info!("Waiting {seconds_to_wait} seconds before starting the '{task_name}' task"); - tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await; - } - - let status_update = match task { - SyncTask::Download(new_download_data) => { - download_timeline( - conf, - (storage.as_ref(), &index), - current_remote_timeline.as_ref(), - sync_id, - new_download_data, - sync_start, - task_name, - ) - .await - } - SyncTask::Upload(new_upload_data) => { - upload_timeline( - conf, - (storage.as_ref(), &index), - current_remote_timeline.as_ref(), - 
sync_id, - new_upload_data, - sync_start, - task_name, - ) - .await; + .instrument(info_span!("upload_timeline_data")), + async { + if let Some(download_data) = download_data { + match validate_task_retries(download_data, max_sync_errors) + .instrument(info_span!("retries_validation")) + .await + { + ControlFlow::Continue(new_download_data) => { + return download_timeline_data( + conf, + (storage.as_ref(), &index), + current_remote_timeline.as_ref(), + sync_id, + new_download_data, + sync_start, + "download", + ) + .await + } + ControlFlow::Break(_) => { + index + .write() + .await + .set_awaits_download(&sync_id, false) + .ok(); + } + } + } None } - SyncTask::DownloadAndUpload(new_download_data, new_upload_data) => { - let status_update = download_timeline( - conf, - (storage.as_ref(), &index), - current_remote_timeline.as_ref(), - sync_id, - new_download_data, - sync_start, - task_name, - ) - .await; + .instrument(info_span!("download_timeline_data")), + ); - upload_timeline( - conf, - (storage.as_ref(), &index), - current_remote_timeline.as_ref(), - sync_id, - new_upload_data, - sync_start, - task_name, - ) - .await; - - status_update + if let Some(delete_data) = batch.delete { + match validate_task_retries(delete_data, max_sync_errors) + .instrument(info_span!("retries_validation")) + .await + { + ControlFlow::Continue(new_delete_data) => { + delete_timeline_data( + conf, + (storage.as_ref(), &index), + current_remote_timeline.as_ref(), + sync_id, + new_delete_data, + sync_start, + "delete", + ) + .instrument(info_span!("delete_timeline_data")) + .await; + } + ControlFlow::Break(_) => {} } - }; - - info!("Finished processing the task"); + } status_update } -async fn download_timeline( +async fn download_timeline_data( conf: &'static PageServerConf, (storage, index): (&S, &RemoteIndex), current_remote_timeline: Option<&RemoteTimeline>, @@ -1228,6 +1163,31 @@ async fn update_local_metadata( Ok(()) } +async fn delete_timeline_data( + conf: &PageServerConf, + index: (&S, &RemoteIndex), + as_ref: Option<&RemoteTimeline>, + sync_id: ZTenantTimelineId, + new_delete_data: SyncData>, + sync_start: Instant, + task_name: &str, +) -> Option<()> +where + P: Debug + Send + Sync + 'static, + S: RemoteStorage + Send + Sync + 'static, +{ + // match update_remote_data(conf, storage, index, sync_id, &uploaded_data.data, false).await { + // Ok(()) => register_sync_status(sync_start, task_name, Some(true)), + // Err(e) => { + // error!("Failed to update remote timeline {sync_id}: {e:?}"); + // uploaded_data.retries += 1; + // sync_queue::push(sync_id, SyncTask::Upload(uploaded_data)); + // register_sync_status(sync_start, task_name, Some(false)); + // } + // } + todo!("TODO kb") +} + async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result { TimelineMetadata::from_bytes( &fs::read(metadata_path) @@ -1237,7 +1197,7 @@ async fn read_metadata_file(metadata_path: &Path) -> anyhow::Result( +async fn upload_timeline_data( conf: &'static PageServerConf, (storage, index): (&S, &RemoteIndex), current_remote_timeline: Option<&RemoteTimeline>, @@ -1245,7 +1205,8 @@ async fn upload_timeline( new_upload_data: SyncData, sync_start: Instant, task_name: &str, -) where +) -> Option<()> +where P: Debug + Send + Sync + 'static, S: RemoteStorage + Send + Sync + 'static, { @@ -1255,7 +1216,7 @@ async fn upload_timeline( { UploadedTimeline::FailedAndRescheduled => { register_sync_status(sync_start, task_name, Some(false)); - return; + return None; } UploadedTimeline::Successful(upload_data) => upload_data, 
UploadedTimeline::SuccessfulAfterLocalFsUpdate(mut outdated_upload_data) => { @@ -1272,7 +1233,7 @@ async fn upload_timeline( outdated_upload_data.retries += 1; sync_queue::push(sync_id, SyncTask::Upload(outdated_upload_data)); register_sync_status(sync_start, task_name, Some(false)); - return; + return None; } }; outdated_upload_data.data.metadata = Some(local_metadata); @@ -1282,12 +1243,16 @@ async fn upload_timeline( }; match update_remote_data(conf, storage, index, sync_id, &uploaded_data.data, false).await { - Ok(()) => register_sync_status(sync_start, task_name, Some(true)), + Ok(()) => { + register_sync_status(sync_start, task_name, Some(true)); + Some(()) + } Err(e) => { error!("Failed to update remote timeline {sync_id}: {e:?}"); uploaded_data.retries += 1; sync_queue::push(sync_id, SyncTask::Upload(uploaded_data)); register_sync_status(sync_start, task_name, Some(false)); + None } } } @@ -1358,51 +1323,25 @@ where .context("Failed to upload new index part") } -fn validate_task_retries( - sync_id: ZTenantTimelineId, - task: SyncTask, +async fn validate_task_retries( + sync_data: SyncData, max_sync_errors: NonZeroU32, -) -> ControlFlow { +) -> ControlFlow, SyncData> { + let current_attempt = sync_data.retries; let max_sync_errors = max_sync_errors.get(); - let mut skip_upload = false; - let mut skip_download = false; - - match &task { - SyncTask::Download(download_data) | SyncTask::DownloadAndUpload(download_data, _) - if download_data.retries > max_sync_errors => - { - error!( - "Evicting download task for timeline {sync_id} that failed {} times, exceeding the error threshold {max_sync_errors}", - download_data.retries - ); - skip_download = true; - } - SyncTask::Upload(upload_data) | SyncTask::DownloadAndUpload(_, upload_data) - if upload_data.retries > max_sync_errors => - { - error!( - "Evicting upload task for timeline {sync_id} that failed {} times, exceeding the error threshold {max_sync_errors}", - upload_data.retries, - ); - skip_upload = true; - } - _ => {} + if current_attempt >= max_sync_errors { + error!( + "Aborting task that failed {current_attempt} times, exceeding retries threshold of {max_sync_errors}", + ); + return ControlFlow::Break(sync_data); } - match task { - aborted_task @ SyncTask::Download(_) if skip_download => ControlFlow::Break(aborted_task), - aborted_task @ SyncTask::Upload(_) if skip_upload => ControlFlow::Break(aborted_task), - aborted_task @ SyncTask::DownloadAndUpload(_, _) if skip_upload && skip_download => { - ControlFlow::Break(aborted_task) - } - SyncTask::DownloadAndUpload(download_task, _) if skip_upload => { - ControlFlow::Continue(SyncTask::Download(download_task)) - } - SyncTask::DownloadAndUpload(_, upload_task) if skip_download => { - ControlFlow::Continue(SyncTask::Upload(upload_task)) - } - not_skipped => ControlFlow::Continue(not_skipped), + if current_attempt > 0 { + let seconds_to_wait = 2.0_f64.powf(current_attempt as f64 - 1.0).min(30.0); + info!("Waiting {seconds_to_wait} seconds before starting the task"); + tokio::time::sleep(Duration::from_secs_f64(seconds_to_wait)).await; } + ControlFlow::Continue(sync_data) } async fn try_fetch_index_parts( @@ -1602,370 +1541,3 @@ mod test_utils { TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0), Lsn(0), Lsn(0)) } } - -#[cfg(test)] -mod tests { - use std::collections::BTreeSet; - - use super::{test_utils::dummy_metadata, *}; - use utils::lsn::Lsn; - - #[test] - fn download_sync_tasks_merge() { - let download_1 = SyncTask::Download(SyncData::new( - 2, - TimelineDownload { - 
layers_to_skip: HashSet::from([PathBuf::from("one")]), - }, - )); - let download_2 = SyncTask::Download(SyncData::new( - 6, - TimelineDownload { - layers_to_skip: HashSet::from([PathBuf::from("two"), PathBuf::from("three")]), - }, - )); - - let merged_download = match download_1.merge(download_2) { - SyncTask::Download(merged_download) => merged_download, - wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"), - }; - - assert_eq!( - merged_download.retries, 0, - "Merged task should have its retries counter reset" - ); - - assert_eq!( - merged_download - .data - .layers_to_skip - .into_iter() - .collect::>(), - BTreeSet::from([ - PathBuf::from("one"), - PathBuf::from("two"), - PathBuf::from("three") - ]), - "Merged download tasks should a combined set of layers to skip" - ); - } - - #[test] - fn upload_sync_tasks_merge() { - let metadata_1 = dummy_metadata(Lsn(1)); - let metadata_2 = dummy_metadata(Lsn(2)); - assert!(metadata_2.disk_consistent_lsn() > metadata_1.disk_consistent_lsn()); - - let upload_1 = SyncTask::Upload(SyncData::new( - 2, - TimelineUpload { - layers_to_upload: HashSet::from([PathBuf::from("one")]), - uploaded_layers: HashSet::from([PathBuf::from("u_one")]), - metadata: Some(metadata_1), - }, - )); - let upload_2 = SyncTask::Upload(SyncData::new( - 6, - TimelineUpload { - layers_to_upload: HashSet::from([PathBuf::from("two"), PathBuf::from("three")]), - uploaded_layers: HashSet::from([PathBuf::from("u_two")]), - metadata: Some(metadata_2.clone()), - }, - )); - - let merged_upload = match upload_1.merge(upload_2) { - SyncTask::Upload(merged_upload) => merged_upload, - wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"), - }; - - assert_eq!( - merged_upload.retries, 0, - "Merged task should have its retries counter reset" - ); - - let upload = merged_upload.data; - assert_eq!( - upload.layers_to_upload.into_iter().collect::>(), - BTreeSet::from([ - PathBuf::from("one"), - PathBuf::from("two"), - PathBuf::from("three") - ]), - "Merged upload tasks should a combined set of layers to upload" - ); - - assert_eq!( - upload.uploaded_layers.into_iter().collect::>(), - BTreeSet::from([PathBuf::from("u_one"), PathBuf::from("u_two"),]), - "Merged upload tasks should a combined set of uploaded layers" - ); - - assert_eq!( - upload.metadata, - Some(metadata_2), - "Merged upload tasks should have a metadata with biggest disk_consistent_lsn" - ); - } - - #[test] - fn upload_and_download_sync_tasks_merge() { - let download_data = SyncData::new( - 3, - TimelineDownload { - layers_to_skip: HashSet::from([PathBuf::from("d_one")]), - }, - ); - - let upload_data = SyncData::new( - 2, - TimelineUpload { - layers_to_upload: HashSet::from([PathBuf::from("u_one")]), - uploaded_layers: HashSet::from([PathBuf::from("u_one_2")]), - metadata: Some(dummy_metadata(Lsn(1))), - }, - ); - - let (merged_download, merged_upload) = match SyncTask::Download(download_data.clone()) - .merge(SyncTask::Upload(upload_data.clone())) - { - SyncTask::DownloadAndUpload(merged_download, merged_upload) => { - (merged_download, merged_upload) - } - wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"), - }; - - assert_eq!( - merged_download, download_data, - "When upload and dowload are merged, both should be unchanged" - ); - assert_eq!( - merged_upload, upload_data, - "When upload and dowload are merged, both should be unchanged" - ); - } - - #[test] - fn uploaddownload_and_upload_sync_tasks_merge() { - let download_data = SyncData::new( - 
3, - TimelineDownload { - layers_to_skip: HashSet::from([PathBuf::from("d_one")]), - }, - ); - - let metadata_1 = dummy_metadata(Lsn(5)); - let metadata_2 = dummy_metadata(Lsn(2)); - assert!(metadata_1.disk_consistent_lsn() > metadata_2.disk_consistent_lsn()); - - let upload_download = SyncTask::DownloadAndUpload( - download_data.clone(), - SyncData::new( - 2, - TimelineUpload { - layers_to_upload: HashSet::from([PathBuf::from("one")]), - uploaded_layers: HashSet::from([PathBuf::from("u_one")]), - metadata: Some(metadata_1.clone()), - }, - ), - ); - - let new_upload = SyncTask::Upload(SyncData::new( - 6, - TimelineUpload { - layers_to_upload: HashSet::from([PathBuf::from("two"), PathBuf::from("three")]), - uploaded_layers: HashSet::from([PathBuf::from("u_two")]), - metadata: Some(metadata_2), - }, - )); - - let (merged_download, merged_upload) = match upload_download.merge(new_upload) { - SyncTask::DownloadAndUpload(merged_download, merged_upload) => { - (merged_download, merged_upload) - } - wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"), - }; - - assert_eq!( - merged_download, download_data, - "When uploaddowload and upload tasks are merged, download should be unchanged" - ); - - assert_eq!( - merged_upload.retries, 0, - "Merged task should have its retries counter reset" - ); - let upload = merged_upload.data; - assert_eq!( - upload.layers_to_upload.into_iter().collect::>(), - BTreeSet::from([ - PathBuf::from("one"), - PathBuf::from("two"), - PathBuf::from("three") - ]), - "Merged upload tasks should a combined set of layers to upload" - ); - - assert_eq!( - upload.uploaded_layers.into_iter().collect::>(), - BTreeSet::from([PathBuf::from("u_one"), PathBuf::from("u_two"),]), - "Merged upload tasks should a combined set of uploaded layers" - ); - - assert_eq!( - upload.metadata, - Some(metadata_1), - "Merged upload tasks should have a metadata with biggest disk_consistent_lsn" - ); - } - - #[test] - fn uploaddownload_and_download_sync_tasks_merge() { - let upload_data = SyncData::new( - 22, - TimelineUpload { - layers_to_upload: HashSet::from([PathBuf::from("one")]), - uploaded_layers: HashSet::from([PathBuf::from("u_one")]), - metadata: Some(dummy_metadata(Lsn(22))), - }, - ); - - let upload_download = SyncTask::DownloadAndUpload( - SyncData::new( - 2, - TimelineDownload { - layers_to_skip: HashSet::from([PathBuf::from("one")]), - }, - ), - upload_data.clone(), - ); - - let new_download = SyncTask::Download(SyncData::new( - 6, - TimelineDownload { - layers_to_skip: HashSet::from([PathBuf::from("two"), PathBuf::from("three")]), - }, - )); - - let (merged_download, merged_upload) = match upload_download.merge(new_download) { - SyncTask::DownloadAndUpload(merged_download, merged_upload) => { - (merged_download, merged_upload) - } - wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"), - }; - - assert_eq!( - merged_upload, upload_data, - "When uploaddowload and download tasks are merged, upload should be unchanged" - ); - - assert_eq!( - merged_download.retries, 0, - "Merged task should have its retries counter reset" - ); - assert_eq!( - merged_download - .data - .layers_to_skip - .into_iter() - .collect::>(), - BTreeSet::from([ - PathBuf::from("one"), - PathBuf::from("two"), - PathBuf::from("three") - ]), - "Merged download tasks should a combined set of layers to skip" - ); - } - - #[test] - fn uploaddownload_sync_tasks_merge() { - let metadata_1 = dummy_metadata(Lsn(1)); - let metadata_2 = dummy_metadata(Lsn(2)); - 
assert!(metadata_2.disk_consistent_lsn() > metadata_1.disk_consistent_lsn()); - - let upload_download = SyncTask::DownloadAndUpload( - SyncData::new( - 2, - TimelineDownload { - layers_to_skip: HashSet::from([PathBuf::from("one")]), - }, - ), - SyncData::new( - 2, - TimelineUpload { - layers_to_upload: HashSet::from([PathBuf::from("one")]), - uploaded_layers: HashSet::from([PathBuf::from("u_one")]), - metadata: Some(metadata_1), - }, - ), - ); - let new_upload_download = SyncTask::DownloadAndUpload( - SyncData::new( - 6, - TimelineDownload { - layers_to_skip: HashSet::from([PathBuf::from("two"), PathBuf::from("three")]), - }, - ), - SyncData::new( - 6, - TimelineUpload { - layers_to_upload: HashSet::from([PathBuf::from("two"), PathBuf::from("three")]), - uploaded_layers: HashSet::from([PathBuf::from("u_two")]), - metadata: Some(metadata_2.clone()), - }, - ), - ); - - let (merged_download, merged_upload) = match upload_download.merge(new_upload_download) { - SyncTask::DownloadAndUpload(merged_download, merged_upload) => { - (merged_download, merged_upload) - } - wrong_merge_result => panic!("Unexpected merge result: {wrong_merge_result:?}"), - }; - - assert_eq!( - merged_download.retries, 0, - "Merged task should have its retries counter reset" - ); - assert_eq!( - merged_download - .data - .layers_to_skip - .into_iter() - .collect::>(), - BTreeSet::from([ - PathBuf::from("one"), - PathBuf::from("two"), - PathBuf::from("three") - ]), - "Merged download tasks should a combined set of layers to skip" - ); - - assert_eq!( - merged_upload.retries, 0, - "Merged task should have its retries counter reset" - ); - let upload = merged_upload.data; - assert_eq!( - upload.layers_to_upload.into_iter().collect::>(), - BTreeSet::from([ - PathBuf::from("one"), - PathBuf::from("two"), - PathBuf::from("three") - ]), - "Merged upload tasks should a combined set of layers to upload" - ); - - assert_eq!( - upload.uploaded_layers.into_iter().collect::>(), - BTreeSet::from([PathBuf::from("u_one"), PathBuf::from("u_two"),]), - "Merged upload tasks should a combined set of uploaded layers" - ); - - assert_eq!( - upload.metadata, - Some(metadata_2), - "Merged upload tasks should have a metadata with biggest disk_consistent_lsn" - ); - } -} diff --git a/pageserver/src/storage_sync/index.rs b/pageserver/src/storage_sync/index.rs index d847e03a24..b52ce8c95f 100644 --- a/pageserver/src/storage_sync/index.rs +++ b/pageserver/src/storage_sync/index.rs @@ -8,7 +8,7 @@ use std::{ sync::Arc, }; -use anyhow::{Context, Ok}; +use anyhow::{anyhow, Context, Ok}; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use tokio::sync::RwLock; @@ -113,7 +113,7 @@ impl RemoteTimelineIndex { awaits_download: bool, ) -> anyhow::Result<()> { self.timeline_entry_mut(id) - .ok_or_else(|| anyhow::anyhow!("unknown timeline sync {}", id))? + .ok_or_else(|| anyhow!("unknown timeline sync {id}"))? .awaits_download = awaits_download; Ok(()) }
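The reworked validate_task_retries earlier in this patch applies one policy to every task kind: abort the task once its retry counter reaches the configured threshold, otherwise sleep for an exponentially growing delay capped at 30 seconds before retrying. The delay schedule is easy to sanity-check in isolation; the snippet below is a standalone illustration of that formula, not code from the patch.

/// Delay before a retried attempt, mirroring the 2^(retries - 1) seconds,
/// capped at 30 seconds, schedule from validate_task_retries.
fn backoff_seconds(retries: u32) -> f64 {
    if retries == 0 {
        0.0 // a task that has never failed runs immediately
    } else {
        2.0_f64.powf(retries as f64 - 1.0).min(30.0)
    }
}

fn main() {
    // Prints 0, 1, 2, 4, 8, 16, 30, 30 for 0..=7 previous failures.
    for retries in 0..=7 {
        println!("{retries} previous failures -> wait {}s", backoff_seconds(retries));
    }
}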