diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 284db005c8..4689cc2b83 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -105,6 +105,11 @@ fn main() -> Result<()> { fn init() -> Result<(String, clap::ArgMatches)> { init_tracing_and_logging(DEFAULT_LOG_LEVEL)?; + opentelemetry::global::set_error_handler(|err| { + tracing::info!("OpenTelemetry error: {err}"); + }) + .expect("global error handler lock poisoned"); + let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?; thread::spawn(move || { for sig in signals.forever() { diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index 9ffaaba584..b9e5387d86 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -184,6 +184,7 @@ pub struct CancelKeyData { impl fmt::Display for CancelKeyData { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + // TODO: this is producing strange results, with 0xffffffff........ always in the logs. let hi = (self.backend_pid as u64) << 32; let lo = self.cancel_key as u64; let id = hi | lo; diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs index f98d16789c..1c0d43d479 100644 --- a/libs/remote_storage/src/azure_blob.rs +++ b/libs/remote_storage/src/azure_blob.rs @@ -97,10 +97,7 @@ impl AzureBlobStorage { pub fn relative_path_to_name(&self, path: &RemotePath) -> String { assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR); - let path_string = path - .get_path() - .as_str() - .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR); + let path_string = path.get_path().as_str(); match &self.prefix_in_container { Some(prefix) => { if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { @@ -277,19 +274,14 @@ impl RemoteStorage for AzureBlobStorage { cancel: &CancellationToken, ) -> impl Stream> { // get the passed prefix or if it is not set use prefix_in_bucket value - let list_prefix = prefix - .map(|p| self.relative_path_to_name(p)) - .or_else(|| self.prefix_in_container.clone()) - .map(|mut p| { - // required to end with a separator - // otherwise request will return only the entry of a prefix - if matches!(mode, ListingMode::WithDelimiter) - && !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) - { - p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); + let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| { + self.prefix_in_container.clone().map(|mut s| { + if !s.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { + s.push(REMOTE_STORAGE_PREFIX_SEPARATOR); } - p - }); + s + }) + }); async_stream::stream! { let _permit = self.permit(RequestKind::List, cancel).await?; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 8e9e3890ba..2e5f69e3c9 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -189,6 +189,7 @@ pub struct TenantSharedResources { /// A [`Tenant`] is really an _attached_ tenant. The configuration /// for an attached tenant is a subset of the [`LocationConf`], represented /// in this struct. +#[derive(Clone)] pub(super) struct AttachedTenantConf { tenant_conf: TenantConfOpt, location: AttachedLocationConfig, @@ -1807,6 +1808,7 @@ impl Tenant { self.tenant_shard_id, timeline_id, self.generation, + &self.tenant_conf.load().location, ) } @@ -2527,6 +2529,10 @@ impl Tenant { { let conf = self.tenant_conf.load(); + // If we may not delete layers, then simply skip GC. 
Even though a tenant + // in AttachedMulti state could do GC and just enqueue the blocked deletions, + // the only advantage to doing it is to perhaps shrink the LayerMap metadata + // a bit sooner than we would achieve by waiting for AttachedSingle status. if !conf.location.may_delete_layers_hint() { info!("Skipping GC in location state {:?}", conf.location); return Ok(GcResult::default()); } @@ -2568,7 +2574,14 @@ impl Tenant { { let conf = self.tenant_conf.load(); - if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() { + + // Note that compaction usually requires deletions, but we don't respect + // may_delete_layers_hint here: that is because tenants in AttachedMulti + // should proceed with compaction even if they can't do deletion, to avoid + // accumulating dangerously deep stacks of L0 layers. Deletions will be + // enqueued inside RemoteTimelineClient, and executed later if/when we transition + // to AttachedSingle state. if !conf.location.may_upload_layers_hint() { info!("Skipping compaction in location state {:?}", conf.location); return Ok(false); } @@ -3446,6 +3459,7 @@ impl Tenant { // this race is not possible if both request types come from the storage // controller (as they should!) because an exclusive op lock is required // on the storage controller side. + self.tenant_conf.rcu(|inner| { Arc::new(AttachedTenantConf { tenant_conf: new_tenant_conf.clone(), @@ -3455,20 +3469,22 @@ impl Tenant { }) }); + let updated = self.tenant_conf.load().clone(); + self.tenant_conf_updated(&new_tenant_conf); // Don't hold self.timelines.lock() during the notifies. // There's no risk of deadlock right now, but there could be if we consolidate // mutexes in struct Timeline in the future. let timelines = self.list_timelines(); for timeline in timelines { - timeline.tenant_conf_updated(&new_tenant_conf); + timeline.tenant_conf_updated(&updated); } } pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) { let new_tenant_conf = new_conf.tenant_conf.clone(); - self.tenant_conf.store(Arc::new(new_conf)); + self.tenant_conf.store(Arc::new(new_conf.clone())); self.tenant_conf_updated(&new_tenant_conf); // Don't hold self.timelines.lock() during the notifies. // There's no risk of deadlock right now, but there could be if we consolidate // mutexes in struct Timeline in the future.
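The `0xffffffff........` noted in the `CancelKeyData` TODO earlier in this diff is most likely sign extension: the cancel-request fields are 32-bit signed integers on the wire, and a negative `cancel_key` cast straight to `u64` fills the upper 32 bits with ones before the `hi | lo` OR. A minimal standalone sketch of the suspected cause and one possible fix (masking through `u32` first); the field types here are an assumption based on the wire format, not taken from this patch:

// Suspected cause, reproduced standalone; not part of the patch.
struct CancelKeyData {
    backend_pid: i32,
    cancel_key: i32,
}

// Mirrors the Display impl above: a negative i32 sign-extends when cast to u64,
// so `lo` carries 0xffffffff in its upper half and swamps `hi` in the OR.
fn current_format(k: &CancelKeyData) -> u64 {
    let hi = (k.backend_pid as u64) << 32;
    let lo = k.cancel_key as u64;
    hi | lo
}

// One possible fix: hop through u32 so each half stays within its own 32 bits.
fn masked_format(k: &CancelKeyData) -> u64 {
    let hi = (k.backend_pid as u32 as u64) << 32;
    let lo = k.cancel_key as u32 as u64;
    hi | lo
}

fn main() {
    let k = CancelKeyData { backend_pid: 0x1234, cancel_key: -2 };
    assert_eq!(current_format(&k), 0xffff_ffff_ffff_fffe); // pid half is lost
    assert_eq!(masked_format(&k), 0x0000_1234_ffff_fffe); // both halves preserved
}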
let timelines = self.list_timelines(); for timeline in timelines { - timeline.tenant_conf_updated(&new_tenant_conf); + timeline.tenant_conf_updated(&new_conf); } } @@ -4544,6 +4560,7 @@ impl Tenant { self.tenant_shard_id, timeline_id, self.generation, + &self.tenant_conf.load().location, ) } diff --git a/pageserver/src/tenant/mgr.rs b/pageserver/src/tenant/mgr.rs index 4fc9d740c8..92b2200542 100644 --- a/pageserver/src/tenant/mgr.rs +++ b/pageserver/src/tenant/mgr.rs @@ -1719,10 +1719,11 @@ impl TenantManager { parent_layers.push(relative_path.to_owned()); } } - debug_assert!( - !parent_layers.is_empty(), - "shutdown cannot empty the layermap" - ); + + if parent_layers.is_empty() { + tracing::info!("Ancestor shard has no resident layer to hard link"); + } + (parent_timelines, parent_layers) }; diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs index 94f42c7827..377bc23542 100644 --- a/pageserver/src/tenant/remote_timeline_client.rs +++ b/pageserver/src/tenant/remote_timeline_client.rs @@ -197,6 +197,7 @@ use utils::backoff::{ self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS, }; use utils::pausable_failpoint; +use utils::shard::ShardNumber; use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU32, Ordering}; @@ -240,6 +241,7 @@ use utils::id::{TenantId, TimelineId}; use self::index::IndexPart; +use super::config::AttachedLocationConfig; use super::metadata::MetadataUpdate; use super::storage_layer::{Layer, LayerName, ResidentLayer}; use super::upload_queue::{NotInitialized, SetDeletedFlagProgress}; @@ -301,6 +303,36 @@ pub enum WaitCompletionError { #[derive(Debug, thiserror::Error)] #[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")] pub struct UploadQueueNotReadyError; +/// Behavioral modes that enable seamless live migration. +/// +/// See docs/rfcs/028-pageserver-migration.md to understand how these fit in. +struct RemoteTimelineClientConfig { + /// If this is false, then update to remote_consistent_lsn are dropped rather + /// than being submitted to DeletionQueue for validation. This behavior is + /// used when a tenant attachment is known to have a stale generation number, + /// such that validation attempts will always fail. This is not necessary + /// for correctness, but avoids spamming error statistics with failed validations + /// when doing migrations of tenants. + process_remote_consistent_lsn_updates: bool, + + /// If this is true, then object deletions are held in a buffer in RemoteTimelineClient + /// rather than being submitted to the DeletionQueue. This behavior is used when a tenant + /// is known to be multi-attached, in order to avoid disrupting other attached tenants + /// whose generations' metadata refers to the deleted objects. + block_deletions: bool, +} + +/// RemoteTimelineClientConfig's state is entirely driven by LocationConf, but we do +/// not carry the entire LocationConf structure: it's much more than we need. The From +/// impl extracts the subset of the LocationConf that is interesting to RemoteTimelineClient. +impl From<&AttachedLocationConfig> for RemoteTimelineClientConfig { + fn from(lc: &AttachedLocationConfig) -> Self { + Self { + block_deletions: !lc.may_delete_layers_hint(), + process_remote_consistent_lsn_updates: lc.may_upload_layers_hint(), + } + } +} /// A client for accessing a timeline's data in remote storage. 
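As a quick reference for the `From<&AttachedLocationConfig>` impl above, the sketch below spells out how the two hints are expected to map attachment modes onto `RemoteTimelineClientConfig`. The `AttachmentMode` variants and hint bodies are assumptions reconstructed from the comments in this patch, not the real pageserver definitions:

// Assumed attachment modes and hint semantics, modelled on the patch comments.
enum AttachmentMode {
    Single, // sole attachment: free to upload and to delete layers
    Multi,  // live migration in progress: upload, but buffer deletions
    Stale,  // superseded generation: lsn-update validation would always fail
}

fn may_delete_layers_hint(mode: AttachmentMode) -> bool {
    matches!(mode, AttachmentMode::Single)
}

fn may_upload_layers_hint(mode: AttachmentMode) -> bool {
    !matches!(mode, AttachmentMode::Stale)
}

// Same two fields as the struct above.
struct RemoteTimelineClientConfig {
    process_remote_consistent_lsn_updates: bool,
    block_deletions: bool,
}

fn config_for(mode: AttachmentMode) -> RemoteTimelineClientConfig {
    RemoteTimelineClientConfig {
        block_deletions: !may_delete_layers_hint(mode),
        process_remote_consistent_lsn_updates: may_upload_layers_hint(mode),
    }
}

fn main() {
    assert!(!config_for(AttachmentMode::Single).block_deletions);
    assert!(config_for(AttachmentMode::Multi).block_deletions);
    assert!(config_for(AttachmentMode::Multi).process_remote_consistent_lsn_updates);
    assert!(!config_for(AttachmentMode::Stale).process_remote_consistent_lsn_updates);
}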
/// @@ -321,7 +353,7 @@ pub struct UploadQueueNotReadyError; /// in the index part file, whenever timeline metadata is uploaded. /// /// Downloads are not queued, they are performed immediately. -pub struct RemoteTimelineClient { +pub(crate) struct RemoteTimelineClient { conf: &'static PageServerConf, runtime: tokio::runtime::Handle, @@ -338,6 +370,9 @@ pub struct RemoteTimelineClient { deletion_queue_client: DeletionQueueClient, + /// Subset of tenant configuration used to control upload behaviors during migrations + config: std::sync::RwLock, + cancel: CancellationToken, } @@ -348,13 +383,14 @@ impl RemoteTimelineClient { /// Note: the caller must initialize the upload queue before any uploads can be scheduled, /// by calling init_upload_queue. /// - pub fn new( + pub(crate) fn new( remote_storage: GenericRemoteStorage, deletion_queue_client: DeletionQueueClient, conf: &'static PageServerConf, tenant_shard_id: TenantShardId, timeline_id: TimelineId, generation: Generation, + location_conf: &AttachedLocationConfig, ) -> RemoteTimelineClient { RemoteTimelineClient { conf, @@ -374,6 +410,7 @@ impl RemoteTimelineClient { &tenant_shard_id, &timeline_id, )), + config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)), cancel: CancellationToken::new(), } } @@ -429,6 +466,43 @@ impl RemoteTimelineClient { Ok(()) } + /// Notify this client of a change to its parent tenant's config, as this may cause us to + /// take action (unblocking deletions when transitioning from AttachedMulti to AttachedSingle) + pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) { + let new_conf = RemoteTimelineClientConfig::from(location_conf); + let unblocked = !new_conf.block_deletions; + + // Update config before draining deletions, so that we don't race with more being + // inserted. This can result in deletions happening our of order, but that does not + // violate any invariants: deletions only need to be ordered relative to upload of the index + // that dereferences the deleted objects, and we are not changing that order. + *self.config.write().unwrap() = new_conf; + + if unblocked { + // If we may now delete layers, drain any that were blocked in our old + // configuration state + let mut queue_locked = self.upload_queue.lock().unwrap(); + + if let Ok(queue) = queue_locked.initialized_mut() { + let blocked_deletions = std::mem::take(&mut queue.blocked_deletions); + for d in blocked_deletions { + if let Err(e) = self.deletion_queue_client.push_layers_sync( + self.tenant_shard_id, + self.timeline_id, + self.generation, + d.layers, + ) { + // This could happen if the pageserver is shut down while a tenant + // is transitioning from a deletion-blocked state: we will leak some + // S3 objects in this case. + warn!("Failed to drain blocked deletions: {}", e); + break; + } + } + } + } + } + /// Returns `None` if nothing is yet uplodaded, `Some(disk_consistent_lsn)` otherwise. 
pub fn remote_consistent_lsn_projected(&self) -> Option { match &mut *self.upload_queue.lock().unwrap() { @@ -1912,16 +1986,24 @@ impl RemoteTimelineClient { res } UploadOp::Delete(delete) => { - pausable_failpoint!("before-delete-layer-pausable"); - self.deletion_queue_client - .push_layers( - self.tenant_shard_id, - self.timeline_id, - self.generation, - delete.layers.clone(), - ) - .await - .map_err(|e| anyhow::anyhow!(e)) + if self.config.read().unwrap().block_deletions { + let mut queue_locked = self.upload_queue.lock().unwrap(); + if let Ok(queue) = queue_locked.initialized_mut() { + queue.blocked_deletions.push(delete.clone()); + } + Ok(()) + } else { + pausable_failpoint!("before-delete-layer-pausable"); + self.deletion_queue_client + .push_layers( + self.tenant_shard_id, + self.timeline_id, + self.generation, + delete.layers.clone(), + ) + .await + .map_err(|e| anyhow::anyhow!(e)) + } } unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => { // unreachable. Barrier operations are handled synchronously in @@ -2028,8 +2110,16 @@ impl RemoteTimelineClient { // Legacy mode: skip validating generation upload_queue.visible_remote_consistent_lsn.store(lsn); None - } else { + } else if self + .config + .read() + .unwrap() + .process_remote_consistent_lsn_updates + { Some((lsn, upload_queue.visible_remote_consistent_lsn.clone())) + } else { + // Our config disables remote_consistent_lsn updates: drop it. + None } } UploadOp::Delete(_) => { @@ -2166,6 +2256,7 @@ impl RemoteTimelineClient { queued_operations: VecDeque::default(), #[cfg(feature = "testing")] dangling_files: HashMap::default(), + blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), }; @@ -2231,6 +2322,28 @@ impl RemoteTimelineClient { UploadQueue::Initialized(x) => x.no_pending_work(), } } + + /// 'foreign' in the sense that it does not belong to this tenant shard. This method + /// is used during GC for other shards to get the index of shard zero. 
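Taken together, the `UploadOp::Delete` branch above and `update_config` implement a simple buffer-and-flush protocol: while deletions are blocked they accumulate in `blocked_deletions`, and the transition out of the blocked state drains them in order. A compressed, self-contained model of that behaviour (simplified types, not the real queue or DeletionQueueClient):

struct Delete {
    layers: Vec<&'static str>, // stand-in for Vec<(LayerName, LayerFileMetadata)>
}

struct Queue {
    block_deletions: bool,
    blocked_deletions: Vec<Delete>,
    deletion_queue: Vec<Delete>, // stand-in for DeletionQueueClient
}

impl Queue {
    // Mirrors the UploadOp::Delete branch: buffer while blocked, submit otherwise.
    fn push_delete(&mut self, d: Delete) {
        if self.block_deletions {
            self.blocked_deletions.push(d);
        } else {
            self.deletion_queue.push(d);
        }
    }

    // Mirrors update_config: flipping block_deletions off drains the buffer.
    // Only ordering relative to the index upload that dereferenced the layers
    // matters, so draining here cannot violate that invariant.
    fn update_config(&mut self, block_deletions: bool) {
        self.block_deletions = block_deletions;
        if !block_deletions {
            self.deletion_queue.append(&mut self.blocked_deletions);
        }
    }
}

fn main() {
    let mut q = Queue { block_deletions: true, blocked_deletions: vec![], deletion_queue: vec![] };
    q.push_delete(Delete { layers: vec!["layer-a"] });
    assert!(q.deletion_queue.is_empty()); // nothing reaches the deletion queue yet
    q.update_config(false); // e.g. AttachedMulti -> AttachedSingle
    assert_eq!(q.deletion_queue.len(), 1); // buffered deletion flushed
    q.push_delete(Delete { layers: vec!["layer-b"] });
    assert_eq!(q.deletion_queue.len(), 2); // later deletions go straight through
}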
+ pub(crate) async fn download_foreign_index( + &self, + shard_number: ShardNumber, + cancel: &CancellationToken, + ) -> Result<(IndexPart, Generation, std::time::SystemTime), DownloadError> { + let foreign_shard_id = TenantShardId { + shard_number, + shard_count: self.tenant_shard_id.shard_count, + tenant_id: self.tenant_shard_id.tenant_id, + }; + download_index_part( + &self.storage_impl, + &foreign_shard_id, + &self.timeline_id, + Generation::MAX, + cancel, + ) + .await + } } pub(crate) struct UploadQueueAccessor<'a> { @@ -2379,6 +2492,7 @@ mod tests { use crate::{ context::RequestContext, tenant::{ + config::AttachmentMode, harness::{TenantHarness, TIMELINE_ID}, storage_layer::layer::local_layer_path, Tenant, Timeline, @@ -2464,6 +2578,10 @@ mod tests { /// Construct a RemoteTimelineClient in an arbitrary generation fn build_client(&self, generation: Generation) -> Arc { + let location_conf = AttachedLocationConfig { + generation, + attach_mode: AttachmentMode::Single, + }; Arc::new(RemoteTimelineClient { conf: self.harness.conf, runtime: tokio::runtime::Handle::current(), @@ -2477,6 +2595,7 @@ mod tests { &self.harness.tenant_shard_id, &TIMELINE_ID, )), + config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)), cancel: CancellationToken::new(), }) } diff --git a/pageserver/src/tenant/secondary.rs b/pageserver/src/tenant/secondary.rs index 1331c07d05..3df89a928c 100644 --- a/pageserver/src/tenant/secondary.rs +++ b/pageserver/src/tenant/secondary.rs @@ -111,15 +111,6 @@ pub(crate) struct SecondaryTenant { pub(super) heatmap_total_size_metric: UIntGauge, } -impl Drop for SecondaryTenant { - fn drop(&mut self) { - let tenant_id = self.tenant_shard_id.tenant_id.to_string(); - let shard_id = format!("{}", self.tenant_shard_id.shard_slug()); - let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]); - let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]); - } -} - impl SecondaryTenant { pub(crate) fn new( tenant_shard_id: TenantShardId, @@ -167,6 +158,13 @@ impl SecondaryTenant { // Wait for any secondary downloader work to complete self.gate.close().await; + + self.validate_metrics(); + + let tenant_id = self.tenant_shard_id.tenant_id.to_string(); + let shard_id = format!("{}", self.tenant_shard_id.shard_slug()); + let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]); + let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]); } pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) { @@ -254,6 +252,20 @@ impl SecondaryTenant { .await .expect("secondary eviction should not have panicked"); } + + /// Exhaustive check that incrementally updated metrics match the actual state. 
+ #[cfg(feature = "testing")] + fn validate_metrics(&self) { + let detail = self.detail.lock().unwrap(); + let resident_size = detail.total_resident_size(); + + assert_eq!(resident_size, self.resident_size_metric.get()); + } + + #[cfg(not(feature = "testing"))] + fn validate_metrics(&self) { + // No-op in non-testing builds + } } /// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads, diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 82c5702686..7443261a9c 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -242,6 +242,19 @@ impl SecondaryDetail { } } + #[cfg(feature = "testing")] + pub(crate) fn total_resident_size(&self) -> u64 { + self.timelines + .values() + .map(|tl| { + tl.on_disk_layers + .values() + .map(|v| v.metadata.file_size) + .sum::() + }) + .sum::() + } + pub(super) fn evict_layer( &mut self, name: LayerName, @@ -763,24 +776,7 @@ impl<'a> TenantDownloader<'a> { } // Metrics consistency check in testing builds - if cfg!(feature = "testing") { - let detail = self.secondary_state.detail.lock().unwrap(); - let resident_size = detail - .timelines - .values() - .map(|tl| { - tl.on_disk_layers - .values() - .map(|v| v.metadata.file_size) - .sum::() - }) - .sum::(); - assert_eq!( - resident_size, - self.secondary_state.resident_size_metric.get() - ); - } - + self.secondary_state.validate_metrics(); // Only update last_etag after a full successful download: this way will not skip // the next download, even if the heatmap's actual etag is unchanged. self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 0eb3de21e9..95864af4d0 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -38,6 +38,7 @@ use pageserver_api::{ shard::{ShardIdentity, ShardNumber, TenantShardId}, }; use rand::Rng; +use remote_storage::DownloadError; use serde_with::serde_as; use storage_broker::BrokerClientChannel; use tokio::{ @@ -272,7 +273,7 @@ pub struct Timeline { /// Remote storage client. /// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details. - pub remote_client: Arc, + pub(crate) remote_client: Arc, // What page versions do we hold in the repository? If we get a // request > last_record_lsn, we need to wait until we receive all @@ -2171,14 +2172,14 @@ impl Timeline { ) } - pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) { + pub(super) fn tenant_conf_updated(&self, new_conf: &AttachedTenantConf) { // NB: Most tenant conf options are read by background loops, so, // changes will automatically be picked up. // The threshold is embedded in the metric. So, we need to update it. 
{ let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold( - new_conf, + &new_conf.tenant_conf, &self.conf.default_tenant_conf, ); @@ -2186,6 +2187,9 @@ impl Timeline { let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug()); let timeline_id_str = self.timeline_id.to_string(); + + self.remote_client.update_config(&new_conf.location); + self.metrics .evictions_with_low_residence_duration .write() @@ -4821,6 +4825,86 @@ impl Timeline { Ok(()) } + async fn find_gc_time_cutoff( + &self, + pitr: Duration, + cancel: &CancellationToken, + ctx: &RequestContext, + ) -> Result, PageReconstructError> { + debug_assert_current_span_has_tenant_and_timeline_id(); + if self.shard_identity.is_shard_zero() { + // Shard Zero has SLRU data and can calculate the PITR time -> LSN mapping itself + let now = SystemTime::now(); + let time_range = if pitr == Duration::ZERO { + humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid") + } else { + pitr + }; + + // If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case) + let time_cutoff = now.checked_sub(time_range).unwrap_or(now); + let timestamp = to_pg_timestamp(time_cutoff); + + let time_cutoff = match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? { + LsnForTimestamp::Present(lsn) => Some(lsn), + LsnForTimestamp::Future(lsn) => { + // The timestamp is in the future. That sounds impossible, + // but what it really means is that there hasn't been + // any commits since the cutoff timestamp. + // + // In this case we should use the LSN of the most recent commit, + // which is implicitly the last LSN in the log. + debug!("future({})", lsn); + Some(self.get_last_record_lsn()) + } + LsnForTimestamp::Past(lsn) => { + debug!("past({})", lsn); + None + } + LsnForTimestamp::NoData(lsn) => { + debug!("nodata({})", lsn); + None + } + }; + Ok(time_cutoff) + } else { + // Shards other than shard zero cannot do timestamp->lsn lookups, and must instead learn their GC cutoff + // from shard zero's index. The index doesn't explicitly tell us the time cutoff, but we may assume that + // the point up to which shard zero's last_gc_cutoff has advanced will either be the time cutoff, or a + // space cutoff that we would also have respected ourselves. + match self + .remote_client + .download_foreign_index(ShardNumber(0), cancel) + .await + { + Ok((index_part, index_generation, _index_mtime)) => { + tracing::info!("GC loaded shard zero metadata (gen {index_generation:?}): latest_gc_cutoff_lsn: {}", + index_part.metadata.latest_gc_cutoff_lsn()); + Ok(Some(index_part.metadata.latest_gc_cutoff_lsn())) + } + Err(DownloadError::NotFound) => { + // This is unexpected, because during timeline creations shard zero persists to remote + // storage before other shards are called, and during timeline deletion non-zeroth shards are + // deleted before the zeroth one. However, it should be harmless: if we somehow end up in this + // state, then shard zero should _eventually_ write an index when it GCs. + tracing::warn!("GC couldn't find shard zero's index for timeline"); + Ok(None) + } + Err(e) => { + // TODO: this function should return a different error type than page reconstruct error + Err(PageReconstructError::Other(anyhow::anyhow!(e))) + } + } + + // TODO: after reading shard zero's GC cutoff, we should validate its generation with the storage + // controller. 
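For the shard-zero branch of `find_gc_time_cutoff` above, the cutoff-time arithmetic is small but easy to misread: a zero PITR falls back to the default interval, and an underflow degrades to retaining nothing older than `now`. A standalone sketch of just that arithmetic (the interval constant here is a placeholder, not the real `DEFAULT_PITR_INTERVAL`):

use std::time::{Duration, SystemTime};

// Placeholder value; the real DEFAULT_PITR_INTERVAL is a pageserver constant.
const DEFAULT_PITR_INTERVAL: Duration = Duration::from_secs(7 * 24 * 3600);

// pitr == 0 means "use the default window"; if the subtraction underflows,
// fall back to `now`, i.e. retain no history (the unexpected case noted above).
fn gc_time_cutoff(now: SystemTime, pitr: Duration) -> SystemTime {
    let time_range = if pitr == Duration::ZERO {
        DEFAULT_PITR_INTERVAL
    } else {
        pitr
    };
    now.checked_sub(time_range).unwrap_or(now)
}

fn main() {
    let now = SystemTime::now();
    assert_eq!(gc_time_cutoff(now, Duration::from_secs(3600)), now - Duration::from_secs(3600));
    assert_eq!(gc_time_cutoff(now, Duration::ZERO), now - DEFAULT_PITR_INTERVAL);
    // An absurdly large window cannot underflow into an error; it just clamps to `now`.
    assert_eq!(gc_time_cutoff(now, Duration::from_secs(u64::MAX)), now);
}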
Otherwise, it is possible that we see the GC cutoff go backwards while shard zero + // is going through a migration if we read the old location's index and it has GC'd ahead of the + // new location. This is legal in principle, but problematic in practice because it might result + // in a timeline creation succeeding on shard zero ('s new location) but then failing on other shards + // because they have GC'd past the branch point. + } + } + /// Find the Lsns above which layer files need to be retained on /// garbage collection. /// @@ -4863,40 +4947,7 @@ impl Timeline { // - if PITR interval is set, then this is our cutoff. // - if PITR interval is not set, then we do a lookup // based on DEFAULT_PITR_INTERVAL, so that size-based retention does not result in keeping history around permanently on idle databases. - let time_cutoff = { - let now = SystemTime::now(); - let time_range = if pitr == Duration::ZERO { - humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid") - } else { - pitr - }; - - // If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case) - let time_cutoff = now.checked_sub(time_range).unwrap_or(now); - let timestamp = to_pg_timestamp(time_cutoff); - - match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? { - LsnForTimestamp::Present(lsn) => Some(lsn), - LsnForTimestamp::Future(lsn) => { - // The timestamp is in the future. That sounds impossible, - // but what it really means is that there hasn't been - // any commits since the cutoff timestamp. - // - // In this case we should use the LSN of the most recent commit, - // which is implicitly the last LSN in the log. - debug!("future({})", lsn); - Some(self.get_last_record_lsn()) - } - LsnForTimestamp::Past(lsn) => { - debug!("past({})", lsn); - None - } - LsnForTimestamp::NoData(lsn) => { - debug!("nodata({})", lsn); - None - } - } - }; + let time_cutoff = self.find_gc_time_cutoff(pitr, cancel, ctx).await?; Ok(match (pitr, time_cutoff) { (Duration::ZERO, Some(time_cutoff)) => { diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 13a8dfa51a..67fc710c44 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -283,7 +283,7 @@ impl DeleteTimelineFlow { /// Shortcut to create Timeline in stopping state and spawn deletion task. #[instrument(skip_all, fields(%timeline_id))] - pub async fn resume_deletion( + pub(crate) async fn resume_deletion( tenant: Arc, timeline_id: TimelineId, local_metadata: &TimelineMetadata, diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs index 592f41cb21..f14bf2f8c3 100644 --- a/pageserver/src/tenant/upload_queue.rs +++ b/pageserver/src/tenant/upload_queue.rs @@ -88,6 +88,9 @@ pub(crate) struct UploadQueueInitialized { #[cfg(feature = "testing")] pub(crate) dangling_files: HashMap, + /// Deletions that are blocked by the tenant configuration + pub(crate) blocked_deletions: Vec, + /// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`. 
pub(crate) shutting_down: bool, @@ -180,6 +183,7 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), + blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), }; @@ -220,6 +224,7 @@ impl UploadQueue { queued_operations: VecDeque::new(), #[cfg(feature = "testing")] dangling_files: HashMap::new(), + blocked_deletions: Vec::new(), shutting_down: false, shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)), }; @@ -270,7 +275,7 @@ pub(crate) struct UploadTask { /// A deletion of some layers within the lifetime of a timeline. This is not used /// for timeline deletion, which skips this queue and goes directly to DeletionQueue. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct Delete { pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>, } diff --git a/proxy/src/bin/local_proxy.rs b/proxy/src/bin/local_proxy.rs index c4ec1300f2..968682cf0f 100644 --- a/proxy/src/bin/local_proxy.rs +++ b/proxy/src/bin/local_proxy.rs @@ -111,7 +111,7 @@ struct SqlOverHttpArgs { sql_over_http_cancel_set_shards: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB - sql_over_http_max_request_size_bytes: u64, + sql_over_http_max_request_size_bytes: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB sql_over_http_max_response_size_bytes: usize, diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 232721338d..45fbe4a398 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -276,7 +276,7 @@ struct SqlOverHttpArgs { sql_over_http_cancel_set_shards: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB - sql_over_http_max_request_size_bytes: u64, + sql_over_http_max_request_size_bytes: usize, #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB sql_over_http_max_response_size_bytes: usize, diff --git a/proxy/src/config.rs b/proxy/src/config.rs index b048c9d389..8bc8e3f96f 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -64,7 +64,7 @@ pub struct HttpConfig { pub pool_options: GlobalConnPoolOptions, pub cancel_set: CancelSet, pub client_conn_threshold: u64, - pub max_request_size_bytes: u64, + pub max_request_size_bytes: usize, pub max_response_size_bytes: usize, } diff --git a/proxy/src/control_plane/client/neon.rs b/proxy/src/control_plane/client/neon.rs index 53f9234926..757ea6720a 100644 --- a/proxy/src/control_plane/client/neon.rs +++ b/proxy/src/control_plane/client/neon.rs @@ -380,6 +380,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient { // after getting back a permit - it's possible the cache was filled // double check if permit.should_check_cache() { + // TODO: if there is something in the cache, mark the permit as success. 
check_cache!(); } diff --git a/proxy/src/http/mod.rs b/proxy/src/http/mod.rs index b1642cedb3..ed88c77256 100644 --- a/proxy/src/http/mod.rs +++ b/proxy/src/http/mod.rs @@ -122,18 +122,18 @@ impl Endpoint { } #[derive(Error, Debug)] -pub(crate) enum ReadBodyError { +pub(crate) enum ReadBodyError { #[error("Content length exceeds limit of {limit} bytes")] BodyTooLarge { limit: usize }, #[error(transparent)] - Read(#[from] reqwest::Error), + Read(#[from] E), } -pub(crate) async fn read_body_with_limit( - mut b: impl Body + Unpin, +pub(crate) async fn read_body_with_limit( + mut b: impl Body + Unpin, limit: usize, -) -> Result, ReadBodyError> { +) -> Result, ReadBodyError> { // We could use `b.limited().collect().await.to_bytes()` here // but this ends up being slightly more efficient as far as I can tell. diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index b30aec09c1..2e759b0894 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -117,7 +117,6 @@ where node_info.set_keys(user_info.get_keys()); node_info.allow_self_signed_compute = allow_self_signed_compute; mechanism.update_connect_config(&mut node_info.config); - let retry_type = RetryType::ConnectToCompute; // try once let err = match mechanism @@ -129,7 +128,7 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Success, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); @@ -147,7 +146,7 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Failed, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); @@ -156,8 +155,9 @@ where node_info } else { // if we failed to connect, it's likely that the compute node was suspended, wake a new compute node - info!("compute node's state has likely changed; requesting a wake-up"); + debug!("compute node's state has likely changed; requesting a wake-up"); let old_node_info = invalidate_cache(node_info); + // TODO: increment num_retries? let mut node_info = wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?; node_info.reuse_settings(old_node_info); @@ -169,7 +169,7 @@ where // now that we have a new node, try connect to it repeatedly. // this can error for a few reasons, for instance: // * DNS connection settings haven't quite propagated yet - info!("wake_compute success. attempting to connect"); + debug!("wake_compute success. attempting to connect"); num_retries = 1; loop { match mechanism @@ -181,10 +181,11 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Success, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); + // TODO: is this necessary? We have a metric. info!(?num_retries, "connected to compute node after"); return Ok(res); } @@ -194,7 +195,7 @@ where Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Failed, - retry_type, + retry_type: RetryType::ConnectToCompute, }, num_retries.into(), ); diff --git a/proxy/src/proxy/copy_bidirectional.rs b/proxy/src/proxy/copy_bidirectional.rs index 91a3ceff75..4e4af88634 100644 --- a/proxy/src/proxy/copy_bidirectional.rs +++ b/proxy/src/proxy/copy_bidirectional.rs @@ -87,6 +87,8 @@ where transfer_one_direction(cx, &mut compute_to_client, compute, client) .map_err(ErrorSource::from_compute)?; + // TODO: 1 info log, with a enum label for close direction. 
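The `ReadBodyError` change above generalizes the error type from `reqwest::Error` to any `E`, so `read_body_with_limit` can wrap whatever body source it is handed. A simplified, std-only model of that shape (a chunk iterator stands in for hyper's `Body`, which is not reproduced here):

// Same two-variant shape as above, generic over the source error; the
// `thiserror` derives are omitted to keep this self-contained.
enum ReadBodyError<E> {
    BodyTooLarge { limit: usize },
    Read(E),
}

// Collect chunks while enforcing a total-size limit, passing the source's own
// error type through ReadBodyError::Read (what #[from] does in the real enum).
fn read_with_limit<E>(
    chunks: impl IntoIterator<Item = Result<Vec<u8>, E>>,
    limit: usize,
) -> Result<Vec<u8>, ReadBodyError<E>> {
    let mut body = Vec::new();
    for chunk in chunks {
        let chunk = chunk.map_err(ReadBodyError::Read)?;
        if body.len() + chunk.len() > limit {
            return Err(ReadBodyError::BodyTooLarge { limit });
        }
        body.extend_from_slice(&chunk);
    }
    Ok(body)
}

fn main() {
    let ok: Result<_, ReadBodyError<std::io::Error>> =
        read_with_limit([Ok(vec![1, 2]), Ok(vec![3])], 4);
    assert!(ok.is_ok());

    let too_big: Result<_, ReadBodyError<std::io::Error>> = read_with_limit([Ok(vec![0u8; 8])], 4);
    assert!(matches!(too_big, Err(ReadBodyError::BodyTooLarge { limit: 4 })));
}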
+ // Early termination checks from compute to client. if let TransferState::Done(_) = compute_to_client { if let TransferState::Running(buf) = &client_to_compute { diff --git a/proxy/src/proxy/handshake.rs b/proxy/src/proxy/handshake.rs index 3ada3a9995..e27c211932 100644 --- a/proxy/src/proxy/handshake.rs +++ b/proxy/src/proxy/handshake.rs @@ -5,7 +5,7 @@ use pq_proto::{ }; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::auth::endpoint_sni; use crate::config::{TlsConfig, PG_ALPN_PROTOCOL}; @@ -199,6 +199,8 @@ pub(crate) async fn handshake( .await?; } + // This log highlights the start of the connection. + // This contains useful information for debugging, not logged elsewhere, like role name and endpoint id. info!( ?version, ?params, @@ -211,7 +213,7 @@ pub(crate) async fn handshake( FeStartupPacket::StartupMessage { params, version } if version.major() == 3 && version > PG_PROTOCOL_LATEST => { - warn!(?version, "unsupported minor version"); + debug!(?version, "unsupported minor version"); // no protocol extensions are supported. // @@ -233,14 +235,16 @@ pub(crate) async fn handshake( info!( ?version, + ?params, session_type = "normal", "successful handshake; unsupported minor version requested" ); break Ok(HandshakeData::Startup(stream, params)); } - FeStartupPacket::StartupMessage { version, .. } => { + FeStartupPacket::StartupMessage { version, params } => { warn!( ?version, + ?params, session_type = "normal", "unsuccessful handshake; unsupported version" ); diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 4be4006d15..9415b54a4a 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -254,7 +254,7 @@ pub(crate) async fn handle_client( endpoint_rate_limiter: Arc, conn_gauge: NumClientConnectionsGuard<'static>, ) -> Result>, ClientRequestError> { - info!( + debug!( protocol = %ctx.protocol(), "handling interactive connection from client" ); diff --git a/proxy/src/proxy/passthrough.rs b/proxy/src/proxy/passthrough.rs index e3b4730982..5e07c8eeae 100644 --- a/proxy/src/proxy/passthrough.rs +++ b/proxy/src/proxy/passthrough.rs @@ -1,5 +1,5 @@ use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::info; +use tracing::debug; use utils::measured_stream::MeasuredStream; use super::copy_bidirectional::ErrorSource; @@ -45,7 +45,7 @@ pub(crate) async fn proxy_pass( ); // Starting from here we only proxy the client's traffic. 
- info!("performing the proxy pass..."); + debug!("performing the proxy pass..."); let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute( &mut client, &mut compute, diff --git a/proxy/src/proxy/wake_compute.rs b/proxy/src/proxy/wake_compute.rs index d09e0b1f41..8a672d48dc 100644 --- a/proxy/src/proxy/wake_compute.rs +++ b/proxy/src/proxy/wake_compute.rs @@ -17,7 +17,6 @@ pub(crate) async fn wake_compute( api: &B, config: RetryConfig, ) -> Result { - let retry_type = RetryType::WakeCompute; loop { match api.wake_compute(ctx).await { Err(e) if !should_retry(&e, *num_retries, config) => { @@ -26,7 +25,7 @@ pub(crate) async fn wake_compute( Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Failed, - retry_type, + retry_type: RetryType::WakeCompute, }, (*num_retries).into(), ); @@ -40,10 +39,12 @@ pub(crate) async fn wake_compute( Metrics::get().proxy.retries_metric.observe( RetriesMetricGroup { outcome: ConnectOutcome::Success, - retry_type, + retry_type: RetryType::WakeCompute, }, (*num_retries).into(), ); + // TODO: is this necessary? We have a metric. + // TODO: this log line is misleading as "wake_compute" might return cached (and stale) info. info!(?num_retries, "compute node woken up after"); return Ok(n); } diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs index 16c398f303..b74a9ab17e 100644 --- a/proxy/src/rate_limiter/limit_algorithm.rs +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -195,7 +195,11 @@ impl DynamicLimiter { /// /// Set the outcome to `None` to ignore the job. fn release_inner(&self, start: Instant, outcome: Option) { - tracing::info!("outcome is {:?}", outcome); + if outcome.is_none() { + tracing::warn!("outcome is {:?}", outcome); + } else { + tracing::debug!("outcome is {:?}", outcome); + } if self.config.initial_limit == 0 { return; } diff --git a/proxy/src/rate_limiter/limit_algorithm/aimd.rs b/proxy/src/rate_limiter/limit_algorithm/aimd.rs index 5332a5184f..3000cc4c2a 100644 --- a/proxy/src/rate_limiter/limit_algorithm/aimd.rs +++ b/proxy/src/rate_limiter/limit_algorithm/aimd.rs @@ -31,26 +31,32 @@ impl LimitAlgorithm for Aimd { if utilisation > self.utilisation { let limit = old_limit + self.inc; - let increased_limit = limit.clamp(self.min, self.max); - if increased_limit > old_limit { - tracing::info!(increased_limit, "limit increased"); + let new_limit = limit.clamp(self.min, self.max); + if new_limit > old_limit { + tracing::info!(old_limit, new_limit, "limit increased"); + } else { + tracing::debug!(old_limit, new_limit, "limit clamped at max"); } - increased_limit + new_limit } else { old_limit } } Outcome::Overload => { - let limit = old_limit as f32 * self.dec; + let new_limit = old_limit as f32 * self.dec; // Floor instead of round, so the limit reduces even with small numbers. // E.g. 
round(2 * 0.9) = 2, but floor(2 * 0.9) = 1 - let limit = limit.floor() as usize; + let new_limit = new_limit.floor() as usize; - let limit = limit.clamp(self.min, self.max); - tracing::info!(limit, "limit decreased"); - limit + let new_limit = new_limit.clamp(self.min, self.max); + if new_limit < old_limit { + tracing::info!(old_limit, new_limit, "limit decreased"); + } else { + tracing::debug!(old_limit, new_limit, "limit clamped at min"); + } + new_limit } } } diff --git a/proxy/src/redis/cancellation_publisher.rs b/proxy/src/redis/cancellation_publisher.rs index 0000246971..7392b0d316 100644 --- a/proxy/src/redis/cancellation_publisher.rs +++ b/proxy/src/redis/cancellation_publisher.rs @@ -121,6 +121,7 @@ impl RedisPublisherClient { cancel_key_data: CancelKeyData, session_id: Uuid, ) -> anyhow::Result<()> { + // TODO: review redundant error duplication logs. if !self.limiter.check() { tracing::info!("Rate limit exceeded. Skipping cancellation message"); return Err(anyhow::anyhow!("Rate limit exceeded")); @@ -146,7 +147,7 @@ impl CancellationPublisherMut for RedisPublisherClient { tracing::info!("publishing cancellation key to Redis"); match self.try_publish_internal(cancel_key_data, session_id).await { Ok(()) => { - tracing::info!("cancellation key successfuly published to Redis"); + tracing::debug!("cancellation key successfuly published to Redis"); Ok(()) } Err(e) => { diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index d9dcf6fbb7..3037e20888 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -12,8 +12,8 @@ use tracing::field::display; use tracing::{debug, info}; use super::conn_pool::poll_client; -use super::conn_pool_lib::{Client, ConnInfo, GlobalConnPool}; -use super::http_conn_pool::{self, poll_http2_client, Send}; +use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool}; +use super::http_conn_pool::{self, poll_http2_client, HttpConnPool, Send}; use super::local_conn_pool::{self, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION}; use crate::auth::backend::local::StaticAuthRules; use crate::auth::backend::{ComputeCredentials, ComputeUserInfo}; @@ -36,9 +36,10 @@ use crate::rate_limiter::EndpointRateLimiter; use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX}; pub(crate) struct PoolingBackend { - pub(crate) http_conn_pool: Arc>, + pub(crate) http_conn_pool: Arc>>, pub(crate) local_pool: Arc>, - pub(crate) pool: Arc>, + pub(crate) pool: + Arc>>, pub(crate) config: &'static ProxyConfig, pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>, @@ -167,10 +168,10 @@ impl PoolingBackend { force_new: bool, ) -> Result, HttpConnError> { let maybe_client = if force_new { - info!("pool: pool is disabled"); + debug!("pool: pool is disabled"); None } else { - info!("pool: looking for an existing connection"); + debug!("pool: looking for an existing connection"); self.pool.get(ctx, &conn_info)? 
}; @@ -204,14 +205,14 @@ impl PoolingBackend { ctx: &RequestContext, conn_info: ConnInfo, ) -> Result, HttpConnError> { - info!("pool: looking for an existing connection"); + debug!("pool: looking for an existing connection"); if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) { return Ok(client); } let conn_id = uuid::Uuid::new_v4(); tracing::Span::current().record("conn_id", display(conn_id)); - info!(%conn_id, "pool: opening a new connection '{conn_info}'"); + debug!(%conn_id, "pool: opening a new connection '{conn_info}'"); let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials { info: ComputeUserInfo { user: conn_info.user_info.user.clone(), @@ -474,7 +475,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError { } struct TokioMechanism { - pool: Arc>, + pool: Arc>>, conn_info: ConnInfo, conn_id: uuid::Uuid, @@ -524,7 +525,7 @@ impl ConnectMechanism for TokioMechanism { } struct HyperMechanism { - pool: Arc>, + pool: Arc>>, conn_info: ConnInfo, conn_id: uuid::Uuid, diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 07ba1ae9af..bd262f45ed 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -19,7 +19,8 @@ use { }; use super::conn_pool_lib::{ - Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, GlobalConnPool, + Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, EndpointConnPool, + GlobalConnPool, }; use crate::context::RequestContext; use crate::control_plane::messages::MetricsAuxInfo; @@ -52,7 +53,7 @@ impl fmt::Display for ConnInfo { } pub(crate) fn poll_client( - global_pool: Arc>, + global_pool: Arc>>, ctx: &RequestContext, conn_info: ConnInfo, client: C, @@ -167,6 +168,7 @@ pub(crate) fn poll_client( Client::new(inner, conn_info, pool_clone) } +#[derive(Clone)] pub(crate) struct ClientDataRemote { session: tokio::sync::watch::Sender, cancel: CancellationToken, @@ -243,7 +245,7 @@ mod tests { }, cancel_set: CancelSet::new(0), client_conn_threshold: u64::MAX, - max_request_size_bytes: u64::MAX, + max_request_size_bytes: usize::MAX, max_response_size_bytes: usize::MAX, })); let pool = GlobalConnPool::new(config); diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index fe3c422c3b..fe1d2563bc 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::marker::PhantomData; use std::ops::Deref; use std::sync::atomic::{self, AtomicUsize}; use std::sync::{Arc, Weak}; @@ -43,13 +44,14 @@ impl ConnInfo { } } +#[derive(Clone)] pub(crate) enum ClientDataEnum { Remote(ClientDataRemote), Local(ClientDataLocal), - #[allow(dead_code)] Http(ClientDataHttp), } +#[derive(Clone)] pub(crate) struct ClientInnerCommon { pub(crate) inner: C, pub(crate) aux: MetricsAuxInfo, @@ -91,6 +93,7 @@ pub(crate) struct ConnPoolEntry { pub(crate) struct EndpointConnPool { pools: HashMap<(DbName, RoleName), DbUserConnPool>, total_conns: usize, + /// max # connections per endpoint max_conns: usize, _guard: HttpEndpointPoolsGuard<'static>, global_connections_count: Arc, @@ -232,7 +235,7 @@ impl EndpointConnPool { // do logging outside of the mutex if returned { - info!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}"); + debug!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, 
user)={per_db_size}"); } else { info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}"); } @@ -317,24 +320,49 @@ impl DbUserConn for DbUserConnPool { } } -pub(crate) struct GlobalConnPool { +pub(crate) trait EndpointConnPoolExt { + fn clear_closed(&mut self) -> usize; + fn total_conns(&self) -> usize; +} + +impl EndpointConnPoolExt for EndpointConnPool { + fn clear_closed(&mut self) -> usize { + let mut clients_removed: usize = 0; + for db_pool in self.pools.values_mut() { + clients_removed += db_pool.clear_closed_clients(&mut self.total_conns); + } + clients_removed + } + + fn total_conns(&self) -> usize { + self.total_conns + } +} + +pub(crate) struct GlobalConnPool +where + C: ClientInnerExt, + P: EndpointConnPoolExt, +{ // endpoint -> per-endpoint connection pool // // That should be a fairly conteded map, so return reference to the per-endpoint // pool as early as possible and release the lock. - global_pool: DashMap>>>, + pub(crate) global_pool: DashMap>>, /// Number of endpoint-connection pools /// /// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each. /// That seems like far too much effort, so we're using a relaxed increment counter instead. /// It's only used for diagnostics. - global_pool_size: AtomicUsize, + pub(crate) global_pool_size: AtomicUsize, /// Total number of connections in the pool - global_connections_count: Arc, + pub(crate) global_connections_count: Arc, - config: &'static crate::config::HttpConfig, + pub(crate) config: &'static crate::config::HttpConfig, + + _marker: PhantomData, } #[derive(Debug, Clone, Copy)] @@ -357,7 +385,11 @@ pub struct GlobalConnPoolOptions { pub max_total_conns: usize, } -impl GlobalConnPool { +impl GlobalConnPool +where + C: ClientInnerExt, + P: EndpointConnPoolExt, +{ pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc { let shards = config.pool_options.pool_shards; Arc::new(Self { @@ -365,6 +397,7 @@ impl GlobalConnPool { global_pool_size: AtomicUsize::new(0), config, global_connections_count: Arc::new(AtomicUsize::new(0)), + _marker: PhantomData, }) } @@ -378,60 +411,6 @@ impl GlobalConnPool { self.config.pool_options.idle_timeout } - pub(crate) fn get( - self: &Arc, - ctx: &RequestContext, - conn_info: &ConnInfo, - ) -> Result>, HttpConnError> { - let mut client: Option> = None; - let Some(endpoint) = conn_info.endpoint_cache_key() else { - return Ok(None); - }; - - let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint); - if let Some(entry) = endpoint_pool - .write() - .get_conn_entry(conn_info.db_and_user()) - { - client = Some(entry.conn); - } - let endpoint_pool = Arc::downgrade(&endpoint_pool); - - // ok return cached connection if found and establish a new one otherwise - if let Some(mut client) = client { - if client.inner.is_closed() { - info!("pool: cached connection '{conn_info}' is closed, opening a new one"); - return Ok(None); - } - tracing::Span::current() - .record("conn_id", tracing::field::display(client.get_conn_id())); - tracing::Span::current().record( - "pid", - tracing::field::display(client.inner.get_process_id()), - ); - info!( - cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), - "pool: reusing connection '{conn_info}'" - ); - - match client.get_data() { - ClientDataEnum::Local(data) => { - data.session().send(ctx.session_id())?; - } - - ClientDataEnum::Remote(data) => { - data.session().send(ctx.session_id())?; - } - ClientDataEnum::Http(_) => (), - } - - 
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); - ctx.success(); - return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool))); - } - Ok(None) - } - pub(crate) fn shutdown(&self) { // drops all strong references to endpoint-pools self.global_pool.clear(); @@ -464,17 +443,10 @@ impl GlobalConnPool { // if the current endpoint pool is unique (no other strong or weak references) // then it is currently not in use by any connections. if let Some(pool) = Arc::get_mut(x.get_mut()) { - let EndpointConnPool { - pools, total_conns, .. - } = pool.get_mut(); + let endpoints = pool.get_mut(); + clients_removed = endpoints.clear_closed(); - // ensure that closed clients are removed - for db_pool in pools.values_mut() { - clients_removed += db_pool.clear_closed_clients(total_conns); - } - - // we only remove this pool if it has no active connections - if *total_conns == 0 { + if endpoints.total_conns() == 0 { info!("pool: discarding pool for endpoint {endpoint}"); return false; } @@ -510,6 +482,62 @@ impl GlobalConnPool { info!("pool: performed global pool gc. size now {global_pool_size}"); } } +} + +impl GlobalConnPool> { + pub(crate) fn get( + self: &Arc, + ctx: &RequestContext, + conn_info: &ConnInfo, + ) -> Result>, HttpConnError> { + let mut client: Option> = None; + let Some(endpoint) = conn_info.endpoint_cache_key() else { + return Ok(None); + }; + + let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint); + if let Some(entry) = endpoint_pool + .write() + .get_conn_entry(conn_info.db_and_user()) + { + client = Some(entry.conn); + } + let endpoint_pool = Arc::downgrade(&endpoint_pool); + + // ok return cached connection if found and establish a new one otherwise + if let Some(mut client) = client { + if client.inner.is_closed() { + info!("pool: cached connection '{conn_info}' is closed, opening a new one"); + return Ok(None); + } + tracing::Span::current() + .record("conn_id", tracing::field::display(client.get_conn_id())); + tracing::Span::current().record( + "pid", + tracing::field::display(client.inner.get_process_id()), + ); + debug!( + cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), + "pool: reusing connection '{conn_info}'" + ); + + match client.get_data() { + ClientDataEnum::Local(data) => { + data.session().send(ctx.session_id())?; + } + + ClientDataEnum::Remote(data) => { + data.session().send(ctx.session_id())?; + } + ClientDataEnum::Http(_) => (), + } + + ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); + ctx.success(); + return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool))); + } + Ok(None) + } pub(crate) fn get_or_create_endpoint_pool( self: &Arc, @@ -556,7 +584,6 @@ impl GlobalConnPool { pool } } - pub(crate) struct Client { span: Span, inner: Option>, diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index bc86c4b1cd..fde38d0de3 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -2,16 +2,17 @@ use std::collections::VecDeque; use std::sync::atomic::{self, AtomicUsize}; use std::sync::{Arc, Weak}; -use dashmap::DashMap; use hyper::client::conn::http2; use hyper_util::rt::{TokioExecutor, TokioIo}; use parking_lot::RwLock; -use rand::Rng; use tokio::net::TcpStream; use tracing::{debug, error, info, info_span, Instrument}; use super::backend::HttpConnError; -use super::conn_pool_lib::{ClientInnerExt, ConnInfo}; +use super::conn_pool_lib::{ + ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry, + EndpointConnPoolExt, 
GlobalConnPool, +}; use crate::context::RequestContext; use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo}; use crate::metrics::{HttpEndpointPoolsGuard, Metrics}; @@ -23,17 +24,11 @@ pub(crate) type Connect = http2::Connection, hyper::body::Incoming, TokioExecutor>; #[derive(Clone)] -pub(crate) struct ConnPoolEntry { - conn: C, - conn_id: uuid::Uuid, - aux: MetricsAuxInfo, -} - pub(crate) struct ClientDataHttp(); // Per-endpoint connection pool // Number of open connections is limited by the `max_conns_per_endpoint`. -pub(crate) struct EndpointConnPool { +pub(crate) struct HttpConnPool { // TODO(conrad): // either we should open more connections depending on stream count // (not exposed by hyper, need our own counter) @@ -48,14 +43,19 @@ pub(crate) struct EndpointConnPool { global_connections_count: Arc, } -impl EndpointConnPool { +impl HttpConnPool { fn get_conn_entry(&mut self) -> Option> { let Self { conns, .. } = self; loop { let conn = conns.pop_front()?; - if !conn.conn.is_closed() { - conns.push_back(conn.clone()); + if !conn.conn.inner.is_closed() { + let new_conn = ConnPoolEntry { + conn: conn.conn.clone(), + _last_access: std::time::Instant::now(), + }; + + conns.push_back(new_conn); return Some(conn); } } @@ -69,7 +69,7 @@ impl EndpointConnPool { } = self; let old_len = conns.len(); - conns.retain(|conn| conn.conn_id != conn_id); + conns.retain(|entry| entry.conn.conn_id != conn_id); let new_len = conns.len(); let removed = old_len - new_len; if removed > 0 { @@ -84,7 +84,22 @@ impl EndpointConnPool { } } -impl Drop for EndpointConnPool { +impl EndpointConnPoolExt for HttpConnPool { + fn clear_closed(&mut self) -> usize { + let Self { conns, .. } = self; + let old_len = conns.len(); + conns.retain(|entry| !entry.conn.inner.is_closed()); + + let new_len = conns.len(); + old_len - new_len + } + + fn total_conns(&self) -> usize { + self.conns.len() + } +} + +impl Drop for HttpConnPool { fn drop(&mut self) { if !self.conns.is_empty() { self.global_connections_count @@ -98,117 +113,7 @@ impl Drop for EndpointConnPool { } } -pub(crate) struct GlobalConnPool { - // endpoint -> per-endpoint connection pool - // - // That should be a fairly conteded map, so return reference to the per-endpoint - // pool as early as possible and release the lock. - global_pool: DashMap>>>, - - /// Number of endpoint-connection pools - /// - /// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each. - /// That seems like far too much effort, so we're using a relaxed increment counter instead. - /// It's only used for diagnostics. 
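The point of the new `EndpointConnPoolExt` trait is to let a single generic `GlobalConnPool`, including its GC sweep, serve both the postgres connection pool and the HTTP/2 pool, which is why the duplicated `GlobalConnPool` in `http_conn_pool.rs` is deleted here. A minimal model of that shape, with the pool internals and the `Arc::get_mut` uniqueness check simplified away:

use std::collections::HashMap;

// Simplified stand-in for either per-endpoint pool type.
trait EndpointConnPoolExt {
    fn clear_closed(&mut self) -> usize;
    fn total_conns(&self) -> usize;
}

struct FakePool {
    open: usize,
    closed: usize,
}

impl EndpointConnPoolExt for FakePool {
    fn clear_closed(&mut self) -> usize {
        std::mem::take(&mut self.closed) // drop closed conns, report how many
    }
    fn total_conns(&self) -> usize {
        self.open
    }
}

// One generic GC sweep shared by every pool flavour: drop closed connections,
// then discard per-endpoint pools that are left empty (as the retain() above does).
fn gc<P: EndpointConnPoolExt>(pools: &mut HashMap<String, P>) -> usize {
    let mut removed = 0;
    pools.retain(|_endpoint, pool| {
        removed += pool.clear_closed();
        pool.total_conns() != 0
    });
    removed
}

fn main() {
    let mut pools = HashMap::new();
    pools.insert("ep-a".to_string(), FakePool { open: 1, closed: 2 });
    pools.insert("ep-b".to_string(), FakePool { open: 0, closed: 1 });
    assert_eq!(gc(&mut pools), 3); // three closed clients reclaimed
    assert!(pools.contains_key("ep-a") && !pools.contains_key("ep-b"));
}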
- global_pool_size: AtomicUsize, - - /// Total number of connections in the pool - global_connections_count: Arc, - - config: &'static crate::config::HttpConfig, -} - -impl GlobalConnPool { - pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc { - let shards = config.pool_options.pool_shards; - Arc::new(Self { - global_pool: DashMap::with_shard_amount(shards), - global_pool_size: AtomicUsize::new(0), - config, - global_connections_count: Arc::new(AtomicUsize::new(0)), - }) - } - - pub(crate) fn shutdown(&self) { - // drops all strong references to endpoint-pools - self.global_pool.clear(); - } - - pub(crate) async fn gc_worker(&self, mut rng: impl Rng) { - let epoch = self.config.pool_options.gc_epoch; - let mut interval = tokio::time::interval(epoch / (self.global_pool.shards().len()) as u32); - loop { - interval.tick().await; - - let shard = rng.gen_range(0..self.global_pool.shards().len()); - self.gc(shard); - } - } - - fn gc(&self, shard: usize) { - debug!(shard, "pool: performing epoch reclamation"); - - // acquire a random shard lock - let mut shard = self.global_pool.shards()[shard].write(); - - let timer = Metrics::get() - .proxy - .http_pool_reclaimation_lag_seconds - .start_timer(); - let current_len = shard.len(); - let mut clients_removed = 0; - shard.retain(|endpoint, x| { - // if the current endpoint pool is unique (no other strong or weak references) - // then it is currently not in use by any connections. - if let Some(pool) = Arc::get_mut(x.get_mut()) { - let EndpointConnPool { conns, .. } = pool.get_mut(); - - let old_len = conns.len(); - - conns.retain(|conn| !conn.conn.is_closed()); - - let new_len = conns.len(); - let removed = old_len - new_len; - clients_removed += removed; - - // we only remove this pool if it has no active connections - if conns.is_empty() { - info!("pool: discarding pool for endpoint {endpoint}"); - return false; - } - } - - true - }); - - let new_len = shard.len(); - drop(shard); - timer.observe(); - - // Do logging outside of the lock. - if clients_removed > 0 { - let size = self - .global_connections_count - .fetch_sub(clients_removed, atomic::Ordering::Relaxed) - - clients_removed; - Metrics::get() - .proxy - .http_pool_opened_connections - .get_metric() - .dec_by(clients_removed as i64); - info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}"); - } - let removed = current_len - new_len; - - if removed > 0 { - let global_pool_size = self - .global_pool_size - .fetch_sub(removed, atomic::Ordering::Relaxed) - - removed; - info!("pool: performed global pool gc. 
size now {global_pool_size}"); - } - } - +impl GlobalConnPool> { #[expect(unused_results)] pub(crate) fn get( self: &Arc, @@ -226,27 +131,28 @@ impl GlobalConnPool { return result; }; - tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id)); - info!( + tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id)); + debug!( cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), "pool: reusing connection '{conn_info}'" ); ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit); ctx.success(); - Ok(Some(Client::new(client.conn, client.aux))) + + Ok(Some(Client::new(client.conn.clone()))) } fn get_or_create_endpoint_pool( self: &Arc, endpoint: &EndpointCacheKey, - ) -> Arc>> { + ) -> Arc>> { // fast path if let Some(pool) = self.global_pool.get(endpoint) { return pool.clone(); } // slow path - let new_pool = Arc::new(RwLock::new(EndpointConnPool { + let new_pool = Arc::new(RwLock::new(HttpConnPool { conns: VecDeque::new(), _guard: Metrics::get().proxy.http_endpoint_pools.guard(), global_connections_count: self.global_connections_count.clone(), @@ -279,7 +185,7 @@ impl GlobalConnPool { } pub(crate) fn poll_http2_client( - global_pool: Arc>, + global_pool: Arc>>, ctx: &RequestContext, conn_info: &ConnInfo, client: Send, @@ -299,11 +205,15 @@ pub(crate) fn poll_http2_client( let pool = match conn_info.endpoint_cache_key() { Some(endpoint) => { let pool = global_pool.get_or_create_endpoint_pool(&endpoint); - - pool.write().conns.push_back(ConnPoolEntry { - conn: client.clone(), - conn_id, + let client = ClientInnerCommon { + inner: client.clone(), aux: aux.clone(), + conn_id, + data: ClientDataEnum::Http(ClientDataHttp()), + }; + pool.write().conns.push_back(ConnPoolEntry { + conn: client, + _last_access: std::time::Instant::now(), }); Metrics::get() .proxy @@ -335,23 +245,30 @@ pub(crate) fn poll_http2_client( .instrument(span), ); - Client::new(client, aux) + let client = ClientInnerCommon { + inner: client, + aux, + conn_id, + data: ClientDataEnum::Http(ClientDataHttp()), + }; + + Client::new(client) } pub(crate) struct Client { - pub(crate) inner: C, - aux: MetricsAuxInfo, + pub(crate) inner: ClientInnerCommon, } impl Client { - pub(self) fn new(inner: C, aux: MetricsAuxInfo) -> Self { - Self { inner, aux } + pub(self) fn new(inner: ClientInnerCommon) -> Self { + Self { inner } } pub(crate) fn metrics(&self) -> Arc { + let aux = &self.inner.aux; USAGE_METRICS.register(Ids { - endpoint_id: self.aux.endpoint_id, - branch_id: self.aux.branch_id, + endpoint_id: aux.endpoint_id, + branch_id: aux.branch_id, }) } } diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index cadcbd7530..9abe35db08 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -29,7 +29,7 @@ use tokio_postgres::tls::NoTlsStream; use tokio_postgres::types::ToSql; use tokio_postgres::{AsyncMessage, Socket}; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{debug, error, info, info_span, warn, Instrument}; use super::backend::HttpConnError; use super::conn_pool_lib::{ @@ -44,6 +44,7 @@ pub(crate) const EXT_NAME: &str = "pg_session_jwt"; pub(crate) const EXT_VERSION: &str = "0.1.2"; pub(crate) const EXT_SCHEMA: &str = "auth"; +#[derive(Clone)] pub(crate) struct ClientDataLocal { session: tokio::sync::watch::Sender, cancel: CancellationToken, @@ -110,7 +111,7 @@ impl LocalConnPool { "pid", 
tracing::field::display(client.inner.get_process_id()), ); - info!( + debug!( cold_start_info = ColdStartInfo::HttpPoolHit.as_str(), "local_pool: reusing connection '{conn_info}'" ); diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 59247f03bf..77025f419d 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -88,7 +88,7 @@ pub async fn task_main( } }); - let http_conn_pool = http_conn_pool::GlobalConnPool::new(&config.http_config); + let http_conn_pool = conn_pool_lib::GlobalConnPool::new(&config.http_config); { let http_conn_pool = Arc::clone(&http_conn_pool); tokio::spawn(async move { diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 36d8595902..03b37bccd5 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -8,7 +8,7 @@ use http::header::AUTHORIZATION; use http::Method; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full}; -use hyper::body::{Body, Incoming}; +use hyper::body::Incoming; use hyper::http::{HeaderName, HeaderValue}; use hyper::{header, HeaderMap, Request, Response, StatusCode}; use pq_proto::StartupMessageParamsBuilder; @@ -18,7 +18,7 @@ use tokio::time; use tokio_postgres::error::{DbError, ErrorPosition, SqlState}; use tokio_postgres::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction}; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{debug, error, info}; use typed_json::json; use url::Url; use urlencoding; @@ -36,6 +36,7 @@ use crate::auth::{endpoint_sni, ComputeUserInfoParseError}; use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig}; use crate::context::RequestContext; use crate::error::{ErrorKind, ReportableError, UserFacingError}; +use crate::http::{read_body_with_limit, ReadBodyError}; use crate::metrics::{HttpDirection, Metrics}; use crate::proxy::{run_until_cancelled, NeonOptions}; use crate::serverless::backend::HttpConnError; @@ -47,6 +48,7 @@ use crate::usage_metrics::{MetricCounter, MetricCounterRecorder}; struct QueryData { query: String, #[serde(deserialize_with = "bytes_to_pg_text")] + #[serde(default)] params: Vec>, #[serde(default)] array_mode: Option, @@ -357,8 +359,6 @@ pub(crate) enum SqlOverHttpError { ConnectCompute(#[from] HttpConnError), #[error("{0}")] ConnInfo(#[from] ConnInfoError), - #[error("request is too large (max is {0} bytes)")] - RequestTooLarge(u64), #[error("response is too large (max is {0} bytes)")] ResponseTooLarge(usize), #[error("invalid isolation level")] @@ -377,7 +377,6 @@ impl ReportableError for SqlOverHttpError { SqlOverHttpError::ReadPayload(e) => e.get_error_kind(), SqlOverHttpError::ConnectCompute(e) => e.get_error_kind(), SqlOverHttpError::ConnInfo(e) => e.get_error_kind(), - SqlOverHttpError::RequestTooLarge(_) => ErrorKind::User, SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User, SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User, SqlOverHttpError::Postgres(p) => p.get_error_kind(), @@ -393,7 +392,6 @@ impl UserFacingError for SqlOverHttpError { SqlOverHttpError::ReadPayload(p) => p.to_string(), SqlOverHttpError::ConnectCompute(c) => c.to_string_client(), SqlOverHttpError::ConnInfo(c) => c.to_string_client(), - SqlOverHttpError::RequestTooLarge(_) => self.to_string(), SqlOverHttpError::ResponseTooLarge(_) => self.to_string(), SqlOverHttpError::InvalidIsolationLevel => self.to_string(), SqlOverHttpError::Postgres(p) => p.to_string(), @@ -406,13 +404,12 @@ impl 
UserFacingError for SqlOverHttpError { impl HttpCodeError for SqlOverHttpError { fn get_http_status_code(&self) -> StatusCode { match self { - SqlOverHttpError::ReadPayload(_) => StatusCode::BAD_REQUEST, + SqlOverHttpError::ReadPayload(e) => e.get_http_status_code(), SqlOverHttpError::ConnectCompute(h) => match h.get_error_kind() { ErrorKind::User => StatusCode::BAD_REQUEST, _ => StatusCode::INTERNAL_SERVER_ERROR, }, SqlOverHttpError::ConnInfo(_) => StatusCode::BAD_REQUEST, - SqlOverHttpError::RequestTooLarge(_) => StatusCode::PAYLOAD_TOO_LARGE, SqlOverHttpError::ResponseTooLarge(_) => StatusCode::INSUFFICIENT_STORAGE, SqlOverHttpError::InvalidIsolationLevel => StatusCode::BAD_REQUEST, SqlOverHttpError::Postgres(_) => StatusCode::BAD_REQUEST, @@ -426,19 +423,41 @@ impl HttpCodeError for SqlOverHttpError { pub(crate) enum ReadPayloadError { #[error("could not read the HTTP request body: {0}")] Read(#[from] hyper::Error), + #[error("request is too large (max is {limit} bytes)")] + BodyTooLarge { limit: usize }, #[error("could not parse the HTTP request body: {0}")] Parse(#[from] serde_json::Error), } +impl From> for ReadPayloadError { + fn from(value: ReadBodyError) -> Self { + match value { + ReadBodyError::BodyTooLarge { limit } => Self::BodyTooLarge { limit }, + ReadBodyError::Read(e) => Self::Read(e), + } + } +} + impl ReportableError for ReadPayloadError { fn get_error_kind(&self) -> ErrorKind { match self { ReadPayloadError::Read(_) => ErrorKind::ClientDisconnect, + ReadPayloadError::BodyTooLarge { .. } => ErrorKind::User, ReadPayloadError::Parse(_) => ErrorKind::User, } } } +impl HttpCodeError for ReadPayloadError { + fn get_http_status_code(&self) -> StatusCode { + match self { + ReadPayloadError::Read(_) => StatusCode::BAD_REQUEST, + ReadPayloadError::BodyTooLarge { .. 
} => StatusCode::PAYLOAD_TOO_LARGE, + ReadPayloadError::Parse(_) => StatusCode::BAD_REQUEST, + } + } +} + #[derive(Debug, thiserror::Error)] pub(crate) enum SqlOverHttpCancel { #[error("query was cancelled")] @@ -580,28 +599,20 @@ async fn handle_db_inner( let parsed_headers = HttpHeaders::try_parse(headers)?; - let request_content_length = match request.body().size_hint().upper() { - Some(v) => v, - None => config.http_config.max_request_size_bytes + 1, - }; - info!(request_content_length, "request size in bytes"); - Metrics::get() - .proxy - .http_conn_content_length_bytes - .observe(HttpDirection::Request, request_content_length as f64); - - // we don't have a streaming request support yet so this is to prevent OOM - // from a malicious user sending an extremely large request body - if request_content_length > config.http_config.max_request_size_bytes { - return Err(SqlOverHttpError::RequestTooLarge( - config.http_config.max_request_size_bytes, - )); - } - let fetch_and_process_request = Box::pin( async { - let body = request.into_body().collect().await?.to_bytes(); - info!(length = body.len(), "request payload read"); + let body = read_body_with_limit( + request.into_body(), + config.http_config.max_request_size_bytes, + ) + .await?; + + Metrics::get() + .proxy + .http_conn_content_length_bytes + .observe(HttpDirection::Request, body.len() as f64); + + debug!(length = body.len(), "request payload read"); let payload: Payload = serde_json::from_slice(&body)?; Ok::(payload) // Adjust error type accordingly } @@ -768,6 +779,7 @@ async fn handle_auth_broker_inner( let _metrics = client.metrics(); Ok(client + .inner .inner .send_request(req) .await @@ -1095,3 +1107,63 @@ impl Discard<'_> { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_payload() { + let payload = "{\"query\":\"SELECT * FROM users WHERE name = ?\",\"params\":[\"test\"],\"arrayMode\":true}"; + let deserialized_payload: Payload = serde_json::from_str(payload).unwrap(); + + match deserialized_payload { + Payload::Single(QueryData { + query, + params, + array_mode, + }) => { + assert_eq!(query, "SELECT * FROM users WHERE name = ?"); + assert_eq!(params, vec![Some(String::from("test"))]); + assert!(array_mode.unwrap()); + } + Payload::Batch(_) => { + panic!("deserialization failed: case with single query, one param, and array mode") + } + } + + let payload = "{\"queries\":[{\"query\":\"SELECT * FROM users0 WHERE name = ?\",\"params\":[\"test0\"], \"arrayMode\":false},{\"query\":\"SELECT * FROM users1 WHERE name = ?\",\"params\":[\"test1\"],\"arrayMode\":true}]}"; + let deserialized_payload: Payload = serde_json::from_str(payload).unwrap(); + + match deserialized_payload { + Payload::Batch(BatchQueryData { queries }) => { + assert_eq!(queries.len(), 2); + for (i, query) in queries.into_iter().enumerate() { + assert_eq!( + query.query, + format!("SELECT * FROM users{i} WHERE name = ?") + ); + assert_eq!(query.params, vec![Some(format!("test{i}"))]); + assert_eq!(query.array_mode.unwrap(), i > 0); + } + } + Payload::Single(_) => panic!("deserialization failed: case with multiple queries"), + } + + let payload = "{\"query\":\"SELECT 1\"}"; + let deserialized_payload: Payload = serde_json::from_str(payload).unwrap(); + + match deserialized_payload { + Payload::Single(QueryData { + query, + params, + array_mode, + }) => { + assert_eq!(query, "SELECT 1"); + assert_eq!(params, vec![]); + assert!(array_mode.is_none()); + } + Payload::Batch(_) => panic!("deserialization failed: case with only one query"), + } + } 
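+    // Note: the final case above only deserializes because `params` now carries
+    // `#[serde(default)]`; without that attribute, a body like {"query":"SELECT 1"}
+    // would be rejected with a missing-field error.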
+} diff --git a/storage_scrubber/src/garbage.rs b/storage_scrubber/src/garbage.rs index 91668a42a7..b026efbc3b 100644 --- a/storage_scrubber/src/garbage.rs +++ b/storage_scrubber/src/garbage.rs @@ -21,7 +21,7 @@ use utils::{backoff, id::TenantId}; use crate::{ cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData}, init_remote, list_objects_with_retries, - metadata_stream::{stream_tenant_timelines, stream_tenants}, + metadata_stream::{stream_tenant_timelines, stream_tenants_maybe_prefix}, BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth, MAX_RETRIES, }; @@ -118,9 +118,17 @@ pub async fn find_garbage( console_config: ConsoleConfig, depth: TraversingDepth, node_kind: NodeKind, + tenant_id_prefix: Option, output_path: String, ) -> anyhow::Result<()> { - let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?; + let garbage = find_garbage_inner( + bucket_config, + console_config, + depth, + node_kind, + tenant_id_prefix, + ) + .await?; let serialized = serde_json::to_vec_pretty(&garbage)?; tokio::fs::write(&output_path, &serialized).await?; @@ -152,6 +160,7 @@ async fn find_garbage_inner( console_config: ConsoleConfig, depth: TraversingDepth, node_kind: NodeKind, + tenant_id_prefix: Option, ) -> anyhow::Result { // Construct clients for S3 and for Console API let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?; @@ -178,7 +187,7 @@ async fn find_garbage_inner( // Enumerate Tenants in S3, and check if each one exists in Console tracing::info!("Finding all tenants in {}...", bucket_config.desc_str()); - let tenants = stream_tenants(&remote_client, &target); + let tenants = stream_tenants_maybe_prefix(&remote_client, &target, tenant_id_prefix); let tenants_checked = tenants.map_ok(|t| { let api_client = cloud_admin_api_client.clone(); let console_cache = console_cache.clone(); diff --git a/storage_scrubber/src/main.rs b/storage_scrubber/src/main.rs index 0ffb570984..92979d609e 100644 --- a/storage_scrubber/src/main.rs +++ b/storage_scrubber/src/main.rs @@ -54,6 +54,8 @@ enum Command { node_kind: NodeKind, #[arg(short, long, default_value_t=TraversingDepth::Tenant)] depth: TraversingDepth, + #[arg(short, long, default_value=None)] + tenant_id_prefix: Option, #[arg(short, long, default_value_t = String::from("garbage.json"))] output_path: String, }, @@ -209,10 +211,19 @@ async fn main() -> anyhow::Result<()> { Command::FindGarbage { node_kind, depth, + tenant_id_prefix, output_path, } => { let console_config = ConsoleConfig::from_env()?; - find_garbage(bucket_config, console_config, depth, node_kind, output_path).await + find_garbage( + bucket_config, + console_config, + depth, + node_kind, + tenant_id_prefix, + output_path, + ) + .await } Command::PurgeGarbage { input_path, diff --git a/storage_scrubber/src/metadata_stream.rs b/storage_scrubber/src/metadata_stream.rs index efda7c213d..47447d681c 100644 --- a/storage_scrubber/src/metadata_stream.rs +++ b/storage_scrubber/src/metadata_stream.rs @@ -17,9 +17,20 @@ use utils::id::{TenantId, TimelineId}; pub fn stream_tenants<'a>( remote_client: &'a GenericRemoteStorage, target: &'a RootTarget, +) -> impl Stream> + 'a { + stream_tenants_maybe_prefix(remote_client, target, None) +} +/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes +pub fn stream_tenants_maybe_prefix<'a>( + remote_client: &'a GenericRemoteStorage, + target: &'a RootTarget, + tenant_id_prefix: Option, ) -> impl Stream> + 'a { 
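+    // When a tenant_id_prefix is given, it is appended to the root listing prefix
+    // below (with no trailing delimiter), so the WithDelimiter listing only returns
+    // tenants whose hex tenant ID starts with that prefix.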
try_stream! { - let tenants_target = target.tenants_root(); + let mut tenants_target = target.tenants_root(); + if let Some(tenant_id_prefix) = tenant_id_prefix { + tenants_target.prefix_in_bucket += &tenant_id_prefix; + } let mut tenants_stream = std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target)); while let Some(chunk) = tenants_stream.next().await { diff --git a/test_runner/fixtures/remote_storage.py b/test_runner/fixtures/remote_storage.py index 7024953661..c630ea98b4 100644 --- a/test_runner/fixtures/remote_storage.py +++ b/test_runner/fixtures/remote_storage.py @@ -77,14 +77,16 @@ class MockS3Server: class LocalFsStorage: root: Path - def tenant_path(self, tenant_id: TenantId) -> Path: + def tenant_path(self, tenant_id: Union[TenantId, TenantShardId]) -> Path: return self.root / "tenants" / str(tenant_id) - def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path: + def timeline_path( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ) -> Path: return self.tenant_path(tenant_id) / "timelines" / str(timeline_id) def timeline_latest_generation( - self, tenant_id: TenantId, timeline_id: TimelineId + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId ) -> Optional[int]: timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id)) index_parts = [f for f in timeline_files if f.startswith("index_part")] @@ -102,7 +104,9 @@ class LocalFsStorage: raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}") return generations[-1] - def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path: + def index_path( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ) -> Path: latest_gen = self.timeline_latest_generation(tenant_id, timeline_id) if latest_gen is None: filename = TIMELINE_INDEX_PART_FILE_NAME @@ -126,7 +130,9 @@ class LocalFsStorage: filename = f"{local_name}-{generation:08x}" return self.timeline_path(tenant_id, timeline_id) / filename - def index_content(self, tenant_id: TenantId, timeline_id: TimelineId) -> Any: + def index_content( + self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId + ) -> Any: with self.index_path(tenant_id, timeline_id).open("r") as f: return json.load(f) diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index d4aef96735..12134048e6 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -365,6 +365,19 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder): workload.validate(pageserver_a.id) workload.validate(pageserver_b.id) + # Force compaction on destination pageserver + pageserver_b.http_client().timeline_compact(tenant_id, timeline_id, force_l0_compaction=True) + + # Destination pageserver is in AttachedMulti, it should have generated deletions but + # not enqueued them yet. + # Check deletion metrics via prometheus - should be 0 since we're in AttachedMulti + assert ( + pageserver_b.http_client().get_metric_value( + "pageserver_deletion_queue_submitted_total", + ) + == 0 + ) + # Revert the origin to secondary log.info("Setting origin to Secondary") pageserver_a.tenant_location_configure( @@ -389,6 +402,17 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder): }, ) + # Transition to AttachedSingle should have drained deletions generated by doing a compaction + # while in AttachedMulti. 
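+    # pageserver_deletion_queue_submitted_total was still 0 above (AttachedMulti);
+    # wait_until retries the check below (10 attempts, 0.1s apart), so the counter
+    # only needs to become positive once the transition to AttachedSingle drains
+    # the blocked deletions.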
+ def blocked_deletions_drained(): + submitted = pageserver_b.http_client().get_metric_value( + "pageserver_deletion_queue_submitted_total" + ) + assert submitted is not None + assert submitted > 0 + + wait_until(10, 0.1, blocked_deletions_drained) + workload.churn_rows(64, pageserver_b.id) workload.validate(pageserver_b.id) del workload diff --git a/test_runner/regress/test_pg_regress.py b/test_runner/regress/test_pg_regress.py index f4698191eb..6a5e388c53 100644 --- a/test_runner/regress/test_pg_regress.py +++ b/test_runner/regress/test_pg_regress.py @@ -110,13 +110,15 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files) - # Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create. + # Ensure that compaction/GC works, on a timeline containing all the diversity that postgres regression tests create. # There should have been compactions mid-test as well, this final check is in addition those. for shard, pageserver in tenant_get_shards(env, env.initial_tenant): pageserver.http_client().timeline_checkpoint( shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True ) + pageserver.http_client().timeline_gc(shard, env.initial_timeline, None) + # Run the main PostgreSQL regression tests, in src/test/regress. # diff --git a/test_runner/regress/test_sharding.py b/test_runner/regress/test_sharding.py index 0a4a53356d..3194fe6ec4 100644 --- a/test_runner/regress/test_sharding.py +++ b/test_runner/regress/test_sharding.py @@ -19,7 +19,7 @@ from fixtures.neon_fixtures import ( wait_for_last_flush_lsn, ) from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty -from fixtures.remote_storage import s3_storage +from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, s3_storage from fixtures.utils import skip_in_debug_build, wait_until from fixtures.workload import Workload from pytest_httpserver import HTTPServer @@ -515,11 +515,12 @@ def test_sharding_split_smoke( """ - # We will start with 4 shards and split into 8, then migrate all those - # 8 shards onto separate pageservers - shard_count = 4 - split_shard_count = 8 - neon_env_builder.num_pageservers = split_shard_count * 2 + # Shard count we start with + shard_count = 2 + # Shard count we split into + split_shard_count = 4 + # We will have 2 shards per pageserver once done (including secondaries) + neon_env_builder.num_pageservers = split_shard_count # 1MiB stripes: enable getting some meaningful data distribution without # writing large quantities of data in this test. The stripe size is given @@ -591,7 +592,7 @@ def test_sharding_split_smoke( workload.validate() - assert len(pre_split_pageserver_ids) == 4 + assert len(pre_split_pageserver_ids) == shard_count def shards_on_disk(shard_ids): for pageserver in env.pageservers: @@ -654,9 +655,9 @@ def test_sharding_split_smoke( # - shard_count reconciles for the original setup of the tenant # - shard_count reconciles for detaching the original secondary locations during split # - split_shard_count reconciles during shard splitting, for setting up secondaries. 
- # - shard_count of the child shards will need to fail over to their secondaries - # - shard_count of the child shard secondary locations will get moved to emptier nodes - expect_reconciles = shard_count * 2 + split_shard_count + shard_count * 2 + # - split_shard_count/2 of the child shards will need to fail over to their secondaries (since we have 8 shards and 4 pageservers, only 4 will move) + expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2 + reconcile_ok = env.storage_controller.get_metric_value( "storage_controller_reconcile_complete_total", filter={"status": "ok"} ) @@ -720,22 +721,10 @@ def test_sharding_split_smoke( # dominated by shard count. log.info(f"total: {total}") assert total == { - 1: 1, - 2: 1, - 3: 1, - 4: 1, - 5: 1, - 6: 1, - 7: 1, - 8: 1, - 9: 1, - 10: 1, - 11: 1, - 12: 1, - 13: 1, - 14: 1, - 15: 1, - 16: 1, + 1: 2, + 2: 2, + 3: 2, + 4: 2, } # The controller is not required to lay out the attached locations in any particular way, but @@ -1685,3 +1674,111 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder): ) assert len(top["shards"]) == n_tenants - 4 assert set(i["id"] for i in top["shards"]) == set(str(i[0]) for i in tenants[4:]) + + +def test_sharding_gc( + neon_env_builder: NeonEnvBuilder, +): + """ + Exercise GC in a sharded tenant: because only shard 0 holds SLRU content, it acts as + the "leader" for GC, and other shards read its index to learn what LSN they should + GC up to. + """ + + shard_count = 4 + neon_env_builder.num_pageservers = shard_count + neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS) + + TENANT_CONF = { + # small checkpointing and compaction targets to ensure we generate many upload operations + "checkpoint_distance": 128 * 1024, + "compaction_threshold": 1, + "compaction_target_size": 128 * 1024, + # A short PITR horizon, so that we won't have to sleep too long in the test to wait for it to + # happen. + "pitr_interval": "1s", + # disable background compaction and GC. We invoke it manually when we want it to happen. + "gc_period": "0s", + "compaction_period": "0s", + # Disable automatic creation of image layers, as we will create them explicitly when we want them + "image_creation_threshold": 9999, + "image_layer_creation_check_threshold": 0, + "lsn_lease_length": "0s", + } + env = neon_env_builder.init_start( + initial_tenant_shard_count=shard_count, initial_tenant_conf=TENANT_CONF + ) + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + # Create a branch and write some data + workload = Workload(env, tenant_id, timeline_id) + initial_lsn = Lsn(workload.endpoint().safe_psql("SELECT pg_current_wal_lsn()")[0][0]) + log.info(f"Started at LSN: {initial_lsn}") + + workload.init() + + # Write enough data to generate multiple layers + for _i in range(10): + last_lsn = workload.write_rows(32) + + assert last_lsn > initial_lsn + + log.info(f"Wrote up to last LSN: {last_lsn}") + + # Do full image layer generation. 
When we subsequently wait for PITR, all historic deltas + # should be GC-able + for shard_number in range(shard_count): + shard = TenantShardId(tenant_id, shard_number, shard_count) + env.get_tenant_pageserver(shard).http_client().timeline_compact( + shard, timeline_id, force_image_layer_creation=True + ) + + workload.churn_rows(32) + + time.sleep(5) + + # Invoke GC on a non-zero shard and verify its GC cutoff LSN does not advance + shard_one = TenantShardId(tenant_id, 1, shard_count) + env.get_tenant_pageserver(shard_one).http_client().timeline_gc( + shard_one, timeline_id, gc_horizon=None + ) + + # Check shard 1's index - GC cutoff LSN should not have advanced + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + shard_1_index = env.pageserver_remote_storage.index_content( + tenant_id=shard_one, timeline_id=timeline_id + ) + shard_1_gc_cutoff_lsn = Lsn(shard_1_index["metadata_bytes"]["latest_gc_cutoff_lsn"]) + log.info(f"Shard 1 cutoff LSN: {shard_1_gc_cutoff_lsn}") + assert shard_1_gc_cutoff_lsn <= last_lsn + + shard_zero = TenantShardId(tenant_id, 0, shard_count) + env.get_tenant_pageserver(shard_zero).http_client().timeline_gc( + shard_zero, timeline_id, gc_horizon=None + ) + + # TODO: observe that GC LSN of shard 0 has moved forward in remote storage + assert isinstance(env.pageserver_remote_storage, LocalFsStorage) + shard_0_index = env.pageserver_remote_storage.index_content( + tenant_id=shard_zero, timeline_id=timeline_id + ) + shard_0_gc_cutoff_lsn = Lsn(shard_0_index["metadata_bytes"]["latest_gc_cutoff_lsn"]) + log.info(f"Shard 0 cutoff LSN: {shard_0_gc_cutoff_lsn}") + assert shard_0_gc_cutoff_lsn >= last_lsn + + # Invoke GC on all other shards and verify their GC cutoff LSNs + for shard_number in range(1, shard_count): + shard = TenantShardId(tenant_id, shard_number, shard_count) + env.get_tenant_pageserver(shard).http_client().timeline_gc( + shard, timeline_id, gc_horizon=None + ) + + # Verify GC cutoff LSN advanced to match shard 0 + shard_index = env.pageserver_remote_storage.index_content( + tenant_id=shard, timeline_id=timeline_id + ) + shard_gc_cutoff_lsn = Lsn(shard_index["metadata_bytes"]["latest_gc_cutoff_lsn"]) + log.info(f"Shard {shard_number} cutoff LSN: {shard_gc_cutoff_lsn}") + assert shard_gc_cutoff_lsn == shard_0_gc_cutoff_lsn
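# A minimal helper sketch, offered as a suggestion rather than part of the change above:
# the three index reads in test_sharding_gc all follow the same
# index_content()["metadata_bytes"]["latest_gc_cutoff_lsn"] path and could share one
# function. It assumes it lives next to the test, where Lsn, TenantShardId and
# LocalFsStorage are already in scope.


def shard_gc_cutoff(remote_storage: LocalFsStorage, shard: TenantShardId, timeline_id) -> Lsn:
    """Read the GC cutoff LSN recorded in a shard's remote index_part."""
    index = remote_storage.index_content(tenant_id=shard, timeline_id=timeline_id)
    return Lsn(index["metadata_bytes"]["latest_gc_cutoff_lsn"])


# Example use in the final loop of the test:
#   assert shard_gc_cutoff(env.pageserver_remote_storage, shard, timeline_id) == shard_0_gc_cutoff_lsn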