Merge branch 'problame/batching-benchmark' into problame/merge-getpage-test

Christian Schwarz
2024-11-21 11:27:16 +01:00
41 changed files with 876 additions and 442 deletions

View File

@@ -105,6 +105,11 @@ fn main() -> Result<()> {
fn init() -> Result<(String, clap::ArgMatches)> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
opentelemetry::global::set_error_handler(|err| {
tracing::info!("OpenTelemetry error: {err}");
})
.expect("global error handler lock poisoned");
let mut signals = Signals::new([SIGINT, SIGTERM, SIGQUIT])?;
thread::spawn(move || {
for sig in signals.forever() {

View File

@@ -2,14 +2,28 @@
// This module has heavy inspiration from the prometheus crate's `process_collector.rs`.
use once_cell::sync::Lazy;
use prometheus::Gauge;
use crate::UIntGauge;
pub struct Collector {
descs: Vec<prometheus::core::Desc>,
vmlck: crate::UIntGauge,
cpu_seconds_highres: Gauge,
}
const NMETRICS: usize = 1;
const NMETRICS: usize = 2;
static CLK_TCK_F64: Lazy<f64> = Lazy::new(|| {
let long = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
if long == -1 {
panic!("sysconf(_SC_CLK_TCK) failed");
}
let convertible_to_f64: i32 =
i32::try_from(long).expect("sysconf(_SC_CLK_TCK) is larger than i32");
convertible_to_f64 as f64
});
impl prometheus::core::Collector for Collector {
fn desc(&self) -> Vec<&prometheus::core::Desc> {
@@ -27,6 +41,12 @@ impl prometheus::core::Collector for Collector {
mfs.extend(self.vmlck.collect())
}
}
if let Ok(stat) = myself.stat() {
let cpu_seconds = stat.utime + stat.stime;
self.cpu_seconds_highres
.set(cpu_seconds as f64 / *CLK_TCK_F64);
mfs.extend(self.cpu_seconds_highres.collect());
}
mfs
}
}
@@ -43,7 +63,23 @@ impl Collector {
.cloned(),
);
Self { descs, vmlck }
let cpu_seconds_highres = Gauge::new(
"libmetrics_process_cpu_seconds_highres",
"Total user and system CPU time spent in seconds.\
Sub-second resolution, hence better than `process_cpu_seconds_total`.",
)
.unwrap();
descs.extend(
prometheus::core::Collector::desc(&cpu_seconds_highres)
.into_iter()
.cloned(),
);
Self {
descs,
vmlck,
cpu_seconds_highres,
}
}
}
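
For reference, a minimal standalone sketch (not part of this commit) of what the new gauge computes, assuming the procfs and libc crates used above plus anyhow for error handling: read utime + stime from /proc/self/stat and divide by the clock tick rate to get CPU seconds with sub-second resolution.

fn process_cpu_seconds_highres() -> anyhow::Result<f64> {
    // Clock ticks per second; /proc reports utime/stime in these units.
    let ticks_per_sec = unsafe { libc::sysconf(libc::_SC_CLK_TCK) };
    anyhow::ensure!(ticks_per_sec > 0, "sysconf(_SC_CLK_TCK) failed");

    // procfs::process::Process::myself() reads /proc/self; stat() parses /proc/self/stat.
    let stat = procfs::process::Process::myself()?.stat()?;

    // Sub-second resolution, unlike the integer-valued process_cpu_seconds_total.
    Ok((stat.utime + stat.stime) as f64 / ticks_per_sec as f64)
}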

View File

@@ -184,6 +184,7 @@ pub struct CancelKeyData {
impl fmt::Display for CancelKeyData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// TODO: this is producing strange results, with 0xffffffff........ always in the logs.
let hi = (self.backend_pid as u64) << 32;
let lo = self.cancel_key as u64;
let id = hi | lo;
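
A hedged note on the TODO above: assuming backend_pid and cancel_key are i32, as in the Postgres wire protocol, a plain `as u64` cast sign-extends negative values, so a negative cancel_key fills the upper 32 bits with 0xffffffff before the OR. A minimal sketch of a zero-extending variant:

fn cancel_id(backend_pid: i32, cancel_key: i32) -> u64 {
    // Cast through u32 first to zero-extend instead of sign-extend,
    // so each half stays within its own 32-bit range.
    let hi = (backend_pid as u32 as u64) << 32;
    let lo = cancel_key as u32 as u64;
    hi | lo
}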

View File

@@ -97,10 +97,7 @@ impl AzureBlobStorage {
pub fn relative_path_to_name(&self, path: &RemotePath) -> String {
assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR);
let path_string = path
.get_path()
.as_str()
.trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR);
let path_string = path.get_path().as_str();
match &self.prefix_in_container {
Some(prefix) => {
if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
@@ -277,19 +274,14 @@ impl RemoteStorage for AzureBlobStorage {
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> {
// get the passed prefix or, if it is not set, fall back to the prefix_in_container value
let list_prefix = prefix
.map(|p| self.relative_path_to_name(p))
.or_else(|| self.prefix_in_container.clone())
.map(|mut p| {
// required to end with a separator
// otherwise request will return only the entry of a prefix
if matches!(mode, ListingMode::WithDelimiter)
&& !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR)
{
p.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
let list_prefix = prefix.map(|p| self.relative_path_to_name(p)).or_else(|| {
self.prefix_in_container.clone().map(|mut s| {
if !s.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) {
s.push(REMOTE_STORAGE_PREFIX_SEPARATOR);
}
p
});
s
})
});
async_stream::stream! {
let _permit = self.permit(RequestKind::List, cancel).await?;

View File

@@ -189,6 +189,7 @@ pub struct TenantSharedResources {
/// A [`Tenant`] is really an _attached_ tenant. The configuration
/// for an attached tenant is a subset of the [`LocationConf`], represented
/// in this struct.
#[derive(Clone)]
pub(super) struct AttachedTenantConf {
tenant_conf: TenantConfOpt,
location: AttachedLocationConfig,
@@ -1807,6 +1808,7 @@ impl Tenant {
self.tenant_shard_id,
timeline_id,
self.generation,
&self.tenant_conf.load().location,
)
}
@@ -2527,6 +2529,10 @@ impl Tenant {
{
let conf = self.tenant_conf.load();
// If we may not delete layers, then simply skip GC. Even though a tenant
// in AttachedMulti state could do GC and just enqueue the blocked deletions,
// the only advantage to doing it is to perhaps shrink the LayerMap metadata
// a bit sooner than we would achieve by waiting for AttachedSingle status.
if !conf.location.may_delete_layers_hint() {
info!("Skipping GC in location state {:?}", conf.location);
return Ok(GcResult::default());
@@ -2568,7 +2574,14 @@ impl Tenant {
{
let conf = self.tenant_conf.load();
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
// Note that compaction usually requires deletions, but we don't respect
// may_delete_layers_hint here: that is because tenants in AttachedMulti
// should proceed with compaction even if they can't do deletion, to avoid
// accumulating dangerously deep stacks of L0 layers. Deletions will be
// enqueued inside RemoteTimelineClient, and executed later if/when we transition
// to AttachedSingle state.
if !conf.location.may_upload_layers_hint() {
info!("Skipping compaction in location state {:?}", conf.location);
return Ok(false);
}
@@ -3446,6 +3459,7 @@ impl Tenant {
// this race is not possible if both request types come from the storage
// controller (as they should!) because an exclusive op lock is required
// on the storage controller side.
self.tenant_conf.rcu(|inner| {
Arc::new(AttachedTenantConf {
tenant_conf: new_tenant_conf.clone(),
@@ -3455,20 +3469,22 @@ impl Tenant {
})
});
let updated = self.tenant_conf.load().clone();
self.tenant_conf_updated(&new_tenant_conf);
// Don't hold self.timelines.lock() during the notifies.
// There's no risk of deadlock right now, but there could be if we consolidate
// mutexes in struct Timeline in the future.
let timelines = self.list_timelines();
for timeline in timelines {
timeline.tenant_conf_updated(&new_tenant_conf);
timeline.tenant_conf_updated(&updated);
}
}
pub(crate) fn set_new_location_config(&self, new_conf: AttachedTenantConf) {
let new_tenant_conf = new_conf.tenant_conf.clone();
self.tenant_conf.store(Arc::new(new_conf));
self.tenant_conf.store(Arc::new(new_conf.clone()));
self.tenant_conf_updated(&new_tenant_conf);
// Don't hold self.timelines.lock() during the notifies.
@@ -3476,7 +3492,7 @@ impl Tenant {
// mutexes in struct Timeline in the future.
let timelines = self.list_timelines();
for timeline in timelines {
timeline.tenant_conf_updated(&new_tenant_conf);
timeline.tenant_conf_updated(&new_conf);
}
}
@@ -4544,6 +4560,7 @@ impl Tenant {
self.tenant_shard_id,
timeline_id,
self.generation,
&self.tenant_conf.load().location,
)
}

View File

@@ -1719,10 +1719,11 @@ impl TenantManager {
parent_layers.push(relative_path.to_owned());
}
}
debug_assert!(
!parent_layers.is_empty(),
"shutdown cannot empty the layermap"
);
if parent_layers.is_empty() {
tracing::info!("Ancestor shard has no resident layer to hard link");
}
(parent_timelines, parent_layers)
};

View File

@@ -197,6 +197,7 @@ use utils::backoff::{
self, exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::pausable_failpoint;
use utils::shard::ShardNumber;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicU32, Ordering};
@@ -240,6 +241,7 @@ use utils::id::{TenantId, TimelineId};
use self::index::IndexPart;
use super::config::AttachedLocationConfig;
use super::metadata::MetadataUpdate;
use super::storage_layer::{Layer, LayerName, ResidentLayer};
use super::upload_queue::{NotInitialized, SetDeletedFlagProgress};
@@ -301,6 +303,36 @@ pub enum WaitCompletionError {
#[derive(Debug, thiserror::Error)]
#[error("Upload queue either in unexpected state or hasn't downloaded manifest yet")]
pub struct UploadQueueNotReadyError;
/// Behavioral modes that enable seamless live migration.
///
/// See docs/rfcs/028-pageserver-migration.md to understand how these fit in.
struct RemoteTimelineClientConfig {
/// If this is false, then updates to remote_consistent_lsn are dropped rather
/// than being submitted to DeletionQueue for validation. This behavior is
/// used when a tenant attachment is known to have a stale generation number,
/// such that validation attempts will always fail. This is not necessary
/// for correctness, but avoids spamming error statistics with failed validations
/// when doing migrations of tenants.
process_remote_consistent_lsn_updates: bool,
/// If this is true, then object deletions are held in a buffer in RemoteTimelineClient
/// rather than being submitted to the DeletionQueue. This behavior is used when a tenant
/// is known to be multi-attached, in order to avoid disrupting other attached tenants
/// whose generations' metadata refers to the deleted objects.
block_deletions: bool,
}
/// RemoteTimelineClientConfig's state is entirely driven by LocationConf, but we do
/// not carry the entire LocationConf structure: it's much more than we need. The From
/// impl extracts the subset of the LocationConf that is interesting to RemoteTimelineClient.
impl From<&AttachedLocationConfig> for RemoteTimelineClientConfig {
fn from(lc: &AttachedLocationConfig) -> Self {
Self {
block_deletions: !lc.may_delete_layers_hint(),
process_remote_consistent_lsn_updates: lc.may_upload_layers_hint(),
}
}
}
/// A client for accessing a timeline's data in remote storage.
///
@@ -321,7 +353,7 @@ pub struct UploadQueueNotReadyError;
/// in the index part file, whenever timeline metadata is uploaded.
///
/// Downloads are not queued, they are performed immediately.
pub struct RemoteTimelineClient {
pub(crate) struct RemoteTimelineClient {
conf: &'static PageServerConf,
runtime: tokio::runtime::Handle,
@@ -338,6 +370,9 @@ pub struct RemoteTimelineClient {
deletion_queue_client: DeletionQueueClient,
/// Subset of tenant configuration used to control upload behaviors during migrations
config: std::sync::RwLock<RemoteTimelineClientConfig>,
cancel: CancellationToken,
}
@@ -348,13 +383,14 @@ impl RemoteTimelineClient {
/// Note: the caller must initialize the upload queue before any uploads can be scheduled,
/// by calling init_upload_queue.
///
pub fn new(
pub(crate) fn new(
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
conf: &'static PageServerConf,
tenant_shard_id: TenantShardId,
timeline_id: TimelineId,
generation: Generation,
location_conf: &AttachedLocationConfig,
) -> RemoteTimelineClient {
RemoteTimelineClient {
conf,
@@ -374,6 +410,7 @@ impl RemoteTimelineClient {
&tenant_shard_id,
&timeline_id,
)),
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(location_conf)),
cancel: CancellationToken::new(),
}
}
@@ -429,6 +466,43 @@ impl RemoteTimelineClient {
Ok(())
}
/// Notify this client of a change to its parent tenant's config, as this may cause us to
/// take action (unblocking deletions when transitioning from AttachedMulti to AttachedSingle)
pub(super) fn update_config(&self, location_conf: &AttachedLocationConfig) {
let new_conf = RemoteTimelineClientConfig::from(location_conf);
let unblocked = !new_conf.block_deletions;
// Update config before draining deletions, so that we don't race with more being
// inserted. This can result in deletions happening out of order, but that does not
// violate any invariants: deletions only need to be ordered relative to upload of the index
// that dereferences the deleted objects, and we are not changing that order.
*self.config.write().unwrap() = new_conf;
if unblocked {
// If we may now delete layers, drain any that were blocked in our old
// configuration state
let mut queue_locked = self.upload_queue.lock().unwrap();
if let Ok(queue) = queue_locked.initialized_mut() {
let blocked_deletions = std::mem::take(&mut queue.blocked_deletions);
for d in blocked_deletions {
if let Err(e) = self.deletion_queue_client.push_layers_sync(
self.tenant_shard_id,
self.timeline_id,
self.generation,
d.layers,
) {
// This could happen if the pageserver is shut down while a tenant
// is transitioning from a deletion-blocked state: we will leak some
// S3 objects in this case.
warn!("Failed to drain blocked deletions: {}", e);
break;
}
}
}
}
}
/// Returns `None` if nothing is yet uploaded, `Some(disk_consistent_lsn)` otherwise.
pub fn remote_consistent_lsn_projected(&self) -> Option<Lsn> {
match &mut *self.upload_queue.lock().unwrap() {
@@ -1912,16 +1986,24 @@ impl RemoteTimelineClient {
res
}
UploadOp::Delete(delete) => {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_shard_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e))
if self.config.read().unwrap().block_deletions {
let mut queue_locked = self.upload_queue.lock().unwrap();
if let Ok(queue) = queue_locked.initialized_mut() {
queue.blocked_deletions.push(delete.clone());
}
Ok(())
} else {
pausable_failpoint!("before-delete-layer-pausable");
self.deletion_queue_client
.push_layers(
self.tenant_shard_id,
self.timeline_id,
self.generation,
delete.layers.clone(),
)
.await
.map_err(|e| anyhow::anyhow!(e))
}
}
unexpected @ UploadOp::Barrier(_) | unexpected @ UploadOp::Shutdown => {
// unreachable. Barrier operations are handled synchronously in
@@ -2028,8 +2110,16 @@ impl RemoteTimelineClient {
// Legacy mode: skip validating generation
upload_queue.visible_remote_consistent_lsn.store(lsn);
None
} else {
} else if self
.config
.read()
.unwrap()
.process_remote_consistent_lsn_updates
{
Some((lsn, upload_queue.visible_remote_consistent_lsn.clone()))
} else {
// Our config disables remote_consistent_lsn updates: drop it.
None
}
}
UploadOp::Delete(_) => {
@@ -2166,6 +2256,7 @@ impl RemoteTimelineClient {
queued_operations: VecDeque::default(),
#[cfg(feature = "testing")]
dangling_files: HashMap::default(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
};
@@ -2231,6 +2322,28 @@ impl RemoteTimelineClient {
UploadQueue::Initialized(x) => x.no_pending_work(),
}
}
/// 'foreign' in the sense that it does not belong to this tenant shard. This method
/// is used during GC for other shards to get the index of shard zero.
pub(crate) async fn download_foreign_index(
&self,
shard_number: ShardNumber,
cancel: &CancellationToken,
) -> Result<(IndexPart, Generation, std::time::SystemTime), DownloadError> {
let foreign_shard_id = TenantShardId {
shard_number,
shard_count: self.tenant_shard_id.shard_count,
tenant_id: self.tenant_shard_id.tenant_id,
};
download_index_part(
&self.storage_impl,
&foreign_shard_id,
&self.timeline_id,
Generation::MAX,
cancel,
)
.await
}
}
pub(crate) struct UploadQueueAccessor<'a> {
@@ -2379,6 +2492,7 @@ mod tests {
use crate::{
context::RequestContext,
tenant::{
config::AttachmentMode,
harness::{TenantHarness, TIMELINE_ID},
storage_layer::layer::local_layer_path,
Tenant, Timeline,
@@ -2464,6 +2578,10 @@ mod tests {
/// Construct a RemoteTimelineClient in an arbitrary generation
fn build_client(&self, generation: Generation) -> Arc<RemoteTimelineClient> {
let location_conf = AttachedLocationConfig {
generation,
attach_mode: AttachmentMode::Single,
};
Arc::new(RemoteTimelineClient {
conf: self.harness.conf,
runtime: tokio::runtime::Handle::current(),
@@ -2477,6 +2595,7 @@ mod tests {
&self.harness.tenant_shard_id,
&TIMELINE_ID,
)),
config: std::sync::RwLock::new(RemoteTimelineClientConfig::from(&location_conf)),
cancel: CancellationToken::new(),
})
}

View File

@@ -111,15 +111,6 @@ pub(crate) struct SecondaryTenant {
pub(super) heatmap_total_size_metric: UIntGauge,
}
impl Drop for SecondaryTenant {
fn drop(&mut self) {
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
}
}
impl SecondaryTenant {
pub(crate) fn new(
tenant_shard_id: TenantShardId,
@@ -167,6 +158,13 @@ impl SecondaryTenant {
// Wait for any secondary downloader work to complete
self.gate.close().await;
self.validate_metrics();
let tenant_id = self.tenant_shard_id.tenant_id.to_string();
let shard_id = format!("{}", self.tenant_shard_id.shard_slug());
let _ = SECONDARY_RESIDENT_PHYSICAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
let _ = SECONDARY_HEATMAP_TOTAL_SIZE.remove_label_values(&[&tenant_id, &shard_id]);
}
pub(crate) fn set_config(&self, config: &SecondaryLocationConfig) {
@@ -254,6 +252,20 @@ impl SecondaryTenant {
.await
.expect("secondary eviction should not have panicked");
}
/// Exhaustive check that incrementally updated metrics match the actual state.
#[cfg(feature = "testing")]
fn validate_metrics(&self) {
let detail = self.detail.lock().unwrap();
let resident_size = detail.total_resident_size();
assert_eq!(resident_size, self.resident_size_metric.get());
}
#[cfg(not(feature = "testing"))]
fn validate_metrics(&self) {
// No-op in non-testing builds
}
}
/// The SecondaryController is a pseudo-rpc client for administrative control of secondary mode downloads,

View File

@@ -242,6 +242,19 @@ impl SecondaryDetail {
}
}
#[cfg(feature = "testing")]
pub(crate) fn total_resident_size(&self) -> u64 {
self.timelines
.values()
.map(|tl| {
tl.on_disk_layers
.values()
.map(|v| v.metadata.file_size)
.sum::<u64>()
})
.sum::<u64>()
}
pub(super) fn evict_layer(
&mut self,
name: LayerName,
@@ -763,24 +776,7 @@ impl<'a> TenantDownloader<'a> {
}
// Metrics consistency check in testing builds
if cfg!(feature = "testing") {
let detail = self.secondary_state.detail.lock().unwrap();
let resident_size = detail
.timelines
.values()
.map(|tl| {
tl.on_disk_layers
.values()
.map(|v| v.metadata.file_size)
.sum::<u64>()
})
.sum::<u64>();
assert_eq!(
resident_size,
self.secondary_state.resident_size_metric.get()
);
}
self.secondary_state.validate_metrics();
// Only update last_etag after a full successful download: this way will not skip
// the next download, even if the heatmap's actual etag is unchanged.
self.secondary_state.detail.lock().unwrap().last_download = Some(DownloadSummary {

View File

@@ -38,6 +38,7 @@ use pageserver_api::{
shard::{ShardIdentity, ShardNumber, TenantShardId},
};
use rand::Rng;
use remote_storage::DownloadError;
use serde_with::serde_as;
use storage_broker::BrokerClientChannel;
use tokio::{
@@ -272,7 +273,7 @@ pub struct Timeline {
/// Remote storage client.
/// See [`remote_timeline_client`](super::remote_timeline_client) module comment for details.
pub remote_client: Arc<RemoteTimelineClient>,
pub(crate) remote_client: Arc<RemoteTimelineClient>,
// What page versions do we hold in the repository? If we get a
// request > last_record_lsn, we need to wait until we receive all
@@ -2171,14 +2172,14 @@ impl Timeline {
)
}
pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
pub(super) fn tenant_conf_updated(&self, new_conf: &AttachedTenantConf) {
// NB: Most tenant conf options are read by background loops, so,
// changes will automatically be picked up.
// The threshold is embedded in the metric. So, we need to update it.
{
let new_threshold = Self::get_evictions_low_residence_duration_metric_threshold(
new_conf,
&new_conf.tenant_conf,
&self.conf.default_tenant_conf,
);
@@ -2186,6 +2187,9 @@ impl Timeline {
let shard_id_str = format!("{}", self.tenant_shard_id.shard_slug());
let timeline_id_str = self.timeline_id.to_string();
self.remote_client.update_config(&new_conf.location);
self.metrics
.evictions_with_low_residence_duration
.write()
@@ -4821,6 +4825,86 @@ impl Timeline {
Ok(())
}
async fn find_gc_time_cutoff(
&self,
pitr: Duration,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<Option<Lsn>, PageReconstructError> {
debug_assert_current_span_has_tenant_and_timeline_id();
if self.shard_identity.is_shard_zero() {
// Shard Zero has SLRU data and can calculate the PITR time -> LSN mapping itself
let now = SystemTime::now();
let time_range = if pitr == Duration::ZERO {
humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid")
} else {
pitr
};
// If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case)
let time_cutoff = now.checked_sub(time_range).unwrap_or(now);
let timestamp = to_pg_timestamp(time_cutoff);
let time_cutoff = match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? {
LsnForTimestamp::Present(lsn) => Some(lsn),
LsnForTimestamp::Future(lsn) => {
// The timestamp is in the future. That sounds impossible,
// but what it really means is that there hasn't been
// any commits since the cutoff timestamp.
//
// In this case we should use the LSN of the most recent commit,
// which is implicitly the last LSN in the log.
debug!("future({})", lsn);
Some(self.get_last_record_lsn())
}
LsnForTimestamp::Past(lsn) => {
debug!("past({})", lsn);
None
}
LsnForTimestamp::NoData(lsn) => {
debug!("nodata({})", lsn);
None
}
};
Ok(time_cutoff)
} else {
// Shards other than shard zero cannot do timestamp->lsn lookups, and must instead learn their GC cutoff
// from shard zero's index. The index doesn't explicitly tell us the time cutoff, but we may assume that
// the point up to which shard zero's last_gc_cutoff has advanced will either be the time cutoff, or a
// space cutoff that we would also have respected ourselves.
match self
.remote_client
.download_foreign_index(ShardNumber(0), cancel)
.await
{
Ok((index_part, index_generation, _index_mtime)) => {
tracing::info!("GC loaded shard zero metadata (gen {index_generation:?}): latest_gc_cutoff_lsn: {}",
index_part.metadata.latest_gc_cutoff_lsn());
Ok(Some(index_part.metadata.latest_gc_cutoff_lsn()))
}
Err(DownloadError::NotFound) => {
// This is unexpected, because during timeline creation shard zero persists to remote
// storage before other shards are called, and during timeline deletion non-zeroth shards are
// deleted before the zeroth one. However, it should be harmless: if we somehow end up in this
// state, then shard zero should _eventually_ write an index when it GCs.
tracing::warn!("GC couldn't find shard zero's index for timeline");
Ok(None)
}
Err(e) => {
// TODO: this function should return a different error type than page reconstruct error
Err(PageReconstructError::Other(anyhow::anyhow!(e)))
}
}
// TODO: after reading shard zero's GC cutoff, we should validate its generation with the storage
// controller. Otherwise, it is possible that we see the GC cutoff go backwards while shard zero
// is going through a migration if we read the old location's index and it has GC'd ahead of the
// new location. This is legal in principle, but problematic in practice because it might result
// in a timeline creation succeeding on shard zero ('s new location) but then failing on other shards
// because they have GC'd past the branch point.
}
}
/// Find the Lsns above which layer files need to be retained on
/// garbage collection.
///
@@ -4863,40 +4947,7 @@ impl Timeline {
// - if PITR interval is set, then this is our cutoff.
// - if PITR interval is not set, then we do a lookup
// based on DEFAULT_PITR_INTERVAL, so that size-based retention does not result in keeping history around permanently on idle databases.
let time_cutoff = {
let now = SystemTime::now();
let time_range = if pitr == Duration::ZERO {
humantime::parse_duration(DEFAULT_PITR_INTERVAL).expect("constant is invalid")
} else {
pitr
};
// If PITR is so large or `now` is so small that this underflows, we will retain no history (highly unexpected case)
let time_cutoff = now.checked_sub(time_range).unwrap_or(now);
let timestamp = to_pg_timestamp(time_cutoff);
match self.find_lsn_for_timestamp(timestamp, cancel, ctx).await? {
LsnForTimestamp::Present(lsn) => Some(lsn),
LsnForTimestamp::Future(lsn) => {
// The timestamp is in the future. That sounds impossible,
// but what it really means is that there hasn't been
// any commits since the cutoff timestamp.
//
// In this case we should use the LSN of the most recent commit,
// which is implicitly the last LSN in the log.
debug!("future({})", lsn);
Some(self.get_last_record_lsn())
}
LsnForTimestamp::Past(lsn) => {
debug!("past({})", lsn);
None
}
LsnForTimestamp::NoData(lsn) => {
debug!("nodata({})", lsn);
None
}
}
};
let time_cutoff = self.find_gc_time_cutoff(pitr, cancel, ctx).await?;
Ok(match (pitr, time_cutoff) {
(Duration::ZERO, Some(time_cutoff)) => {

View File

@@ -283,7 +283,7 @@ impl DeleteTimelineFlow {
/// Shortcut to create Timeline in stopping state and spawn deletion task.
#[instrument(skip_all, fields(%timeline_id))]
pub async fn resume_deletion(
pub(crate) async fn resume_deletion(
tenant: Arc<Tenant>,
timeline_id: TimelineId,
local_metadata: &TimelineMetadata,

View File

@@ -88,6 +88,9 @@ pub(crate) struct UploadQueueInitialized {
#[cfg(feature = "testing")]
pub(crate) dangling_files: HashMap<LayerName, Generation>,
/// Deletions that are blocked by the tenant configuration
pub(crate) blocked_deletions: Vec<Delete>,
/// Set to true when we have inserted the `UploadOp::Shutdown` into the `inprogress_tasks`.
pub(crate) shutting_down: bool,
@@ -180,6 +183,7 @@ impl UploadQueue {
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
};
@@ -220,6 +224,7 @@ impl UploadQueue {
queued_operations: VecDeque::new(),
#[cfg(feature = "testing")]
dangling_files: HashMap::new(),
blocked_deletions: Vec::new(),
shutting_down: false,
shutdown_ready: Arc::new(tokio::sync::Semaphore::new(0)),
};
@@ -270,7 +275,7 @@ pub(crate) struct UploadTask {
/// A deletion of some layers within the lifetime of a timeline. This is not used
/// for timeline deletion, which skips this queue and goes directly to DeletionQueue.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct Delete {
pub(crate) layers: Vec<(LayerName, LayerFileMetadata)>,
}

View File

@@ -111,7 +111,7 @@ struct SqlOverHttpArgs {
sql_over_http_cancel_set_shards: usize,
#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
sql_over_http_max_request_size_bytes: u64,
sql_over_http_max_request_size_bytes: usize,
#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
sql_over_http_max_response_size_bytes: usize,

View File

@@ -276,7 +276,7 @@ struct SqlOverHttpArgs {
sql_over_http_cancel_set_shards: usize,
#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
sql_over_http_max_request_size_bytes: u64,
sql_over_http_max_request_size_bytes: usize,
#[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
sql_over_http_max_response_size_bytes: usize,

View File

@@ -64,7 +64,7 @@ pub struct HttpConfig {
pub pool_options: GlobalConnPoolOptions,
pub cancel_set: CancelSet,
pub client_conn_threshold: u64,
pub max_request_size_bytes: u64,
pub max_request_size_bytes: usize,
pub max_response_size_bytes: usize,
}

View File

@@ -380,6 +380,7 @@ impl super::ControlPlaneApi for NeonControlPlaneClient {
// after getting back a permit - it's possible the cache was filled
// double check
if permit.should_check_cache() {
// TODO: if there is something in the cache, mark the permit as success.
check_cache!();
}

View File

@@ -122,18 +122,18 @@ impl Endpoint {
}
#[derive(Error, Debug)]
pub(crate) enum ReadBodyError {
pub(crate) enum ReadBodyError<E> {
#[error("Content length exceeds limit of {limit} bytes")]
BodyTooLarge { limit: usize },
#[error(transparent)]
Read(#[from] reqwest::Error),
Read(#[from] E),
}
pub(crate) async fn read_body_with_limit(
mut b: impl Body<Data = Bytes, Error = reqwest::Error> + Unpin,
pub(crate) async fn read_body_with_limit<E>(
mut b: impl Body<Data = Bytes, Error = E> + Unpin,
limit: usize,
) -> Result<Vec<u8>, ReadBodyError> {
) -> Result<Vec<u8>, ReadBodyError<E>> {
// We could use `b.limited().collect().await.to_bytes()` here
// but this ends up being slightly more efficient as far as I can tell.

View File

@@ -117,7 +117,6 @@ where
node_info.set_keys(user_info.get_keys());
node_info.allow_self_signed_compute = allow_self_signed_compute;
mechanism.update_connect_config(&mut node_info.config);
let retry_type = RetryType::ConnectToCompute;
// try once
let err = match mechanism
@@ -129,7 +128,7 @@ where
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
@@ -147,7 +146,7 @@ where
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
@@ -156,8 +155,9 @@ where
node_info
} else {
// if we failed to connect, it's likely that the compute node was suspended, wake a new compute node
info!("compute node's state has likely changed; requesting a wake-up");
debug!("compute node's state has likely changed; requesting a wake-up");
let old_node_info = invalidate_cache(node_info);
// TODO: increment num_retries?
let mut node_info =
wake_compute(&mut num_retries, ctx, user_info, wake_compute_retry_config).await?;
node_info.reuse_settings(old_node_info);
@@ -169,7 +169,7 @@ where
// now that we have a new node, try connect to it repeatedly.
// this can error for a few reasons, for instance:
// * DNS connection settings haven't quite propagated yet
info!("wake_compute success. attempting to connect");
debug!("wake_compute success. attempting to connect");
num_retries = 1;
loop {
match mechanism
@@ -181,10 +181,11 @@ where
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);
// TODO: is this necessary? We have a metric.
info!(?num_retries, "connected to compute node after");
return Ok(res);
}
@@ -194,7 +195,7 @@ where
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type,
retry_type: RetryType::ConnectToCompute,
},
num_retries.into(),
);

View File

@@ -87,6 +87,8 @@ where
transfer_one_direction(cx, &mut compute_to_client, compute, client)
.map_err(ErrorSource::from_compute)?;
// TODO: one info log, with an enum label for the close direction.
// Early termination checks from compute to client.
if let TransferState::Done(_) = compute_to_client {
if let TransferState::Running(buf) = &client_to_compute {

View File

@@ -5,7 +5,7 @@ use pq_proto::{
};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, warn};
use tracing::{debug, info, warn};
use crate::auth::endpoint_sni;
use crate::config::{TlsConfig, PG_ALPN_PROTOCOL};
@@ -199,6 +199,8 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
.await?;
}
// This log highlights the start of the connection.
// It contains information useful for debugging that is not logged elsewhere, such as the role name and endpoint id.
info!(
?version,
?params,
@@ -211,7 +213,7 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
FeStartupPacket::StartupMessage { params, version }
if version.major() == 3 && version > PG_PROTOCOL_LATEST =>
{
warn!(?version, "unsupported minor version");
debug!(?version, "unsupported minor version");
// no protocol extensions are supported.
// <https://github.com/postgres/postgres/blob/ca481d3c9ab7bf69ff0c8d71ad3951d407f6a33c/src/backend/tcop/backend_startup.c#L744-L753>
@@ -233,14 +235,16 @@ pub(crate) async fn handshake<S: AsyncRead + AsyncWrite + Unpin>(
info!(
?version,
?params,
session_type = "normal",
"successful handshake; unsupported minor version requested"
);
break Ok(HandshakeData::Startup(stream, params));
}
FeStartupPacket::StartupMessage { version, .. } => {
FeStartupPacket::StartupMessage { version, params } => {
warn!(
?version,
?params,
session_type = "normal",
"unsuccessful handshake; unsupported version"
);

View File

@@ -254,7 +254,7 @@ pub(crate) async fn handle_client<S: AsyncRead + AsyncWrite + Unpin>(
endpoint_rate_limiter: Arc<EndpointRateLimiter>,
conn_gauge: NumClientConnectionsGuard<'static>,
) -> Result<Option<ProxyPassthrough<CancellationHandlerMainInternal, S>>, ClientRequestError> {
info!(
debug!(
protocol = %ctx.protocol(),
"handling interactive connection from client"
);

View File

@@ -1,5 +1,5 @@
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::info;
use tracing::debug;
use utils::measured_stream::MeasuredStream;
use super::copy_bidirectional::ErrorSource;
@@ -45,7 +45,7 @@ pub(crate) async fn proxy_pass(
);
// Starting from here we only proxy the client's traffic.
info!("performing the proxy pass...");
debug!("performing the proxy pass...");
let _ = crate::proxy::copy_bidirectional::copy_bidirectional_client_compute(
&mut client,
&mut compute,

View File

@@ -17,7 +17,6 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
api: &B,
config: RetryConfig,
) -> Result<CachedNodeInfo, WakeComputeError> {
let retry_type = RetryType::WakeCompute;
loop {
match api.wake_compute(ctx).await {
Err(e) if !should_retry(&e, *num_retries, config) => {
@@ -26,7 +25,7 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Failed,
retry_type,
retry_type: RetryType::WakeCompute,
},
(*num_retries).into(),
);
@@ -40,10 +39,12 @@ pub(crate) async fn wake_compute<B: ComputeConnectBackend>(
Metrics::get().proxy.retries_metric.observe(
RetriesMetricGroup {
outcome: ConnectOutcome::Success,
retry_type,
retry_type: RetryType::WakeCompute,
},
(*num_retries).into(),
);
// TODO: is this necessary? We have a metric.
// TODO: this log line is misleading as "wake_compute" might return cached (and stale) info.
info!(?num_retries, "compute node woken up after");
return Ok(n);
}

View File

@@ -195,7 +195,11 @@ impl DynamicLimiter {
///
/// Set the outcome to `None` to ignore the job.
fn release_inner(&self, start: Instant, outcome: Option<Outcome>) {
tracing::info!("outcome is {:?}", outcome);
if outcome.is_none() {
tracing::warn!("outcome is {:?}", outcome);
} else {
tracing::debug!("outcome is {:?}", outcome);
}
if self.config.initial_limit == 0 {
return;
}

View File

@@ -31,26 +31,32 @@ impl LimitAlgorithm for Aimd {
if utilisation > self.utilisation {
let limit = old_limit + self.inc;
let increased_limit = limit.clamp(self.min, self.max);
if increased_limit > old_limit {
tracing::info!(increased_limit, "limit increased");
let new_limit = limit.clamp(self.min, self.max);
if new_limit > old_limit {
tracing::info!(old_limit, new_limit, "limit increased");
} else {
tracing::debug!(old_limit, new_limit, "limit clamped at max");
}
increased_limit
new_limit
} else {
old_limit
}
}
Outcome::Overload => {
let limit = old_limit as f32 * self.dec;
let new_limit = old_limit as f32 * self.dec;
// Floor instead of round, so the limit reduces even with small numbers.
// E.g. round(2 * 0.9) = 2, but floor(2 * 0.9) = 1
let limit = limit.floor() as usize;
let new_limit = new_limit.floor() as usize;
let limit = limit.clamp(self.min, self.max);
tracing::info!(limit, "limit decreased");
limit
let new_limit = new_limit.clamp(self.min, self.max);
if new_limit < old_limit {
tracing::info!(old_limit, new_limit, "limit decreased");
} else {
tracing::debug!(old_limit, new_limit, "limit clamped at min");
}
new_limit
}
}
}
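
For clarity, a standalone sketch of the core AIMD rule implemented above (names and parameters are illustrative, and the real code additionally gates the increase on the observed utilisation): additive increase on success, multiplicative decrease with floor on overload, both clamped to [min, max].

#[derive(Clone, Copy)]
enum Outcome {
    Success,
    Overload,
}

fn aimd_update(old_limit: usize, outcome: Outcome, inc: usize, dec: f32, min: usize, max: usize) -> usize {
    match outcome {
        // Additive increase, clamped to the ceiling.
        Outcome::Success => (old_limit + inc).clamp(min, max),
        // Multiplicative decrease; floor so small limits still shrink
        // (round(2 * 0.9) = 2, but floor(2 * 0.9) = 1), then clamp to the floor.
        Outcome::Overload => ((old_limit as f32 * dec).floor() as usize).clamp(min, max),
    }
}

// e.g. aimd_update(10, Outcome::Overload, 1, 0.9, 1, 100) == 9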

View File

@@ -121,6 +121,7 @@ impl RedisPublisherClient {
cancel_key_data: CancelKeyData,
session_id: Uuid,
) -> anyhow::Result<()> {
// TODO: review redundant error duplication logs.
if !self.limiter.check() {
tracing::info!("Rate limit exceeded. Skipping cancellation message");
return Err(anyhow::anyhow!("Rate limit exceeded"));
@@ -146,7 +147,7 @@ impl CancellationPublisherMut for RedisPublisherClient {
tracing::info!("publishing cancellation key to Redis");
match self.try_publish_internal(cancel_key_data, session_id).await {
Ok(()) => {
tracing::info!("cancellation key successfuly published to Redis");
tracing::debug!("cancellation key successfuly published to Redis");
Ok(())
}
Err(e) => {

View File

@@ -12,8 +12,8 @@ use tracing::field::display;
use tracing::{debug, info};
use super::conn_pool::poll_client;
use super::conn_pool_lib::{Client, ConnInfo, GlobalConnPool};
use super::http_conn_pool::{self, poll_http2_client, Send};
use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
use super::http_conn_pool::{self, poll_http2_client, HttpConnPool, Send};
use super::local_conn_pool::{self, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
@@ -36,9 +36,10 @@ use crate::rate_limiter::EndpointRateLimiter;
use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};
pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool<Send>>,
pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
pub(crate) pool:
Arc<GlobalConnPool<tokio_postgres::Client, EndpointConnPool<tokio_postgres::Client>>>,
pub(crate) config: &'static ProxyConfig,
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
@@ -167,10 +168,10 @@ impl PoolingBackend {
force_new: bool,
) -> Result<Client<tokio_postgres::Client>, HttpConnError> {
let maybe_client = if force_new {
info!("pool: pool is disabled");
debug!("pool: pool is disabled");
None
} else {
info!("pool: looking for an existing connection");
debug!("pool: looking for an existing connection");
self.pool.get(ctx, &conn_info)?
};
@@ -204,14 +205,14 @@ impl PoolingBackend {
ctx: &RequestContext,
conn_info: ConnInfo,
) -> Result<http_conn_pool::Client<Send>, HttpConnError> {
info!("pool: looking for an existing connection");
debug!("pool: looking for an existing connection");
if let Ok(Some(client)) = self.http_conn_pool.get(ctx, &conn_info) {
return Ok(client);
}
let conn_id = uuid::Uuid::new_v4();
tracing::Span::current().record("conn_id", display(conn_id));
info!(%conn_id, "pool: opening a new connection '{conn_info}'");
debug!(%conn_id, "pool: opening a new connection '{conn_info}'");
let backend = self.auth_backend.as_ref().map(|()| ComputeCredentials {
info: ComputeUserInfo {
user: conn_info.user_info.user.clone(),
@@ -474,7 +475,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
}
struct TokioMechanism {
pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
pool: Arc<GlobalConnPool<tokio_postgres::Client, EndpointConnPool<tokio_postgres::Client>>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,
@@ -524,7 +525,7 @@ impl ConnectMechanism for TokioMechanism {
}
struct HyperMechanism {
pool: Arc<http_conn_pool::GlobalConnPool<Send>>,
pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,

View File

@@ -19,7 +19,8 @@ use {
};
use super::conn_pool_lib::{
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, GlobalConnPool,
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, EndpointConnPool,
GlobalConnPool,
};
use crate::context::RequestContext;
use crate::control_plane::messages::MetricsAuxInfo;
@@ -52,7 +53,7 @@ impl fmt::Display for ConnInfo {
}
pub(crate) fn poll_client<C: ClientInnerExt>(
global_pool: Arc<GlobalConnPool<C>>,
global_pool: Arc<GlobalConnPool<C, EndpointConnPool<C>>>,
ctx: &RequestContext,
conn_info: ConnInfo,
client: C,
@@ -167,6 +168,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
Client::new(inner, conn_info, pool_clone)
}
#[derive(Clone)]
pub(crate) struct ClientDataRemote {
session: tokio::sync::watch::Sender<uuid::Uuid>,
cancel: CancellationToken,
@@ -243,7 +245,7 @@ mod tests {
},
cancel_set: CancelSet::new(0),
client_conn_threshold: u64::MAX,
max_request_size_bytes: u64::MAX,
max_request_size_bytes: usize::MAX,
max_response_size_bytes: usize::MAX,
}));
let pool = GlobalConnPool::new(config);

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};
@@ -43,13 +44,14 @@ impl ConnInfo {
}
}
#[derive(Clone)]
pub(crate) enum ClientDataEnum {
Remote(ClientDataRemote),
Local(ClientDataLocal),
#[allow(dead_code)]
Http(ClientDataHttp),
}
#[derive(Clone)]
pub(crate) struct ClientInnerCommon<C: ClientInnerExt> {
pub(crate) inner: C,
pub(crate) aux: MetricsAuxInfo,
@@ -91,6 +93,7 @@ pub(crate) struct ConnPoolEntry<C: ClientInnerExt> {
pub(crate) struct EndpointConnPool<C: ClientInnerExt> {
pools: HashMap<(DbName, RoleName), DbUserConnPool<C>>,
total_conns: usize,
/// max # connections per endpoint
max_conns: usize,
_guard: HttpEndpointPoolsGuard<'static>,
global_connections_count: Arc<AtomicUsize>,
@@ -232,7 +235,7 @@ impl<C: ClientInnerExt> EndpointConnPool<C> {
// do logging outside of the mutex
if returned {
info!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
debug!(%conn_id, "{pool_name}: returning connection '{conn_info}' back to the pool, total_conns={total_conns}, for this (db, user)={per_db_size}");
} else {
info!(%conn_id, "{pool_name}: throwing away connection '{conn_info}' because pool is full, total_conns={total_conns}");
}
@@ -317,24 +320,49 @@ impl<C: ClientInnerExt> DbUserConn<C> for DbUserConnPool<C> {
}
}
pub(crate) struct GlobalConnPool<C: ClientInnerExt> {
pub(crate) trait EndpointConnPoolExt<C: ClientInnerExt> {
fn clear_closed(&mut self) -> usize;
fn total_conns(&self) -> usize;
}
impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
fn clear_closed(&mut self) -> usize {
let mut clients_removed: usize = 0;
for db_pool in self.pools.values_mut() {
clients_removed += db_pool.clear_closed_clients(&mut self.total_conns);
}
clients_removed
}
fn total_conns(&self) -> usize {
self.total_conns
}
}
pub(crate) struct GlobalConnPool<C, P>
where
C: ClientInnerExt,
P: EndpointConnPoolExt<C>,
{
// endpoint -> per-endpoint connection pool
//
// That should be a fairly contended map, so return a reference to the per-endpoint
// pool as early as possible and release the lock.
global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool<C>>>>,
pub(crate) global_pool: DashMap<EndpointCacheKey, Arc<RwLock<P>>>,
/// Number of endpoint-connection pools
///
/// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
/// That seems like far too much effort, so we're using a relaxed increment counter instead.
/// It's only used for diagnostics.
global_pool_size: AtomicUsize,
pub(crate) global_pool_size: AtomicUsize,
/// Total number of connections in the pool
global_connections_count: Arc<AtomicUsize>,
pub(crate) global_connections_count: Arc<AtomicUsize>,
config: &'static crate::config::HttpConfig,
pub(crate) config: &'static crate::config::HttpConfig,
_marker: PhantomData<C>,
}
#[derive(Debug, Clone, Copy)]
@@ -357,7 +385,11 @@ pub struct GlobalConnPoolOptions {
pub max_total_conns: usize,
}
impl<C: ClientInnerExt> GlobalConnPool<C> {
impl<C, P> GlobalConnPool<C, P>
where
C: ClientInnerExt,
P: EndpointConnPoolExt<C>,
{
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
let shards = config.pool_options.pool_shards;
Arc::new(Self {
@@ -365,6 +397,7 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
global_pool_size: AtomicUsize::new(0),
config,
global_connections_count: Arc::new(AtomicUsize::new(0)),
_marker: PhantomData,
})
}
@@ -378,60 +411,6 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
self.config.pool_options.idle_timeout
}
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestContext,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInnerCommon<C>> = None;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
return Ok(None);
};
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
if let Some(entry) = endpoint_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
let endpoint_pool = Arc::downgrade(&endpoint_pool);
// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current()
.record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
info!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
match client.get_data() {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Http(_) => (),
}
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
}
Ok(None)
}
pub(crate) fn shutdown(&self) {
// drops all strong references to endpoint-pools
self.global_pool.clear();
@@ -464,17 +443,10 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
// if the current endpoint pool is unique (no other strong or weak references)
// then it is currently not in use by any connections.
if let Some(pool) = Arc::get_mut(x.get_mut()) {
let EndpointConnPool {
pools, total_conns, ..
} = pool.get_mut();
let endpoints = pool.get_mut();
clients_removed = endpoints.clear_closed();
// ensure that closed clients are removed
for db_pool in pools.values_mut() {
clients_removed += db_pool.clear_closed_clients(total_conns);
}
// we only remove this pool if it has no active connections
if *total_conns == 0 {
if endpoints.total_conns() == 0 {
info!("pool: discarding pool for endpoint {endpoint}");
return false;
}
@@ -510,6 +482,62 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
info!("pool: performed global pool gc. size now {global_pool_size}");
}
}
}
impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestContext,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInnerCommon<C>> = None;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
return Ok(None);
};
let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
if let Some(entry) = endpoint_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
let endpoint_pool = Arc::downgrade(&endpoint_pool);
// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current()
.record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
match client.get_data() {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Http(_) => (),
}
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
}
Ok(None)
}
pub(crate) fn get_or_create_endpoint_pool(
self: &Arc<Self>,
@@ -556,7 +584,6 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
pool
}
}
pub(crate) struct Client<C: ClientInnerExt> {
span: Span,
inner: Option<ClientInnerCommon<C>>,

View File

@@ -2,16 +2,17 @@ use std::collections::VecDeque;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};
use dashmap::DashMap;
use hyper::client::conn::http2;
use hyper_util::rt::{TokioExecutor, TokioIo};
use parking_lot::RwLock;
use rand::Rng;
use tokio::net::TcpStream;
use tracing::{debug, error, info, info_span, Instrument};
use super::backend::HttpConnError;
use super::conn_pool_lib::{ClientInnerExt, ConnInfo};
use super::conn_pool_lib::{
ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, ConnPoolEntry,
EndpointConnPoolExt, GlobalConnPool,
};
use crate::context::RequestContext;
use crate::control_plane::messages::{ColdStartInfo, MetricsAuxInfo};
use crate::metrics::{HttpEndpointPoolsGuard, Metrics};
@@ -23,17 +24,11 @@ pub(crate) type Connect =
http2::Connection<TokioIo<TcpStream>, hyper::body::Incoming, TokioExecutor>;
#[derive(Clone)]
pub(crate) struct ConnPoolEntry<C: ClientInnerExt + Clone> {
conn: C,
conn_id: uuid::Uuid,
aux: MetricsAuxInfo,
}
pub(crate) struct ClientDataHttp();
// Per-endpoint connection pool
// Number of open connections is limited by the `max_conns_per_endpoint`.
pub(crate) struct EndpointConnPool<C: ClientInnerExt + Clone> {
pub(crate) struct HttpConnPool<C: ClientInnerExt + Clone> {
// TODO(conrad):
// either we should open more connections depending on stream count
// (not exposed by hyper, need our own counter)
@@ -48,14 +43,19 @@ pub(crate) struct EndpointConnPool<C: ClientInnerExt + Clone> {
global_connections_count: Arc<AtomicUsize>,
}
impl<C: ClientInnerExt + Clone> EndpointConnPool<C> {
impl<C: ClientInnerExt + Clone> HttpConnPool<C> {
fn get_conn_entry(&mut self) -> Option<ConnPoolEntry<C>> {
let Self { conns, .. } = self;
loop {
let conn = conns.pop_front()?;
if !conn.conn.is_closed() {
conns.push_back(conn.clone());
if !conn.conn.inner.is_closed() {
let new_conn = ConnPoolEntry {
conn: conn.conn.clone(),
_last_access: std::time::Instant::now(),
};
conns.push_back(new_conn);
return Some(conn);
}
}
@@ -69,7 +69,7 @@ impl<C: ClientInnerExt + Clone> EndpointConnPool<C> {
} = self;
let old_len = conns.len();
conns.retain(|conn| conn.conn_id != conn_id);
conns.retain(|entry| entry.conn.conn_id != conn_id);
let new_len = conns.len();
let removed = old_len - new_len;
if removed > 0 {
@@ -84,7 +84,22 @@ impl<C: ClientInnerExt + Clone> EndpointConnPool<C> {
}
}
impl<C: ClientInnerExt + Clone> Drop for EndpointConnPool<C> {
impl<C: ClientInnerExt + Clone> EndpointConnPoolExt<C> for HttpConnPool<C> {
fn clear_closed(&mut self) -> usize {
let Self { conns, .. } = self;
let old_len = conns.len();
conns.retain(|entry| !entry.conn.inner.is_closed());
let new_len = conns.len();
old_len - new_len
}
fn total_conns(&self) -> usize {
self.conns.len()
}
}
impl<C: ClientInnerExt + Clone> Drop for HttpConnPool<C> {
fn drop(&mut self) {
if !self.conns.is_empty() {
self.global_connections_count
@@ -98,117 +113,7 @@ impl<C: ClientInnerExt + Clone> Drop for EndpointConnPool<C> {
}
}
pub(crate) struct GlobalConnPool<C: ClientInnerExt + Clone> {
// endpoint -> per-endpoint connection pool
//
// That should be a fairly contended map, so return a reference to the per-endpoint
// pool as early as possible and release the lock.
global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool<C>>>>,
/// Number of endpoint-connection pools
///
/// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
/// That seems like far too much effort, so we're using a relaxed increment counter instead.
/// It's only used for diagnostics.
global_pool_size: AtomicUsize,
/// Total number of connections in the pool
global_connections_count: Arc<AtomicUsize>,
config: &'static crate::config::HttpConfig,
}
impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
let shards = config.pool_options.pool_shards;
Arc::new(Self {
global_pool: DashMap::with_shard_amount(shards),
global_pool_size: AtomicUsize::new(0),
config,
global_connections_count: Arc::new(AtomicUsize::new(0)),
})
}
pub(crate) fn shutdown(&self) {
// drops all strong references to endpoint-pools
self.global_pool.clear();
}
pub(crate) async fn gc_worker(&self, mut rng: impl Rng) {
let epoch = self.config.pool_options.gc_epoch;
let mut interval = tokio::time::interval(epoch / (self.global_pool.shards().len()) as u32);
loop {
interval.tick().await;
let shard = rng.gen_range(0..self.global_pool.shards().len());
self.gc(shard);
}
}
fn gc(&self, shard: usize) {
debug!(shard, "pool: performing epoch reclamation");
// acquire a random shard lock
let mut shard = self.global_pool.shards()[shard].write();
let timer = Metrics::get()
.proxy
.http_pool_reclaimation_lag_seconds
.start_timer();
let current_len = shard.len();
let mut clients_removed = 0;
shard.retain(|endpoint, x| {
// if the current endpoint pool is unique (no other strong or weak references)
// then it is currently not in use by any connections.
if let Some(pool) = Arc::get_mut(x.get_mut()) {
let EndpointConnPool { conns, .. } = pool.get_mut();
let old_len = conns.len();
conns.retain(|conn| !conn.conn.is_closed());
let new_len = conns.len();
let removed = old_len - new_len;
clients_removed += removed;
// we only remove this pool if it has no active connections
if conns.is_empty() {
info!("pool: discarding pool for endpoint {endpoint}");
return false;
}
}
true
});
let new_len = shard.len();
drop(shard);
timer.observe();
// Do logging outside of the lock.
if clients_removed > 0 {
let size = self
.global_connections_count
.fetch_sub(clients_removed, atomic::Ordering::Relaxed)
- clients_removed;
Metrics::get()
.proxy
.http_pool_opened_connections
.get_metric()
.dec_by(clients_removed as i64);
info!("pool: performed global pool gc. removed {clients_removed} clients, total number of clients in pool is {size}");
}
let removed = current_len - new_len;
if removed > 0 {
let global_pool_size = self
.global_pool_size
.fetch_sub(removed, atomic::Ordering::Relaxed)
- removed;
info!("pool: performed global pool gc. size now {global_pool_size}");
}
}
impl<C: ClientInnerExt + Clone> GlobalConnPool<C, HttpConnPool<C>> {
#[expect(unused_results)]
pub(crate) fn get(
self: &Arc<Self>,
@@ -226,27 +131,28 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
return result;
};
tracing::Span::current().record("conn_id", tracing::field::display(client.conn_id));
info!(
tracing::Span::current().record("conn_id", tracing::field::display(client.conn.conn_id));
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);
ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
Ok(Some(Client::new(client.conn, client.aux)))
Ok(Some(Client::new(client.conn.clone())))
}
fn get_or_create_endpoint_pool(
self: &Arc<Self>,
endpoint: &EndpointCacheKey,
) -> Arc<RwLock<EndpointConnPool<C>>> {
) -> Arc<RwLock<HttpConnPool<C>>> {
// fast path
if let Some(pool) = self.global_pool.get(endpoint) {
return pool.clone();
}
// slow path
let new_pool = Arc::new(RwLock::new(EndpointConnPool {
let new_pool = Arc::new(RwLock::new(HttpConnPool {
conns: VecDeque::new(),
_guard: Metrics::get().proxy.http_endpoint_pools.guard(),
global_connections_count: self.global_connections_count.clone(),
@@ -279,7 +185,7 @@ impl<C: ClientInnerExt + Clone> GlobalConnPool<C> {
}
pub(crate) fn poll_http2_client(
global_pool: Arc<GlobalConnPool<Send>>,
global_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
ctx: &RequestContext,
conn_info: &ConnInfo,
client: Send,
@@ -299,11 +205,15 @@ pub(crate) fn poll_http2_client(
let pool = match conn_info.endpoint_cache_key() {
Some(endpoint) => {
let pool = global_pool.get_or_create_endpoint_pool(&endpoint);
pool.write().conns.push_back(ConnPoolEntry {
conn: client.clone(),
conn_id,
let client = ClientInnerCommon {
inner: client.clone(),
aux: aux.clone(),
conn_id,
data: ClientDataEnum::Http(ClientDataHttp()),
};
pool.write().conns.push_back(ConnPoolEntry {
conn: client,
_last_access: std::time::Instant::now(),
});
Metrics::get()
.proxy
@@ -335,23 +245,30 @@ pub(crate) fn poll_http2_client(
.instrument(span),
);
Client::new(client, aux)
let client = ClientInnerCommon {
inner: client,
aux,
conn_id,
data: ClientDataEnum::Http(ClientDataHttp()),
};
Client::new(client)
}
pub(crate) struct Client<C: ClientInnerExt + Clone> {
pub(crate) inner: C,
aux: MetricsAuxInfo,
pub(crate) inner: ClientInnerCommon<C>,
}
impl<C: ClientInnerExt + Clone> Client<C> {
pub(self) fn new(inner: C, aux: MetricsAuxInfo) -> Self {
Self { inner, aux }
pub(self) fn new(inner: ClientInnerCommon<C>) -> Self {
Self { inner }
}
pub(crate) fn metrics(&self) -> Arc<MetricCounter> {
let aux = &self.inner.aux;
USAGE_METRICS.register(Ids {
endpoint_id: self.aux.endpoint_id,
branch_id: self.aux.branch_id,
endpoint_id: aux.endpoint_id,
branch_id: aux.branch_id,
})
}
}

View File

@@ -29,7 +29,7 @@ use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::types::ToSql;
use tokio_postgres::{AsyncMessage, Socket};
use tokio_util::sync::CancellationToken;
use tracing::{error, info, info_span, warn, Instrument};
use tracing::{debug, error, info, info_span, warn, Instrument};
use super::backend::HttpConnError;
use super::conn_pool_lib::{
@@ -44,6 +44,7 @@ pub(crate) const EXT_NAME: &str = "pg_session_jwt";
pub(crate) const EXT_VERSION: &str = "0.1.2";
pub(crate) const EXT_SCHEMA: &str = "auth";
#[derive(Clone)]
pub(crate) struct ClientDataLocal {
session: tokio::sync::watch::Sender<uuid::Uuid>,
cancel: CancellationToken,
@@ -110,7 +111,7 @@ impl<C: ClientInnerExt> LocalConnPool<C> {
"pid",
tracing::field::display(client.inner.get_process_id()),
);
info!(
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"local_pool: reusing connection '{conn_info}'"
);

View File

@@ -88,7 +88,7 @@ pub async fn task_main(
}
});
let http_conn_pool = http_conn_pool::GlobalConnPool::new(&config.http_config);
let http_conn_pool = conn_pool_lib::GlobalConnPool::new(&config.http_config);
{
let http_conn_pool = Arc::clone(&http_conn_pool);
tokio::spawn(async move {

View File

@@ -8,7 +8,7 @@ use http::header::AUTHORIZATION;
use http::Method;
use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::body::{Body, Incoming};
use hyper::body::Incoming;
use hyper::http::{HeaderName, HeaderValue};
use hyper::{header, HeaderMap, Request, Response, StatusCode};
use pq_proto::StartupMessageParamsBuilder;
@@ -18,7 +18,7 @@ use tokio::time;
use tokio_postgres::error::{DbError, ErrorPosition, SqlState};
use tokio_postgres::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
use tracing::{debug, error, info};
use typed_json::json;
use url::Url;
use urlencoding;
@@ -36,6 +36,7 @@ use crate::auth::{endpoint_sni, ComputeUserInfoParseError};
use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig};
use crate::context::RequestContext;
use crate::error::{ErrorKind, ReportableError, UserFacingError};
use crate::http::{read_body_with_limit, ReadBodyError};
use crate::metrics::{HttpDirection, Metrics};
use crate::proxy::{run_until_cancelled, NeonOptions};
use crate::serverless::backend::HttpConnError;
@@ -47,6 +48,7 @@ use crate::usage_metrics::{MetricCounter, MetricCounterRecorder};
struct QueryData {
query: String,
#[serde(deserialize_with = "bytes_to_pg_text")]
#[serde(default)]
params: Vec<Option<String>>,
#[serde(default)]
array_mode: Option<bool>,
@@ -357,8 +359,6 @@ pub(crate) enum SqlOverHttpError {
ConnectCompute(#[from] HttpConnError),
#[error("{0}")]
ConnInfo(#[from] ConnInfoError),
#[error("request is too large (max is {0} bytes)")]
RequestTooLarge(u64),
#[error("response is too large (max is {0} bytes)")]
ResponseTooLarge(usize),
#[error("invalid isolation level")]
@@ -377,7 +377,6 @@ impl ReportableError for SqlOverHttpError {
SqlOverHttpError::ReadPayload(e) => e.get_error_kind(),
SqlOverHttpError::ConnectCompute(e) => e.get_error_kind(),
SqlOverHttpError::ConnInfo(e) => e.get_error_kind(),
SqlOverHttpError::RequestTooLarge(_) => ErrorKind::User,
SqlOverHttpError::ResponseTooLarge(_) => ErrorKind::User,
SqlOverHttpError::InvalidIsolationLevel => ErrorKind::User,
SqlOverHttpError::Postgres(p) => p.get_error_kind(),
@@ -393,7 +392,6 @@ impl UserFacingError for SqlOverHttpError {
SqlOverHttpError::ReadPayload(p) => p.to_string(),
SqlOverHttpError::ConnectCompute(c) => c.to_string_client(),
SqlOverHttpError::ConnInfo(c) => c.to_string_client(),
SqlOverHttpError::RequestTooLarge(_) => self.to_string(),
SqlOverHttpError::ResponseTooLarge(_) => self.to_string(),
SqlOverHttpError::InvalidIsolationLevel => self.to_string(),
SqlOverHttpError::Postgres(p) => p.to_string(),
@@ -406,13 +404,12 @@ impl UserFacingError for SqlOverHttpError {
impl HttpCodeError for SqlOverHttpError {
fn get_http_status_code(&self) -> StatusCode {
match self {
SqlOverHttpError::ReadPayload(_) => StatusCode::BAD_REQUEST,
SqlOverHttpError::ReadPayload(e) => e.get_http_status_code(),
SqlOverHttpError::ConnectCompute(h) => match h.get_error_kind() {
ErrorKind::User => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
},
SqlOverHttpError::ConnInfo(_) => StatusCode::BAD_REQUEST,
SqlOverHttpError::RequestTooLarge(_) => StatusCode::PAYLOAD_TOO_LARGE,
SqlOverHttpError::ResponseTooLarge(_) => StatusCode::INSUFFICIENT_STORAGE,
SqlOverHttpError::InvalidIsolationLevel => StatusCode::BAD_REQUEST,
SqlOverHttpError::Postgres(_) => StatusCode::BAD_REQUEST,
@@ -426,19 +423,41 @@ impl HttpCodeError for SqlOverHttpError {
pub(crate) enum ReadPayloadError {
#[error("could not read the HTTP request body: {0}")]
Read(#[from] hyper::Error),
#[error("request is too large (max is {limit} bytes)")]
BodyTooLarge { limit: usize },
#[error("could not parse the HTTP request body: {0}")]
Parse(#[from] serde_json::Error),
}
impl From<ReadBodyError<hyper::Error>> for ReadPayloadError {
fn from(value: ReadBodyError<hyper::Error>) -> Self {
match value {
ReadBodyError::BodyTooLarge { limit } => Self::BodyTooLarge { limit },
ReadBodyError::Read(e) => Self::Read(e),
}
}
}
impl ReportableError for ReadPayloadError {
fn get_error_kind(&self) -> ErrorKind {
match self {
ReadPayloadError::Read(_) => ErrorKind::ClientDisconnect,
ReadPayloadError::BodyTooLarge { .. } => ErrorKind::User,
ReadPayloadError::Parse(_) => ErrorKind::User,
}
}
}
impl HttpCodeError for ReadPayloadError {
fn get_http_status_code(&self) -> StatusCode {
match self {
ReadPayloadError::Read(_) => StatusCode::BAD_REQUEST,
ReadPayloadError::BodyTooLarge { .. } => StatusCode::PAYLOAD_TOO_LARGE,
ReadPayloadError::Parse(_) => StatusCode::BAD_REQUEST,
}
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum SqlOverHttpCancel {
#[error("query was cancelled")]
@@ -580,28 +599,20 @@ async fn handle_db_inner(
let parsed_headers = HttpHeaders::try_parse(headers)?;
let request_content_length = match request.body().size_hint().upper() {
Some(v) => v,
None => config.http_config.max_request_size_bytes + 1,
};
info!(request_content_length, "request size in bytes");
Metrics::get()
.proxy
.http_conn_content_length_bytes
.observe(HttpDirection::Request, request_content_length as f64);
// we don't have a streaming request support yet so this is to prevent OOM
// from a malicious user sending an extremely large request body
if request_content_length > config.http_config.max_request_size_bytes {
return Err(SqlOverHttpError::RequestTooLarge(
config.http_config.max_request_size_bytes,
));
}
let fetch_and_process_request = Box::pin(
async {
let body = request.into_body().collect().await?.to_bytes();
info!(length = body.len(), "request payload read");
let body = read_body_with_limit(
request.into_body(),
config.http_config.max_request_size_bytes,
)
.await?;
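// read_body_with_limit caps the buffered body at max_request_size_bytes; exceeding it surfaces
// ReadBodyError::BodyTooLarge, which maps to ReadPayloadError::BodyTooLarge (413) above.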
Metrics::get()
.proxy
.http_conn_content_length_bytes
.observe(HttpDirection::Request, body.len() as f64);
debug!(length = body.len(), "request payload read");
let payload: Payload = serde_json::from_slice(&body)?;
Ok::<Payload, ReadPayloadError>(payload) // pin the block's error type so read and parse errors both convert into ReadPayloadError
}
@@ -768,6 +779,7 @@ async fn handle_auth_broker_inner(
let _metrics = client.metrics();
Ok(client
.inner
.inner
.send_request(req)
.await
@@ -1095,3 +1107,63 @@ impl Discard<'_> {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_payload() {
let payload = "{\"query\":\"SELECT * FROM users WHERE name = ?\",\"params\":[\"test\"],\"arrayMode\":true}";
let deserialized_payload: Payload = serde_json::from_str(payload).unwrap();
match deserialized_payload {
Payload::Single(QueryData {
query,
params,
array_mode,
}) => {
assert_eq!(query, "SELECT * FROM users WHERE name = ?");
assert_eq!(params, vec![Some(String::from("test"))]);
assert!(array_mode.unwrap());
}
Payload::Batch(_) => {
panic!("deserialization failed: case with single query, one param, and array mode")
}
}
let payload = "{\"queries\":[{\"query\":\"SELECT * FROM users0 WHERE name = ?\",\"params\":[\"test0\"], \"arrayMode\":false},{\"query\":\"SELECT * FROM users1 WHERE name = ?\",\"params\":[\"test1\"],\"arrayMode\":true}]}";
let deserialized_payload: Payload = serde_json::from_str(payload).unwrap();
match deserialized_payload {
Payload::Batch(BatchQueryData { queries }) => {
assert_eq!(queries.len(), 2);
for (i, query) in queries.into_iter().enumerate() {
assert_eq!(
query.query,
format!("SELECT * FROM users{i} WHERE name = ?")
);
assert_eq!(query.params, vec![Some(format!("test{i}"))]);
assert_eq!(query.array_mode.unwrap(), i > 0);
}
}
Payload::Single(_) => panic!("deserialization failed: case with multiple queries"),
}
let payload = "{\"query\":\"SELECT 1\"}";
let deserialized_payload: Payload = serde_json::from_str(payload).unwrap();
match deserialized_payload {
Payload::Single(QueryData {
query,
params,
array_mode,
}) => {
assert_eq!(query, "SELECT 1");
assert_eq!(params, vec![]);
assert!(array_mode.is_none());
}
Payload::Batch(_) => panic!("deserialization failed: case with only one query"),
}
}
}

View File

@@ -21,7 +21,7 @@ use utils::{backoff, id::TenantId};
use crate::{
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
init_remote, list_objects_with_retries,
metadata_stream::{stream_tenant_timelines, stream_tenants},
metadata_stream::{stream_tenant_timelines, stream_tenants_maybe_prefix},
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth, MAX_RETRIES,
};
@@ -118,9 +118,17 @@ pub async fn find_garbage(
console_config: ConsoleConfig,
depth: TraversingDepth,
node_kind: NodeKind,
tenant_id_prefix: Option<String>,
output_path: String,
) -> anyhow::Result<()> {
let garbage = find_garbage_inner(bucket_config, console_config, depth, node_kind).await?;
let garbage = find_garbage_inner(
bucket_config,
console_config,
depth,
node_kind,
tenant_id_prefix,
)
.await?;
let serialized = serde_json::to_vec_pretty(&garbage)?;
tokio::fs::write(&output_path, &serialized).await?;
@@ -152,6 +160,7 @@ async fn find_garbage_inner(
console_config: ConsoleConfig,
depth: TraversingDepth,
node_kind: NodeKind,
tenant_id_prefix: Option<String>,
) -> anyhow::Result<GarbageList> {
// Construct clients for S3 and for Console API
let (remote_client, target) = init_remote(bucket_config.clone(), node_kind).await?;
@@ -178,7 +187,7 @@ async fn find_garbage_inner(
// Enumerate Tenants in S3, and check if each one exists in Console
tracing::info!("Finding all tenants in {}...", bucket_config.desc_str());
let tenants = stream_tenants(&remote_client, &target);
let tenants = stream_tenants_maybe_prefix(&remote_client, &target, tenant_id_prefix);
let tenants_checked = tenants.map_ok(|t| {
let api_client = cloud_admin_api_client.clone();
let console_cache = console_cache.clone();

View File

@@ -54,6 +54,8 @@ enum Command {
node_kind: NodeKind,
#[arg(short, long, default_value_t=TraversingDepth::Tenant)]
depth: TraversingDepth,
#[arg(short, long, default_value=None)]
tenant_id_prefix: Option<String>,
#[arg(short, long, default_value_t = String::from("garbage.json"))]
output_path: String,
},
@@ -209,10 +211,19 @@ async fn main() -> anyhow::Result<()> {
Command::FindGarbage {
node_kind,
depth,
tenant_id_prefix,
output_path,
} => {
let console_config = ConsoleConfig::from_env()?;
find_garbage(bucket_config, console_config, depth, node_kind, output_path).await
find_garbage(
bucket_config,
console_config,
depth,
node_kind,
tenant_id_prefix,
output_path,
)
.await
}
Command::PurgeGarbage {
input_path,

View File

@@ -17,9 +17,20 @@ use utils::id::{TenantId, TimelineId};
pub fn stream_tenants<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
stream_tenants_maybe_prefix(remote_client, target, None)
}
/// Given a remote storage and a target, output a stream of TenantShardIds discovered via listing prefixes,
/// optionally restricted to ids starting with `tenant_id_prefix`.
pub fn stream_tenants_maybe_prefix<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
tenant_id_prefix: Option<String>,
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
try_stream! {
let tenants_target = target.tenants_root();
let mut tenants_target = target.tenants_root();
if let Some(tenant_id_prefix) = tenant_id_prefix {
tenants_target.prefix_in_bucket += &tenant_id_prefix;
}
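// Appending the prefix narrows the listing to tenants whose hex id starts with tenant_id_prefix.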
let mut tenants_stream =
std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
while let Some(chunk) = tenants_stream.next().await {

View File

@@ -77,14 +77,16 @@ class MockS3Server:
class LocalFsStorage:
root: Path
def tenant_path(self, tenant_id: TenantId) -> Path:
def tenant_path(self, tenant_id: Union[TenantId, TenantShardId]) -> Path:
return self.root / "tenants" / str(tenant_id)
def timeline_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
def timeline_path(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
) -> Path:
return self.tenant_path(tenant_id) / "timelines" / str(timeline_id)
def timeline_latest_generation(
self, tenant_id: TenantId, timeline_id: TimelineId
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
) -> Optional[int]:
timeline_files = os.listdir(self.timeline_path(tenant_id, timeline_id))
index_parts = [f for f in timeline_files if f.startswith("index_part")]
@@ -102,7 +104,9 @@ class LocalFsStorage:
raise RuntimeError(f"No index_part found for {tenant_id}/{timeline_id}")
return generations[-1]
def index_path(self, tenant_id: TenantId, timeline_id: TimelineId) -> Path:
def index_path(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
) -> Path:
latest_gen = self.timeline_latest_generation(tenant_id, timeline_id)
if latest_gen is None:
filename = TIMELINE_INDEX_PART_FILE_NAME
@@ -126,7 +130,9 @@ class LocalFsStorage:
filename = f"{local_name}-{generation:08x}"
return self.timeline_path(tenant_id, timeline_id) / filename
def index_content(self, tenant_id: TenantId, timeline_id: TimelineId) -> Any:
def index_content(
self, tenant_id: Union[TenantId, TenantShardId], timeline_id: TimelineId
) -> Any:
with self.index_path(tenant_id, timeline_id).open("r") as f:
return json.load(f)

View File

@@ -146,7 +146,7 @@ def test_getpage_merge_smoke(
).value,
compute_getpage_count=compute_getpage_count,
pageserver_cpu_seconds_total=pageserver_metrics.query_one(
"process_cpu_seconds_total"
"libmetrics_process_cpu_seconds_highres"
).value,
)
@@ -176,11 +176,10 @@ def test_getpage_merge_smoke(
#
# Sanity-checks on the collected data
#
def close_enough(a, b):
return (a / b > 0.99 and a / b < 1.01) and (b / a > 0.99 and b / a < 1.01)
# assert that getpage counts roughly match between compute and ps
assert close_enough(metrics.pageserver_getpage_count, metrics.compute_getpage_count)
assert metrics.pageserver_getpage_count == pytest.approx(
metrics.compute_getpage_count, rel=0.01
)
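# rel=0.01 keeps the same ±1% tolerance as the removed close_enough() helper.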
#
# Record the results

View File

@@ -365,6 +365,19 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
workload.validate(pageserver_a.id)
workload.validate(pageserver_b.id)
# Force compaction on destination pageserver
pageserver_b.http_client().timeline_compact(tenant_id, timeline_id, force_l0_compaction=True)
# The destination pageserver is in AttachedMulti: it should have generated deletions but
# not enqueued them yet.
# Check deletion metrics via prometheus - should be 0 since we're in AttachedMulti
assert (
pageserver_b.http_client().get_metric_value(
"pageserver_deletion_queue_submitted_total",
)
== 0
)
# Revert the origin to secondary
log.info("Setting origin to Secondary")
pageserver_a.tenant_location_configure(
@@ -389,6 +402,17 @@ def test_live_migration(neon_env_builder: NeonEnvBuilder):
},
)
# Transition to AttachedSingle should have drained deletions generated by doing a compaction
# while in AttachedMulti.
def blocked_deletions_drained():
submitted = pageserver_b.http_client().get_metric_value(
"pageserver_deletion_queue_submitted_total"
)
assert submitted is not None
assert submitted > 0
wait_until(10, 0.1, blocked_deletions_drained)
workload.churn_rows(64, pageserver_b.id)
workload.validate(pageserver_b.id)
del workload

View File

@@ -110,13 +110,15 @@ def post_checks(env: NeonEnv, test_output_dir: Path, db_name: str, endpoint: End
check_restored_datadir_content(test_output_dir, env, endpoint, ignored_files=ignored_files)
# Ensure that compaction works, on a timeline containing all the diversity that postgres regression tests create.
# Ensure that compaction/GC works, on a timeline containing all the diversity that postgres regression tests create.
# There should have been compactions mid-test as well; this final check is in addition to those.
for shard, pageserver in tenant_get_shards(env, env.initial_tenant):
pageserver.http_client().timeline_checkpoint(
shard, env.initial_timeline, force_repartition=True, force_image_layer_creation=True
)
pageserver.http_client().timeline_gc(shard, env.initial_timeline, None)
# Run the main PostgreSQL regression tests, in src/test/regress.
#

View File

@@ -19,7 +19,7 @@ from fixtures.neon_fixtures import (
wait_for_last_flush_lsn,
)
from fixtures.pageserver.utils import assert_prefix_empty, assert_prefix_not_empty
from fixtures.remote_storage import s3_storage
from fixtures.remote_storage import LocalFsStorage, RemoteStorageKind, s3_storage
from fixtures.utils import skip_in_debug_build, wait_until
from fixtures.workload import Workload
from pytest_httpserver import HTTPServer
@@ -515,11 +515,12 @@ def test_sharding_split_smoke(
"""
# We will start with 4 shards and split into 8, then migrate all those
# 8 shards onto separate pageservers
shard_count = 4
split_shard_count = 8
neon_env_builder.num_pageservers = split_shard_count * 2
# Shard count we start with
shard_count = 2
# Shard count we split into
split_shard_count = 4
# We will have 2 shards per pageserver once done (including secondaries)
neon_env_builder.num_pageservers = split_shard_count
# 1MiB stripes: enable getting some meaningful data distribution without
# writing large quantities of data in this test. The stripe size is given
@@ -591,7 +592,7 @@ def test_sharding_split_smoke(
workload.validate()
assert len(pre_split_pageserver_ids) == 4
assert len(pre_split_pageserver_ids) == shard_count
def shards_on_disk(shard_ids):
for pageserver in env.pageservers:
@@ -654,9 +655,9 @@ def test_sharding_split_smoke(
# - shard_count reconciles for the original setup of the tenant
# - shard_count reconciles for detaching the original secondary locations during split
# - split_shard_count reconciles during shard splitting, for setting up secondaries.
# - shard_count of the child shards will need to fail over to their secondaries
# - shard_count of the child shard secondary locations will get moved to emptier nodes
expect_reconciles = shard_count * 2 + split_shard_count + shard_count * 2
# - split_shard_count/2 of the child shards will need to fail over to their secondaries (with 4 child shards on 4 pageservers, only 2 need to move)
expect_reconciles = shard_count * 2 + split_shard_count + split_shard_count / 2
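# With shard_count=2 and split_shard_count=4 this works out to 2*2 + 4 + 4/2 = 10 expected reconciles.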
reconcile_ok = env.storage_controller.get_metric_value(
"storage_controller_reconcile_complete_total", filter={"status": "ok"}
)
@@ -720,22 +721,10 @@ def test_sharding_split_smoke(
# dominated by shard count.
log.info(f"total: {total}")
assert total == {
1: 1,
2: 1,
3: 1,
4: 1,
5: 1,
6: 1,
7: 1,
8: 1,
9: 1,
10: 1,
11: 1,
12: 1,
13: 1,
14: 1,
15: 1,
16: 1,
1: 2,
2: 2,
3: 2,
4: 2,
}
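# Four pageservers with two shard locations each, matching the "2 shards per pageserver" expectation above.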
# The controller is not required to lay out the attached locations in any particular way, but
@@ -1685,3 +1674,111 @@ def test_top_tenants(neon_env_builder: NeonEnvBuilder):
)
assert len(top["shards"]) == n_tenants - 4
assert set(i["id"] for i in top["shards"]) == set(str(i[0]) for i in tenants[4:])
def test_sharding_gc(
neon_env_builder: NeonEnvBuilder,
):
"""
Exercise GC in a sharded tenant: because only shard 0 holds SLRU content, it acts as
the "leader" for GC, and other shards read its index to learn what LSN they should
GC up to.
"""
shard_count = 4
neon_env_builder.num_pageservers = shard_count
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)
TENANT_CONF = {
# small checkpointing and compaction targets to ensure we generate many upload operations
"checkpoint_distance": 128 * 1024,
"compaction_threshold": 1,
"compaction_target_size": 128 * 1024,
# A short PITR horizon, so that we won't have to sleep too long in the test to wait for it to
# happen.
"pitr_interval": "1s",
# disable background compaction and GC. We invoke it manually when we want it to happen.
"gc_period": "0s",
"compaction_period": "0s",
# Disable automatic creation of image layers, as we will create them explicitly when we want them
"image_creation_threshold": 9999,
"image_layer_creation_check_threshold": 0,
"lsn_lease_length": "0s",
}
env = neon_env_builder.init_start(
initial_tenant_shard_count=shard_count, initial_tenant_conf=TENANT_CONF
)
tenant_id = env.initial_tenant
timeline_id = env.initial_timeline
# Start a workload on the initial timeline and write some data
workload = Workload(env, tenant_id, timeline_id)
initial_lsn = Lsn(workload.endpoint().safe_psql("SELECT pg_current_wal_lsn()")[0][0])
log.info(f"Started at LSN: {initial_lsn}")
workload.init()
# Write enough data to generate multiple layers
for _i in range(10):
last_lsn = workload.write_rows(32)
assert last_lsn > initial_lsn
log.info(f"Wrote up to last LSN: {last_lsn}")
# Do full image layer generation. When we subsequently wait for PITR, all historic deltas
# should be GC-able
for shard_number in range(shard_count):
shard = TenantShardId(tenant_id, shard_number, shard_count)
env.get_tenant_pageserver(shard).http_client().timeline_compact(
shard, timeline_id, force_image_layer_creation=True
)
workload.churn_rows(32)
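# Sleep past the 1s pitr_interval so the deltas written above fall outside the PITR window.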
time.sleep(5)
# Invoke GC on a non-zero shard and verify its GC cutoff LSN does not advance
shard_one = TenantShardId(tenant_id, 1, shard_count)
env.get_tenant_pageserver(shard_one).http_client().timeline_gc(
shard_one, timeline_id, gc_horizon=None
)
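# It should not advance yet: non-zero shards follow the gc cutoff that shard 0 publishes in its index,
# and shard 0 has not run GC at this point.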
# Check shard 1's index - GC cutoff LSN should not have advanced
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
shard_1_index = env.pageserver_remote_storage.index_content(
tenant_id=shard_one, timeline_id=timeline_id
)
shard_1_gc_cutoff_lsn = Lsn(shard_1_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
log.info(f"Shard 1 cutoff LSN: {shard_1_gc_cutoff_lsn}")
assert shard_1_gc_cutoff_lsn <= last_lsn
shard_zero = TenantShardId(tenant_id, 0, shard_count)
env.get_tenant_pageserver(shard_zero).http_client().timeline_gc(
shard_zero, timeline_id, gc_horizon=None
)
# Observe that the GC cutoff LSN of shard 0 has moved forward in remote storage
assert isinstance(env.pageserver_remote_storage, LocalFsStorage)
shard_0_index = env.pageserver_remote_storage.index_content(
tenant_id=shard_zero, timeline_id=timeline_id
)
shard_0_gc_cutoff_lsn = Lsn(shard_0_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
log.info(f"Shard 0 cutoff LSN: {shard_0_gc_cutoff_lsn}")
assert shard_0_gc_cutoff_lsn >= last_lsn
# Invoke GC on all other shards and verify their GC cutoff LSNs
for shard_number in range(1, shard_count):
shard = TenantShardId(tenant_id, shard_number, shard_count)
env.get_tenant_pageserver(shard).http_client().timeline_gc(
shard, timeline_id, gc_horizon=None
)
# Verify GC cutoff LSN advanced to match shard 0
shard_index = env.pageserver_remote_storage.index_content(
tenant_id=shard, timeline_id=timeline_id
)
shard_gc_cutoff_lsn = Lsn(shard_index["metadata_bytes"]["latest_gc_cutoff_lsn"])
log.info(f"Shard {shard_number} cutoff LSN: {shard_gc_cutoff_lsn}")
assert shard_gc_cutoff_lsn == shard_0_gc_cutoff_lsn