diff --git a/Cargo.lock b/Cargo.lock index f11c774016..45a313a72b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2263,11 +2263,11 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.8.2" +version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0761a1b9491c4f2e3d66aa0f62d0fba0af9a0e2852e4d48ea506632a4b56e6aa" +checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" dependencies = [ - "hashbrown 0.13.2", + "hashbrown 0.14.0", ] [[package]] @@ -3952,6 +3952,7 @@ dependencies = [ "pin-project-lite", "postgres-protocol", "rand 0.8.5", + "serde", "thiserror", "tokio", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 8df9ca9988..8952f7627f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,7 +81,7 @@ futures-core = "0.3" futures-util = "0.3" git-version = "0.3" hashbrown = "0.13" -hashlink = "0.8.1" +hashlink = "0.8.4" hdrhistogram = "7.5.2" hex = "0.4" hex-literal = "0.4" diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 993b5725a4..83db8e09ec 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::env; use std::fs; use std::io::BufRead; -use std::os::unix::fs::PermissionsExt; +use std::os::unix::fs::{symlink, PermissionsExt}; use std::path::Path; use std::process::{Command, Stdio}; use std::str::FromStr; @@ -634,6 +634,48 @@ impl ComputeNode { // Update pg_hba.conf received with basebackup. update_pg_hba(pgdata_path)?; + // Place pg_dynshmem under /dev/shm. This allows us to use + // 'dynamic_shared_memory_type = mmap' so that the files are placed in + // /dev/shm, similar to how 'dynamic_shared_memory_type = posix' works. + // + // Why on earth don't we just stick to the 'posix' default, you might + // ask. It turns out that making large allocations with 'posix' doesn't + // work very well with autoscaling. The behavior we want is that: + // + // 1. You can make large DSM allocations, larger than the current RAM + // size of the VM, without errors + // + // 2. If the allocated memory is really used, the VM is scaled up + // automatically to accommodate that + // + // We try to make that possible by having swap in the VM. But with the + // default 'posix' DSM implementation, we fail step 1, even when there's + // plenty of swap available. PostgreSQL uses posix_fallocate() to create + // the shmem segment, which is really just a file in /dev/shm in Linux, + // but posix_fallocate() on tmpfs returns ENOMEM if the size is larger + // than available RAM. + // + // Using 'dynamic_shared_memory_type = mmap' works around that, because + // the Postgres 'mmap' DSM implementation doesn't use + // posix_fallocate(). Instead, it uses repeated calls to write(2) to + // fill the file with zeros. It's weird that that differs between + // 'posix' and 'mmap', but we take advantage of it. When the file is + // filled slowly with write(2), the kernel allows it to grow larger, as + // long as there's swap available. + // + // In short, using 'dynamic_shared_memory_type = mmap' allows us one DSM + // segment to be larger than currently available RAM. But because we + // don't want to store it on a real file, which the kernel would try to + // flush to disk, so symlink pg_dynshm to /dev/shm. + // + // We don't set 'dynamic_shared_memory_type = mmap' here, we let the + // control plane control that option. If 'mmap' is not used, this + // symlink doesn't affect anything. + // + // See https://github.com/neondatabase/autoscaling/issues/800 + std::fs::remove_dir(pgdata_path.join("pg_dynshmem"))?; + symlink("/dev/shm/", pgdata_path.join("pg_dynshmem"))?; + match spec.mode { ComputeMode::Primary => {} ComputeMode::Replica | ComputeMode::Static(..) => { diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 46324efd43..1226eaa312 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -494,6 +494,8 @@ pub struct TimelineInfo { pub current_logical_size: u64, pub current_logical_size_is_accurate: bool, + pub directory_entries_counts: Vec, + /// Sum of the size of all layer files. /// If a layer is present in both local FS and S3, it counts only once. pub current_physical_size: Option, // is None when timeline is Unloaded diff --git a/libs/pageserver_api/src/reltag.rs b/libs/pageserver_api/src/reltag.rs index 8eb848a514..38693ab847 100644 --- a/libs/pageserver_api/src/reltag.rs +++ b/libs/pageserver_api/src/reltag.rs @@ -124,6 +124,7 @@ impl RelTag { Ord, strum_macros::EnumIter, strum_macros::FromRepr, + enum_map::Enum, )] #[repr(u8)] pub enum SlruKind { diff --git a/libs/pq_proto/Cargo.toml b/libs/pq_proto/Cargo.toml index b286eb0358..6eeb3bafef 100644 --- a/libs/pq_proto/Cargo.toml +++ b/libs/pq_proto/Cargo.toml @@ -13,5 +13,6 @@ rand.workspace = true tokio.workspace = true tracing.workspace = true thiserror.workspace = true +serde.workspace = true workspace_hack.workspace = true diff --git a/libs/pq_proto/src/lib.rs b/libs/pq_proto/src/lib.rs index c52a21bcd3..522b65f5d1 100644 --- a/libs/pq_proto/src/lib.rs +++ b/libs/pq_proto/src/lib.rs @@ -7,6 +7,7 @@ pub mod framed; use byteorder::{BigEndian, ReadBytesExt}; use bytes::{Buf, BufMut, Bytes, BytesMut}; +use serde::{Deserialize, Serialize}; use std::{borrow::Cow, collections::HashMap, fmt, io, str}; // re-export for use in utils pageserver_feedback.rs @@ -123,7 +124,7 @@ impl StartupMessageParams { } } -#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy)] +#[derive(Debug, Hash, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)] pub struct CancelKeyData { pub backend_pid: i32, pub cancel_key: i32, diff --git a/pageserver/src/deletion_queue.rs b/pageserver/src/deletion_queue.rs index da1da9331a..9046fe881b 100644 --- a/pageserver/src/deletion_queue.rs +++ b/pageserver/src/deletion_queue.rs @@ -234,7 +234,7 @@ impl DeletionHeader { let header_bytes = serde_json::to_vec(self).context("serialize deletion header")?; let header_path = conf.deletion_header_path(); let temp_path = path_with_suffix_extension(&header_path, TEMP_SUFFIX); - VirtualFile::crashsafe_overwrite(&header_path, &temp_path, &header_bytes) + VirtualFile::crashsafe_overwrite(&header_path, &temp_path, header_bytes) .await .maybe_fatal_err("save deletion header")?; @@ -325,7 +325,7 @@ impl DeletionList { let temp_path = path_with_suffix_extension(&path, TEMP_SUFFIX); let bytes = serde_json::to_vec(self).expect("Failed to serialize deletion list"); - VirtualFile::crashsafe_overwrite(&path, &temp_path, &bytes) + VirtualFile::crashsafe_overwrite(&path, &temp_path, bytes) .await .maybe_fatal_err("save deletion list") .map_err(Into::into) diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 4be8ee9892..c354cc9ab6 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -422,6 +422,7 @@ async fn build_timeline_info_common( tenant::timeline::logical_size::Accuracy::Approximate => false, tenant::timeline::logical_size::Accuracy::Exact => true, }, + directory_entries_counts: timeline.get_directory_metrics().to_vec(), current_physical_size, current_logical_size_non_incremental: None, timeline_dir_layer_file_size_sum: None, diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 98c98ef6e7..c2b1eafc3a 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -602,6 +602,15 @@ pub(crate) mod initial_logical_size { }); } +static DIRECTORY_ENTRIES_COUNT: Lazy = Lazy::new(|| { + register_uint_gauge_vec!( + "pageserver_directory_entries_count", + "Sum of the entries in pageserver-stored directory listings", + &["tenant_id", "shard_id", "timeline_id"] + ) + .expect("failed to define a metric") +}); + pub(crate) static TENANT_STATE_METRIC: Lazy = Lazy::new(|| { register_uint_gauge_vec!( "pageserver_tenant_states_count", @@ -1809,6 +1818,7 @@ pub(crate) struct TimelineMetrics { resident_physical_size_gauge: UIntGauge, /// copy of LayeredTimeline.current_logical_size pub current_logical_size_gauge: UIntGauge, + pub directory_entries_count_gauge: Lazy UIntGauge>>, pub num_persistent_files_created: IntCounter, pub persistent_bytes_written: IntCounter, pub evictions: IntCounter, @@ -1818,12 +1828,12 @@ pub(crate) struct TimelineMetrics { impl TimelineMetrics { pub fn new( tenant_shard_id: &TenantShardId, - timeline_id: &TimelineId, + timeline_id_raw: &TimelineId, evictions_with_low_residence_duration_builder: EvictionsWithLowResidenceDurationBuilder, ) -> Self { let tenant_id = tenant_shard_id.tenant_id.to_string(); let shard_id = format!("{}", tenant_shard_id.shard_slug()); - let timeline_id = timeline_id.to_string(); + let timeline_id = timeline_id_raw.to_string(); let flush_time_histo = StorageTimeMetrics::new( StorageTimeOperation::LayerFlush, &tenant_id, @@ -1876,6 +1886,22 @@ impl TimelineMetrics { let current_logical_size_gauge = CURRENT_LOGICAL_SIZE .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id]) .unwrap(); + // TODO use impl Trait syntax here once we have ability to use it: https://github.com/rust-lang/rust/issues/63065 + let directory_entries_count_gauge_closure = { + let tenant_shard_id = *tenant_shard_id; + let timeline_id_raw = *timeline_id_raw; + move || { + let tenant_id = tenant_shard_id.tenant_id.to_string(); + let shard_id = format!("{}", tenant_shard_id.shard_slug()); + let timeline_id = timeline_id_raw.to_string(); + let gauge: UIntGauge = DIRECTORY_ENTRIES_COUNT + .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id]) + .unwrap(); + gauge + } + }; + let directory_entries_count_gauge: Lazy UIntGauge>> = + Lazy::new(Box::new(directory_entries_count_gauge_closure)); let num_persistent_files_created = NUM_PERSISTENT_FILES_CREATED .get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id]) .unwrap(); @@ -1902,6 +1928,7 @@ impl TimelineMetrics { last_record_gauge, resident_physical_size_gauge, current_logical_size_gauge, + directory_entries_count_gauge, num_persistent_files_created, persistent_bytes_written, evictions, @@ -1944,6 +1971,9 @@ impl Drop for TimelineMetrics { RESIDENT_PHYSICAL_SIZE.remove_label_values(&[tenant_id, &shard_id, timeline_id]); } let _ = CURRENT_LOGICAL_SIZE.remove_label_values(&[tenant_id, &shard_id, timeline_id]); + if let Some(metric) = Lazy::get(&DIRECTORY_ENTRIES_COUNT) { + let _ = metric.remove_label_values(&[tenant_id, &shard_id, timeline_id]); + } let _ = NUM_PERSISTENT_FILES_CREATED.remove_label_values(&[tenant_id, &shard_id, timeline_id]); let _ = PERSISTENT_BYTES_WRITTEN.remove_label_values(&[tenant_id, &shard_id, timeline_id]); diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index f1d18c0146..5f80ea9b5e 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -14,6 +14,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_i use crate::walrecord::NeonWalRecord; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes, BytesMut}; +use enum_map::Enum; use pageserver_api::key::{ dbdir_key_range, is_rel_block_key, is_slru_block_key, rel_block_to_key, rel_dir_to_key, rel_key_range, rel_size_to_key, relmap_file_key, slru_block_to_key, slru_dir_to_key, @@ -155,6 +156,7 @@ impl Timeline { pending_updates: HashMap::new(), pending_deletions: Vec::new(), pending_nblocks: 0, + pending_directory_entries: Vec::new(), lsn, } } @@ -868,6 +870,7 @@ pub struct DatadirModification<'a> { pending_updates: HashMap>, pending_deletions: Vec<(Range, Lsn)>, pending_nblocks: i64, + pending_directory_entries: Vec<(DirectoryKind, usize)>, } impl<'a> DatadirModification<'a> { @@ -899,6 +902,7 @@ impl<'a> DatadirModification<'a> { let buf = DbDirectory::ser(&DbDirectory { dbdirs: HashMap::new(), })?; + self.pending_directory_entries.push((DirectoryKind::Db, 0)); self.put(DBDIR_KEY, Value::Image(buf.into())); // Create AuxFilesDirectory @@ -907,16 +911,24 @@ impl<'a> DatadirModification<'a> { let buf = TwoPhaseDirectory::ser(&TwoPhaseDirectory { xids: HashSet::new(), })?; + self.pending_directory_entries + .push((DirectoryKind::TwoPhase, 0)); self.put(TWOPHASEDIR_KEY, Value::Image(buf.into())); let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into(); let empty_dir = Value::Image(buf); self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone()); + self.pending_directory_entries + .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0)); self.put( slru_dir_to_key(SlruKind::MultiXactMembers), empty_dir.clone(), ); + self.pending_directory_entries + .push((DirectoryKind::SlruSegment(SlruKind::Clog), 0)); self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir); + self.pending_directory_entries + .push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0)); Ok(()) } @@ -1017,6 +1029,7 @@ impl<'a> DatadirModification<'a> { let buf = RelDirectory::ser(&RelDirectory { rels: HashSet::new(), })?; + self.pending_directory_entries.push((DirectoryKind::Rel, 0)); self.put( rel_dir_to_key(spcnode, dbnode), Value::Image(Bytes::from(buf)), @@ -1039,6 +1052,8 @@ impl<'a> DatadirModification<'a> { if !dir.xids.insert(xid) { anyhow::bail!("twophase file for xid {} already exists", xid); } + self.pending_directory_entries + .push((DirectoryKind::TwoPhase, dir.xids.len())); self.put( TWOPHASEDIR_KEY, Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)), @@ -1074,6 +1089,8 @@ impl<'a> DatadirModification<'a> { let mut dir = DbDirectory::des(&buf)?; if dir.dbdirs.remove(&(spcnode, dbnode)).is_some() { let buf = DbDirectory::ser(&dir)?; + self.pending_directory_entries + .push((DirectoryKind::Db, dir.dbdirs.len())); self.put(DBDIR_KEY, Value::Image(buf.into())); } else { warn!( @@ -1111,6 +1128,8 @@ impl<'a> DatadirModification<'a> { // Didn't exist. Update dbdir dbdir.dbdirs.insert((rel.spcnode, rel.dbnode), false); let buf = DbDirectory::ser(&dbdir).context("serialize db")?; + self.pending_directory_entries + .push((DirectoryKind::Db, dbdir.dbdirs.len())); self.put(DBDIR_KEY, Value::Image(buf.into())); // and create the RelDirectory @@ -1125,6 +1144,10 @@ impl<'a> DatadirModification<'a> { if !rel_dir.rels.insert((rel.relnode, rel.forknum)) { return Err(RelationError::AlreadyExists); } + + self.pending_directory_entries + .push((DirectoryKind::Rel, rel_dir.rels.len())); + self.put( rel_dir_key, Value::Image(Bytes::from( @@ -1216,6 +1239,9 @@ impl<'a> DatadirModification<'a> { let buf = self.get(dir_key, ctx).await?; let mut dir = RelDirectory::des(&buf)?; + self.pending_directory_entries + .push((DirectoryKind::Rel, dir.rels.len())); + if dir.rels.remove(&(rel.relnode, rel.forknum)) { self.put(dir_key, Value::Image(Bytes::from(RelDirectory::ser(&dir)?))); } else { @@ -1251,6 +1277,8 @@ impl<'a> DatadirModification<'a> { if !dir.segments.insert(segno) { anyhow::bail!("slru segment {kind:?}/{segno} already exists"); } + self.pending_directory_entries + .push((DirectoryKind::SlruSegment(kind), dir.segments.len())); self.put( dir_key, Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)), @@ -1295,6 +1323,8 @@ impl<'a> DatadirModification<'a> { if !dir.segments.remove(&segno) { warn!("slru segment {:?}/{} does not exist", kind, segno); } + self.pending_directory_entries + .push((DirectoryKind::SlruSegment(kind), dir.segments.len())); self.put( dir_key, Value::Image(Bytes::from(SlruSegmentDirectory::ser(&dir)?)), @@ -1325,6 +1355,8 @@ impl<'a> DatadirModification<'a> { if !dir.xids.remove(&xid) { warn!("twophase file for xid {} does not exist", xid); } + self.pending_directory_entries + .push((DirectoryKind::TwoPhase, dir.xids.len())); self.put( TWOPHASEDIR_KEY, Value::Image(Bytes::from(TwoPhaseDirectory::ser(&dir)?)), @@ -1340,6 +1372,8 @@ impl<'a> DatadirModification<'a> { let buf = AuxFilesDirectory::ser(&AuxFilesDirectory { files: HashMap::new(), })?; + self.pending_directory_entries + .push((DirectoryKind::AuxFiles, 0)); self.put(AUX_FILES_KEY, Value::Image(Bytes::from(buf))); Ok(()) } @@ -1366,6 +1400,9 @@ impl<'a> DatadirModification<'a> { } else { dir.files.insert(path, Bytes::copy_from_slice(content)); } + self.pending_directory_entries + .push((DirectoryKind::AuxFiles, dir.files.len())); + self.put( AUX_FILES_KEY, Value::Image(Bytes::from( @@ -1427,6 +1464,10 @@ impl<'a> DatadirModification<'a> { self.pending_nblocks = 0; } + for (kind, count) in std::mem::take(&mut self.pending_directory_entries) { + writer.update_directory_entries_count(kind, count as u64); + } + Ok(()) } @@ -1464,6 +1505,10 @@ impl<'a> DatadirModification<'a> { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); } + for (kind, count) in std::mem::take(&mut self.pending_directory_entries) { + writer.update_directory_entries_count(kind, count as u64); + } + Ok(()) } @@ -1588,6 +1633,23 @@ struct SlruSegmentDirectory { segments: HashSet, } +#[derive(Copy, Clone, PartialEq, Eq, Debug, enum_map::Enum)] +#[repr(u8)] +pub(crate) enum DirectoryKind { + Db, + TwoPhase, + Rel, + AuxFiles, + SlruSegment(SlruKind), +} + +impl DirectoryKind { + pub(crate) const KINDS_NUM: usize = ::LENGTH; + pub(crate) fn offset(&self) -> usize { + self.into_usize() + } +} + static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]); #[allow(clippy::bool_assert_comparison)] diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d946c57118..9f1f188bf2 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -2880,7 +2880,7 @@ impl Tenant { let config_path = config_path.to_owned(); tokio::task::spawn_blocking(move || { Handle::current().block_on(async move { - let conf_content = conf_content.as_bytes(); + let conf_content = conf_content.into_bytes(); VirtualFile::crashsafe_overwrite(&config_path, &temp_path, conf_content) .await .with_context(|| { @@ -2917,7 +2917,7 @@ impl Tenant { let target_config_path = target_config_path.to_owned(); tokio::task::spawn_blocking(move || { Handle::current().block_on(async move { - let conf_content = conf_content.as_bytes(); + let conf_content = conf_content.into_bytes(); VirtualFile::crashsafe_overwrite(&target_config_path, &temp_path, conf_content) .await .with_context(|| { diff --git a/pageserver/src/tenant/blob_io.rs b/pageserver/src/tenant/blob_io.rs index e2ff12665a..ec70bdc679 100644 --- a/pageserver/src/tenant/blob_io.rs +++ b/pageserver/src/tenant/blob_io.rs @@ -131,27 +131,23 @@ impl BlobWriter { &mut self, src_buf: B, ) -> (B::Buf, Result<(), Error>) { - let src_buf_len = src_buf.bytes_init(); - let (src_buf, res) = if src_buf_len > 0 { - let src_buf = src_buf.slice(0..src_buf_len); - let res = self.inner.write_all(&src_buf).await; - let src_buf = Slice::into_inner(src_buf); - (src_buf, res) - } else { - let res = self.inner.write_all(&[]).await; - (Slice::into_inner(src_buf.slice_full()), res) + let (src_buf, res) = self.inner.write_all(src_buf).await; + let nbytes = match res { + Ok(nbytes) => nbytes, + Err(e) => return (src_buf, Err(e)), }; - if let Ok(()) = &res { - self.offset += src_buf_len as u64; - } - (src_buf, res) + self.offset += nbytes as u64; + (src_buf, Ok(())) } #[inline(always)] /// Flushes the internal buffer to the underlying `VirtualFile`. pub async fn flush_buffer(&mut self) -> Result<(), Error> { - self.inner.write_all(&self.buf).await?; - self.buf.clear(); + let buf = std::mem::take(&mut self.buf); + let (mut buf, res) = self.inner.write_all(buf).await; + res?; + buf.clear(); + self.buf = buf; Ok(()) } diff --git a/pageserver/src/tenant/metadata.rs b/pageserver/src/tenant/metadata.rs index 6fb86c65e2..dcbe781f90 100644 --- a/pageserver/src/tenant/metadata.rs +++ b/pageserver/src/tenant/metadata.rs @@ -279,7 +279,7 @@ pub async fn save_metadata( let path = conf.metadata_path(tenant_shard_id, timeline_id); let temp_path = path_with_suffix_extension(&path, TEMP_FILE_SUFFIX); let metadata_bytes = data.to_bytes().context("serialize metadata")?; - VirtualFile::crashsafe_overwrite(&path, &temp_path, &metadata_bytes) + VirtualFile::crashsafe_overwrite(&path, &temp_path, metadata_bytes) .await .context("write metadata")?; Ok(()) diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 0666e104f8..c23416a7f0 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -486,7 +486,7 @@ impl<'a> TenantDownloader<'a> { let heatmap_path_bg = heatmap_path.clone(); tokio::task::spawn_blocking(move || { tokio::runtime::Handle::current().block_on(async move { - VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, &heatmap_bytes).await + VirtualFile::crashsafe_overwrite(&heatmap_path_bg, &temp_path, heatmap_bytes).await }) }) .await diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index 7a5dc7a59f..9a7bcbcebe 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -461,7 +461,8 @@ impl DeltaLayerWriterInner { file.seek(SeekFrom::Start(index_start_blk as u64 * PAGE_SZ as u64)) .await?; for buf in block_buf.blocks { - file.write_all(buf.as_ref()).await?; + let (_buf, res) = file.write_all(buf).await; + res?; } assert!(self.lsn_range.start < self.lsn_range.end); // Fill in the summary on blk 0 @@ -476,17 +477,12 @@ impl DeltaLayerWriterInner { index_root_blk, }; - let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new(); + let mut buf = Vec::with_capacity(PAGE_SZ); + // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; - if buf.spilled() { - // This is bad as we only have one free block for the summary - warn!( - "Used more than one page size for summary buffer: {}", - buf.len() - ); - } file.seek(SeekFrom::Start(0)).await?; - file.write_all(&buf).await?; + let (_buf, res) = file.write_all(buf).await; + res?; let metadata = file .metadata() @@ -679,18 +675,12 @@ impl DeltaLayer { let new_summary = rewrite(actual_summary); - let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new(); + let mut buf = Vec::with_capacity(PAGE_SZ); + // TODO: could use smallvec here, but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; - if buf.spilled() { - // The code in DeltaLayerWriterInner just warn!()s for this. - // It should probably error out as well. - return Err(RewriteSummaryError::Other(anyhow::anyhow!( - "Used more than one page size for summary buffer: {}", - buf.len() - ))); - } file.seek(SeekFrom::Start(0)).await?; - file.write_all(&buf).await?; + let (_buf, res) = file.write_all(buf).await; + res?; Ok(()) } } diff --git a/pageserver/src/tenant/storage_layer/image_layer.rs b/pageserver/src/tenant/storage_layer/image_layer.rs index 1ad195032d..458131b572 100644 --- a/pageserver/src/tenant/storage_layer/image_layer.rs +++ b/pageserver/src/tenant/storage_layer/image_layer.rs @@ -341,18 +341,12 @@ impl ImageLayer { let new_summary = rewrite(actual_summary); - let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new(); + let mut buf = Vec::with_capacity(PAGE_SZ); + // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&new_summary, &mut buf).context("serialize")?; - if buf.spilled() { - // The code in ImageLayerWriterInner just warn!()s for this. - // It should probably error out as well. - return Err(RewriteSummaryError::Other(anyhow::anyhow!( - "Used more than one page size for summary buffer: {}", - buf.len() - ))); - } file.seek(SeekFrom::Start(0)).await?; - file.write_all(&buf).await?; + let (_buf, res) = file.write_all(buf).await; + res?; Ok(()) } } @@ -555,7 +549,8 @@ impl ImageLayerWriterInner { .await?; let (index_root_blk, block_buf) = self.tree.finish()?; for buf in block_buf.blocks { - file.write_all(buf.as_ref()).await?; + let (_buf, res) = file.write_all(buf).await; + res?; } // Fill in the summary on blk 0 @@ -570,17 +565,12 @@ impl ImageLayerWriterInner { index_root_blk, }; - let mut buf = smallvec::SmallVec::<[u8; PAGE_SZ]>::new(); + let mut buf = Vec::with_capacity(PAGE_SZ); + // TODO: could use smallvec here but it's a pain with Slice Summary::ser_into(&summary, &mut buf)?; - if buf.spilled() { - // This is bad as we only have one free block for the summary - warn!( - "Used more than one page size for summary buffer: {}", - buf.len() - ); - } file.seek(SeekFrom::Start(0)).await?; - file.write_all(&buf).await?; + let (_buf, res) = file.write_all(buf).await; + res?; let metadata = file .metadata() diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 625be7a644..87cf0ac6ea 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -14,6 +14,7 @@ use enumset::EnumSet; use fail::fail_point; use futures::stream::StreamExt; use itertools::Itertools; +use once_cell::sync::Lazy; use pageserver_api::{ keyspace::{key_range_size, KeySpaceAccum}, models::{ @@ -34,17 +35,22 @@ use tokio_util::sync::CancellationToken; use tracing::*; use utils::sync::gate::Gate; -use std::collections::{BTreeMap, BinaryHeap, HashMap, HashSet}; use std::ops::{Deref, Range}; use std::pin::pin; use std::sync::atomic::Ordering as AtomicOrdering; use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{Duration, Instant, SystemTime}; +use std::{ + array, + collections::{BTreeMap, BinaryHeap, HashMap, HashSet}, + sync::atomic::AtomicU64, +}; use std::{ cmp::{max, min, Ordering}, ops::ControlFlow, }; +use crate::pgdatadir_mapping::DirectoryKind; use crate::tenant::timeline::logical_size::CurrentLogicalSize; use crate::tenant::{ layer_map::{LayerMap, SearchResult}, @@ -258,6 +264,8 @@ pub struct Timeline { // in `crate::page_service` writes these metrics. pub(crate) query_metrics: crate::metrics::SmgrQueryTimePerTimeline, + directory_metrics: [AtomicU64; DirectoryKind::KINDS_NUM], + /// Ensures layers aren't frozen by checkpointer between /// [`Timeline::get_layer_for_write`] and layer reads. /// Locked automatically by [`TimelineWriter`] and checkpointer. @@ -790,6 +798,10 @@ impl Timeline { self.metrics.resident_physical_size_get() } + pub(crate) fn get_directory_metrics(&self) -> [u64; DirectoryKind::KINDS_NUM] { + array::from_fn(|idx| self.directory_metrics[idx].load(AtomicOrdering::Relaxed)) + } + /// /// Wait until WAL has been received and processed up to this LSN. /// @@ -1496,6 +1508,8 @@ impl Timeline { &timeline_id, ), + directory_metrics: array::from_fn(|_| AtomicU64::new(0)), + flush_loop_state: Mutex::new(FlushLoopState::NotStarted), layer_flush_start_tx, @@ -2264,6 +2278,29 @@ impl Timeline { } } + pub(crate) fn update_directory_entries_count(&self, kind: DirectoryKind, count: u64) { + self.directory_metrics[kind.offset()].store(count, AtomicOrdering::Relaxed); + let aux_metric = + self.directory_metrics[DirectoryKind::AuxFiles.offset()].load(AtomicOrdering::Relaxed); + + let sum_of_entries = self + .directory_metrics + .iter() + .map(|v| v.load(AtomicOrdering::Relaxed)) + .sum(); + // Set a high general threshold and a lower threshold for the auxiliary files, + // as we can have large numbers of relations in the db directory. + const SUM_THRESHOLD: u64 = 5000; + const AUX_THRESHOLD: u64 = 1000; + if sum_of_entries >= SUM_THRESHOLD || aux_metric >= AUX_THRESHOLD { + self.metrics + .directory_entries_count_gauge + .set(sum_of_entries); + } else if let Some(metric) = Lazy::get(&self.metrics.directory_entries_count_gauge) { + metric.set(sum_of_entries); + } + } + async fn find_layer(&self, layer_file_name: &str) -> Option { let guard = self.layers.read().await; for historic_layer in guard.layer_map().iter_historic_layers() { diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 059a6596d3..6cff748d42 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -19,7 +19,7 @@ use once_cell::sync::OnceCell; use pageserver_api::shard::TenantShardId; use std::fs::{self, File}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; -use tokio_epoll_uring::IoBufMut; +use tokio_epoll_uring::{BoundedBuf, IoBufMut, Slice}; use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; use std::os::unix::fs::FileExt; @@ -410,10 +410,10 @@ impl VirtualFile { /// step, the tmp path is renamed to the final path. As renames are /// atomic, a crash during the write operation will never leave behind a /// partially written file. - pub async fn crashsafe_overwrite( + pub async fn crashsafe_overwrite( final_path: &Utf8Path, tmp_path: &Utf8Path, - content: &[u8], + content: B, ) -> std::io::Result<()> { let Some(final_path_parent) = final_path.parent() else { return Err(std::io::Error::from_raw_os_error( @@ -430,7 +430,8 @@ impl VirtualFile { .create_new(true), ) .await?; - file.write_all(content).await?; + let (_content, res) = file.write_all(content).await; + res?; file.sync_all().await?; drop(file); // before the rename, that's important! // renames are atomic @@ -601,23 +602,36 @@ impl VirtualFile { Ok(()) } - pub async fn write_all(&mut self, mut buf: &[u8]) -> Result<(), Error> { + /// Writes `buf.slice(0..buf.bytes_init())`. + /// Returns the IoBuf that is underlying the BoundedBuf `buf`. + /// I.e., the returned value's `bytes_init()` method returns something different than the `bytes_init()` that was passed in. + /// It's quite brittle and easy to mis-use, so, we return the size in the Ok() variant. + pub async fn write_all(&mut self, buf: B) -> (B::Buf, Result) { + let nbytes = buf.bytes_init(); + if nbytes == 0 { + return (Slice::into_inner(buf.slice_full()), Ok(0)); + } + let mut buf = buf.slice(0..nbytes); while !buf.is_empty() { - match self.write(buf).await { + // TODO: push `Slice` further down + match self.write(&buf).await { Ok(0) => { - return Err(Error::new( - std::io::ErrorKind::WriteZero, - "failed to write whole buffer", - )); + return ( + Slice::into_inner(buf), + Err(Error::new( + std::io::ErrorKind::WriteZero, + "failed to write whole buffer", + )), + ); } Ok(n) => { - buf = &buf[n..]; + buf = buf.slice(n..); } Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => {} - Err(e) => return Err(e), + Err(e) => return (Slice::into_inner(buf), Err(e)), } } - Ok(()) + (Slice::into_inner(buf), Ok(nbytes)) } async fn write(&mut self, buf: &[u8]) -> Result { @@ -676,7 +690,6 @@ where F: FnMut(tokio_epoll_uring::Slice, u64) -> Fut, Fut: std::future::Future, std::io::Result)>, { - use tokio_epoll_uring::BoundedBuf; let mut buf: tokio_epoll_uring::Slice = buf.slice_full(); // includes all the uninitialized memory while buf.bytes_total() != 0 { let res; @@ -1063,10 +1076,19 @@ mod tests { MaybeVirtualFile::File(file) => file.seek(pos), } } - async fn write_all(&mut self, buf: &[u8]) -> Result<(), Error> { + async fn write_all(&mut self, buf: B) -> Result<(), Error> { match self { - MaybeVirtualFile::VirtualFile(file) => file.write_all(buf).await, - MaybeVirtualFile::File(file) => file.write_all(buf), + MaybeVirtualFile::VirtualFile(file) => { + let (_buf, res) = file.write_all(buf).await; + res.map(|_| ()) + } + MaybeVirtualFile::File(file) => { + let buf_len = buf.bytes_init(); + if buf_len == 0 { + return Ok(()); + } + file.write_all(&buf.slice(0..buf_len)) + } } } @@ -1141,7 +1163,7 @@ mod tests { .to_owned(), ) .await?; - file_a.write_all(b"foobar").await?; + file_a.write_all(b"foobar".to_vec()).await?; // cannot read from a file opened in write-only mode let _ = file_a.read_string().await.unwrap_err(); @@ -1150,7 +1172,7 @@ mod tests { let mut file_a = openfunc(path_a, OpenOptions::new().read(true).to_owned()).await?; // cannot write to a file opened in read-only mode - let _ = file_a.write_all(b"bar").await.unwrap_err(); + let _ = file_a.write_all(b"bar".to_vec()).await.unwrap_err(); // Try simple read assert_eq!("foobar", file_a.read_string().await?); @@ -1293,7 +1315,7 @@ mod tests { let path = testdir.join("myfile"); let tmp_path = testdir.join("myfile.tmp"); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo") + VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec()) .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); @@ -1302,7 +1324,7 @@ mod tests { assert!(!tmp_path.exists()); drop(file); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar") + VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar".to_vec()) .await .unwrap(); let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap()); @@ -1324,7 +1346,7 @@ mod tests { std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap(); assert!(tmp_path.exists()); - VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo") + VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo".to_vec()) .await .unwrap(); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 63e8b8dc1f..213e396328 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -3079,14 +3079,6 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) XLogRecGetBlockTag(record, block_id, &rinfo, &forknum, &blkno); #endif - /* - * Out of an abundance of caution, we always run redo on shared catalogs, - * regardless of whether the block is stored in shared buffers. See also - * this function's top comment. - */ - if (!OidIsValid(NInfoGetDbOid(rinfo))) - return false; - CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forknum; tag.blockNum = blkno; @@ -3100,17 +3092,28 @@ neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) */ LWLockAcquire(partitionLock, LW_SHARED); - /* Try to find the relevant buffer */ - buffer = BufTableLookup(&tag, hash); - - no_redo_needed = buffer < 0; + /* + * Out of an abundance of caution, we always run redo on shared catalogs, + * regardless of whether the block is stored in shared buffers. See also + * this function's top comment. + */ + if (!OidIsValid(NInfoGetDbOid(rinfo))) + { + no_redo_needed = false; + } + else + { + /* Try to find the relevant buffer */ + buffer = BufTableLookup(&tag, hash); + no_redo_needed = buffer < 0; + } /* In both cases st lwlsn past this WAL record */ SetLastWrittenLSNForBlock(end_recptr, rinfo, forknum, blkno); /* * we don't have the buffer in memory, update lwLsn past this record, also - * evict page fro file cache + * evict page from file cache */ if (no_redo_needed) lfc_evict(rinfo, forknum, blkno); diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index 00a229c135..b3d4fc0411 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -1,6 +1,8 @@ use futures::future::Either; use proxy::auth; use proxy::auth::backend::MaybeOwned; +use proxy::cancellation::CancelMap; +use proxy::cancellation::CancellationHandler; use proxy::config::AuthenticationConfig; use proxy::config::CacheOptions; use proxy::config::HttpConfig; @@ -12,6 +14,7 @@ use proxy::rate_limiter::EndpointRateLimiter; use proxy::rate_limiter::RateBucketInfo; use proxy::rate_limiter::RateLimiterConfig; use proxy::redis::notifications; +use proxy::redis::publisher::RedisPublisherClient; use proxy::serverless::GlobalConnPoolOptions; use proxy::usage_metrics; @@ -22,6 +25,7 @@ use std::net::SocketAddr; use std::pin::pin; use std::sync::Arc; use tokio::net::TcpListener; +use tokio::sync::Mutex; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; use tracing::info; @@ -129,6 +133,9 @@ struct ProxyCliArgs { /// Can be given multiple times for different bucket sizes. #[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)] endpoint_rps_limit: Vec, + /// Redis rate limiter max number of requests per second. + #[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)] + redis_rps_limit: Vec, /// Initial limit for dynamic rate limiter. Makes sense only if `rate_limit_algorithm` is *not* `None`. #[clap(long, default_value_t = 100)] initial_limit: usize, @@ -225,6 +232,19 @@ async fn main() -> anyhow::Result<()> { let cancellation_token = CancellationToken::new(); let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new(&config.endpoint_rps_limit)); + let cancel_map = CancelMap::default(); + let redis_publisher = match &args.redis_notifications { + Some(url) => Some(Arc::new(Mutex::new(RedisPublisherClient::new( + url, + args.region.clone(), + &config.redis_rps_limit, + )?))), + None => None, + }; + let cancellation_handler = Arc::new(CancellationHandler::new( + cancel_map.clone(), + redis_publisher, + )); // client facing tasks. these will exit on error or on cancellation // cancellation returns Ok(()) @@ -234,6 +254,7 @@ async fn main() -> anyhow::Result<()> { proxy_listener, cancellation_token.clone(), endpoint_rate_limiter.clone(), + cancellation_handler.clone(), )); // TODO: rename the argument to something like serverless. @@ -248,6 +269,7 @@ async fn main() -> anyhow::Result<()> { serverless_listener, cancellation_token.clone(), endpoint_rate_limiter.clone(), + cancellation_handler.clone(), )); } @@ -271,7 +293,12 @@ async fn main() -> anyhow::Result<()> { let cache = api.caches.project_info.clone(); if let Some(url) = args.redis_notifications { info!("Starting redis notifications listener ({url})"); - maintenance_tasks.spawn(notifications::task_main(url.to_owned(), cache.clone())); + maintenance_tasks.spawn(notifications::task_main( + url.to_owned(), + cache.clone(), + cancel_map.clone(), + args.region.clone(), + )); } maintenance_tasks.spawn(async move { cache.clone().gc_worker().await }); } @@ -403,6 +430,8 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { let mut endpoint_rps_limit = args.endpoint_rps_limit.clone(); RateBucketInfo::validate(&mut endpoint_rps_limit)?; + let mut redis_rps_limit = args.redis_rps_limit.clone(); + RateBucketInfo::validate(&mut redis_rps_limit)?; let config = Box::leak(Box::new(ProxyConfig { tls_config, @@ -414,6 +443,7 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { require_client_ip: args.require_client_ip, disable_ip_check_for_http: args.disable_ip_check_for_http, endpoint_rps_limit, + redis_rps_limit, handshake_timeout: args.handshake_timeout, // TODO: add this argument region: args.region.clone(), diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index fe614628d8..93a77bc4ae 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -1,16 +1,28 @@ +use async_trait::async_trait; use dashmap::DashMap; use pq_proto::CancelKeyData; use std::{net::SocketAddr, sync::Arc}; use thiserror::Error; use tokio::net::TcpStream; +use tokio::sync::Mutex; use tokio_postgres::{CancelToken, NoTls}; use tracing::info; +use uuid::Uuid; -use crate::error::ReportableError; +use crate::{ + error::ReportableError, metrics::NUM_CANCELLATION_REQUESTS, + redis::publisher::RedisPublisherClient, +}; + +pub type CancelMap = Arc>>; /// Enables serving `CancelRequest`s. -#[derive(Default)] -pub struct CancelMap(DashMap>); +/// +/// If there is a `RedisPublisherClient` available, it will be used to publish the cancellation key to other proxy instances. +pub struct CancellationHandler { + map: CancelMap, + redis_client: Option>>, +} #[derive(Debug, Error)] pub enum CancelError { @@ -32,15 +44,43 @@ impl ReportableError for CancelError { } } -impl CancelMap { +impl CancellationHandler { + pub fn new(map: CancelMap, redis_client: Option>>) -> Self { + Self { map, redis_client } + } /// Cancel a running query for the corresponding connection. - pub async fn cancel_session(&self, key: CancelKeyData) -> Result<(), CancelError> { + pub async fn cancel_session( + &self, + key: CancelKeyData, + session_id: Uuid, + ) -> Result<(), CancelError> { + let from = "from_client"; // NB: we should immediately release the lock after cloning the token. - let Some(cancel_closure) = self.0.get(&key).and_then(|x| x.clone()) else { + let Some(cancel_closure) = self.map.get(&key).and_then(|x| x.clone()) else { tracing::warn!("query cancellation key not found: {key}"); + if let Some(redis_client) = &self.redis_client { + NUM_CANCELLATION_REQUESTS + .with_label_values(&[from, "not_found"]) + .inc(); + info!("publishing cancellation key to Redis"); + match redis_client.lock().await.try_publish(key, session_id).await { + Ok(()) => { + info!("cancellation key successfuly published to Redis"); + } + Err(e) => { + tracing::error!("failed to publish a message: {e}"); + return Err(CancelError::IO(std::io::Error::new( + std::io::ErrorKind::Other, + e.to_string(), + ))); + } + } + } return Ok(()); }; - + NUM_CANCELLATION_REQUESTS + .with_label_values(&[from, "found"]) + .inc(); info!("cancelling query per user's request using key {key}"); cancel_closure.try_cancel_query().await } @@ -57,7 +97,7 @@ impl CancelMap { // Random key collisions are unlikely to happen here, but they're still possible, // which is why we have to take care not to rewrite an existing key. - match self.0.entry(key) { + match self.map.entry(key) { dashmap::mapref::entry::Entry::Occupied(_) => continue, dashmap::mapref::entry::Entry::Vacant(e) => { e.insert(None); @@ -69,18 +109,46 @@ impl CancelMap { info!("registered new query cancellation key {key}"); Session { key, - cancel_map: self, + cancellation_handler: self, } } #[cfg(test)] fn contains(&self, session: &Session) -> bool { - self.0.contains_key(&session.key) + self.map.contains_key(&session.key) } #[cfg(test)] fn is_empty(&self) -> bool { - self.0.is_empty() + self.map.is_empty() + } +} + +#[async_trait] +pub trait NotificationsCancellationHandler { + async fn cancel_session_no_publish(&self, key: CancelKeyData) -> Result<(), CancelError>; +} + +#[async_trait] +impl NotificationsCancellationHandler for CancellationHandler { + async fn cancel_session_no_publish(&self, key: CancelKeyData) -> Result<(), CancelError> { + let from = "from_redis"; + let cancel_closure = self.map.get(&key).and_then(|x| x.clone()); + match cancel_closure { + Some(cancel_closure) => { + NUM_CANCELLATION_REQUESTS + .with_label_values(&[from, "found"]) + .inc(); + cancel_closure.try_cancel_query().await + } + None => { + NUM_CANCELLATION_REQUESTS + .with_label_values(&[from, "not_found"]) + .inc(); + tracing::warn!("query cancellation key not found: {key}"); + Ok(()) + } + } } } @@ -115,7 +183,7 @@ pub struct Session { /// The user-facing key identifying this session. key: CancelKeyData, /// The [`CancelMap`] this session belongs to. - cancel_map: Arc, + cancellation_handler: Arc, } impl Session { @@ -123,7 +191,9 @@ impl Session { /// This enables query cancellation in `crate::proxy::prepare_client_connection`. pub fn enable_query_cancellation(&self, cancel_closure: CancelClosure) -> CancelKeyData { info!("enabling query cancellation for this session"); - self.cancel_map.0.insert(self.key, Some(cancel_closure)); + self.cancellation_handler + .map + .insert(self.key, Some(cancel_closure)); self.key } @@ -131,7 +201,7 @@ impl Session { impl Drop for Session { fn drop(&mut self) { - self.cancel_map.0.remove(&self.key); + self.cancellation_handler.map.remove(&self.key); info!("dropped query cancellation key {}", &self.key); } } @@ -142,13 +212,16 @@ mod tests { #[tokio::test] async fn check_session_drop() -> anyhow::Result<()> { - let cancel_map: Arc = Default::default(); + let cancellation_handler = Arc::new(CancellationHandler { + map: CancelMap::default(), + redis_client: None, + }); - let session = cancel_map.clone().get_session(); - assert!(cancel_map.contains(&session)); + let session = cancellation_handler.clone().get_session(); + assert!(cancellation_handler.contains(&session)); drop(session); // Check that the session has been dropped. - assert!(cancel_map.is_empty()); + assert!(cancellation_handler.is_empty()); Ok(()) } diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 5fcb537834..9f276c3c24 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -21,6 +21,7 @@ pub struct ProxyConfig { pub require_client_ip: bool, pub disable_ip_check_for_http: bool, pub endpoint_rps_limit: Vec, + pub redis_rps_limit: Vec, pub region: String, pub handshake_timeout: Duration, } diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index f7f162a075..66031f5eb2 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -152,6 +152,15 @@ pub static NUM_OPEN_CLIENTS_IN_HTTP_POOL: Lazy = Lazy::new(|| { .unwrap() }); +pub static NUM_CANCELLATION_REQUESTS: Lazy = Lazy::new(|| { + register_int_counter_vec!( + "proxy_cancellation_requests_total", + "Number of cancellation requests (per found/not_found).", + &["source", "kind"], + ) + .unwrap() +}); + #[derive(Clone)] pub struct LatencyTimer { // time since the stopwatch was started diff --git a/proxy/src/proxy.rs b/proxy/src/proxy.rs index 5f65de4c98..ce77098a5f 100644 --- a/proxy/src/proxy.rs +++ b/proxy/src/proxy.rs @@ -10,7 +10,7 @@ pub mod wake_compute; use crate::{ auth, - cancellation::{self, CancelMap}, + cancellation::{self, CancellationHandler}, compute, config::{ProxyConfig, TlsConfig}, context::RequestMonitoring, @@ -62,6 +62,7 @@ pub async fn task_main( listener: tokio::net::TcpListener, cancellation_token: CancellationToken, endpoint_rate_limiter: Arc, + cancellation_handler: Arc, ) -> anyhow::Result<()> { scopeguard::defer! { info!("proxy has shut down"); @@ -72,7 +73,6 @@ pub async fn task_main( socket2::SockRef::from(&listener).set_keepalive(true)?; let connections = tokio_util::task::task_tracker::TaskTracker::new(); - let cancel_map = Arc::new(CancelMap::default()); while let Some(accept_result) = run_until_cancelled(listener.accept(), &cancellation_token).await @@ -80,7 +80,7 @@ pub async fn task_main( let (socket, peer_addr) = accept_result?; let session_id = uuid::Uuid::new_v4(); - let cancel_map = Arc::clone(&cancel_map); + let cancellation_handler = Arc::clone(&cancellation_handler); let endpoint_rate_limiter = endpoint_rate_limiter.clone(); let session_span = info_span!( @@ -113,7 +113,7 @@ pub async fn task_main( let res = handle_client( config, &mut ctx, - cancel_map, + cancellation_handler, socket, ClientMode::Tcp, endpoint_rate_limiter, @@ -227,7 +227,7 @@ impl ReportableError for ClientRequestError { pub async fn handle_client( config: &'static ProxyConfig, ctx: &mut RequestMonitoring, - cancel_map: Arc, + cancellation_handler: Arc, stream: S, mode: ClientMode, endpoint_rate_limiter: Arc, @@ -253,8 +253,8 @@ pub async fn handle_client( match tokio::time::timeout(config.handshake_timeout, do_handshake).await?? { HandshakeData::Startup(stream, params) => (stream, params), HandshakeData::Cancel(cancel_key_data) => { - return Ok(cancel_map - .cancel_session(cancel_key_data) + return Ok(cancellation_handler + .cancel_session(cancel_key_data, ctx.session_id) .await .map(|()| None)?) } @@ -315,7 +315,7 @@ pub async fn handle_client( .or_else(|e| stream.throw_error(e)) .await?; - let session = cancel_map.get_session(); + let session = cancellation_handler.get_session(); prepare_client_connection(&node, &session, &mut stream).await?; // Before proxy passing, forward to compute whatever data is left in the diff --git a/proxy/src/rate_limiter.rs b/proxy/src/rate_limiter.rs index b26386d159..f0da4ead23 100644 --- a/proxy/src/rate_limiter.rs +++ b/proxy/src/rate_limiter.rs @@ -4,4 +4,4 @@ mod limiter; pub use aimd::Aimd; pub use limit_algorithm::{AimdConfig, Fixed, RateLimitAlgorithm, RateLimiterConfig}; pub use limiter::Limiter; -pub use limiter::{EndpointRateLimiter, RateBucketInfo}; +pub use limiter::{EndpointRateLimiter, RateBucketInfo, RedisRateLimiter}; diff --git a/proxy/src/rate_limiter/limiter.rs b/proxy/src/rate_limiter/limiter.rs index cbae72711c..3181060e2f 100644 --- a/proxy/src/rate_limiter/limiter.rs +++ b/proxy/src/rate_limiter/limiter.rs @@ -22,6 +22,44 @@ use super::{ RateLimiterConfig, }; +pub struct RedisRateLimiter { + data: Vec, + info: &'static [RateBucketInfo], +} + +impl RedisRateLimiter { + pub fn new(info: &'static [RateBucketInfo]) -> Self { + Self { + data: vec![ + RateBucket { + start: Instant::now(), + count: 0, + }; + info.len() + ], + info, + } + } + + /// Check that number of connections is below `max_rps` rps. + pub fn check(&mut self) -> bool { + let now = Instant::now(); + + let should_allow_request = self + .data + .iter_mut() + .zip(self.info) + .all(|(bucket, info)| bucket.should_allow_request(info, now)); + + if should_allow_request { + // only increment the bucket counts if the request will actually be accepted + self.data.iter_mut().for_each(RateBucket::inc); + } + + should_allow_request + } +} + // Simple per-endpoint rate limiter. // // Check that number of connections to the endpoint is below `max_rps` rps. diff --git a/proxy/src/redis.rs b/proxy/src/redis.rs index c2a91bed97..35d6db074e 100644 --- a/proxy/src/redis.rs +++ b/proxy/src/redis.rs @@ -1 +1,2 @@ pub mod notifications; +pub mod publisher; diff --git a/proxy/src/redis/notifications.rs b/proxy/src/redis/notifications.rs index 158884aa17..b8297a206c 100644 --- a/proxy/src/redis/notifications.rs +++ b/proxy/src/redis/notifications.rs @@ -1,38 +1,44 @@ use std::{convert::Infallible, sync::Arc}; use futures::StreamExt; +use pq_proto::CancelKeyData; use redis::aio::PubSub; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; use crate::{ cache::project_info::ProjectInfoCache, + cancellation::{CancelMap, CancellationHandler, NotificationsCancellationHandler}, intern::{ProjectIdInt, RoleNameInt}, }; -const CHANNEL_NAME: &str = "neondb-proxy-ws-updates"; +const CPLANE_CHANNEL_NAME: &str = "neondb-proxy-ws-updates"; +pub(crate) const PROXY_CHANNEL_NAME: &str = "neondb-proxy-to-proxy-updates"; const RECONNECT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(20); const INVALIDATION_LAG: std::time::Duration = std::time::Duration::from_secs(20); -struct ConsoleRedisClient { +struct RedisConsumerClient { client: redis::Client, } -impl ConsoleRedisClient { +impl RedisConsumerClient { pub fn new(url: &str) -> anyhow::Result { let client = redis::Client::open(url)?; Ok(Self { client }) } async fn try_connect(&self) -> anyhow::Result { let mut conn = self.client.get_async_connection().await?.into_pubsub(); - tracing::info!("subscribing to a channel `{CHANNEL_NAME}`"); - conn.subscribe(CHANNEL_NAME).await?; + tracing::info!("subscribing to a channel `{CPLANE_CHANNEL_NAME}`"); + conn.subscribe(CPLANE_CHANNEL_NAME).await?; + tracing::info!("subscribing to a channel `{PROXY_CHANNEL_NAME}`"); + conn.subscribe(PROXY_CHANNEL_NAME).await?; Ok(conn) } } -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] #[serde(tag = "topic", content = "data")] -enum Notification { +pub(crate) enum Notification { #[serde( rename = "/allowed_ips_updated", deserialize_with = "deserialize_json_string" @@ -45,16 +51,25 @@ enum Notification { deserialize_with = "deserialize_json_string" )] PasswordUpdate { password_update: PasswordUpdate }, + #[serde(rename = "/cancel_session")] + Cancel(CancelSession), } -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] -struct AllowedIpsUpdate { +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub(crate) struct AllowedIpsUpdate { project_id: ProjectIdInt, } -#[derive(Clone, Debug, Deserialize, Eq, PartialEq)] -struct PasswordUpdate { +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub(crate) struct PasswordUpdate { project_id: ProjectIdInt, role_name: RoleNameInt, } +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub(crate) struct CancelSession { + pub region_id: Option, + pub cancel_key_data: CancelKeyData, + pub session_id: Uuid, +} + fn deserialize_json_string<'de, D, T>(deserializer: D) -> Result where T: for<'de2> serde::Deserialize<'de2>, @@ -64,6 +79,88 @@ where serde_json::from_str(&s).map_err(::custom) } +struct MessageHandler< + C: ProjectInfoCache + Send + Sync + 'static, + H: NotificationsCancellationHandler + Send + Sync + 'static, +> { + cache: Arc, + cancellation_handler: Arc, + region_id: String, +} + +impl< + C: ProjectInfoCache + Send + Sync + 'static, + H: NotificationsCancellationHandler + Send + Sync + 'static, + > MessageHandler +{ + pub fn new(cache: Arc, cancellation_handler: Arc, region_id: String) -> Self { + Self { + cache, + cancellation_handler, + region_id, + } + } + pub fn disable_ttl(&self) { + self.cache.disable_ttl(); + } + pub fn enable_ttl(&self) { + self.cache.enable_ttl(); + } + #[tracing::instrument(skip(self, msg), fields(session_id = tracing::field::Empty))] + async fn handle_message(&self, msg: redis::Msg) -> anyhow::Result<()> { + use Notification::*; + let payload: String = msg.get_payload()?; + tracing::debug!(?payload, "received a message payload"); + + let msg: Notification = match serde_json::from_str(&payload) { + Ok(msg) => msg, + Err(e) => { + tracing::error!("broken message: {e}"); + return Ok(()); + } + }; + tracing::debug!(?msg, "received a message"); + match msg { + Cancel(cancel_session) => { + tracing::Span::current().record( + "session_id", + &tracing::field::display(cancel_session.session_id), + ); + if let Some(cancel_region) = cancel_session.region_id { + // If the message is not for this region, ignore it. + if cancel_region != self.region_id { + return Ok(()); + } + } + // This instance of cancellation_handler doesn't have a RedisPublisherClient so it can't publish the message. + match self + .cancellation_handler + .cancel_session_no_publish(cancel_session.cancel_key_data) + .await + { + Ok(()) => {} + Err(e) => { + tracing::error!("failed to cancel session: {e}"); + } + } + } + _ => { + invalidate_cache(self.cache.clone(), msg.clone()); + // It might happen that the invalid entry is on the way to be cached. + // To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds. + // TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message. + let cache = self.cache.clone(); + tokio::spawn(async move { + tokio::time::sleep(INVALIDATION_LAG).await; + invalidate_cache(cache, msg); + }); + } + } + + Ok(()) + } +} + fn invalidate_cache(cache: Arc, msg: Notification) { use Notification::*; match msg { @@ -74,50 +171,33 @@ fn invalidate_cache(cache: Arc, msg: Notification) { password_update.project_id, password_update.role_name, ), + Cancel(_) => unreachable!("cancel message should be handled separately"), } } -#[tracing::instrument(skip(cache))] -fn handle_message(msg: redis::Msg, cache: Arc) -> anyhow::Result<()> -where - C: ProjectInfoCache + Send + Sync + 'static, -{ - let payload: String = msg.get_payload()?; - tracing::debug!(?payload, "received a message payload"); - - let msg: Notification = match serde_json::from_str(&payload) { - Ok(msg) => msg, - Err(e) => { - tracing::error!("broken message: {e}"); - return Ok(()); - } - }; - tracing::debug!(?msg, "received a message"); - invalidate_cache(cache.clone(), msg.clone()); - // It might happen that the invalid entry is on the way to be cached. - // To make sure that the entry is invalidated, let's repeat the invalidation in INVALIDATION_LAG seconds. - // TODO: include the version (or the timestamp) in the message and invalidate only if the entry is cached before the message. - tokio::spawn(async move { - tokio::time::sleep(INVALIDATION_LAG).await; - invalidate_cache(cache, msg.clone()); - }); - - Ok(()) -} - /// Handle console's invalidation messages. #[tracing::instrument(name = "console_notifications", skip_all)] -pub async fn task_main(url: String, cache: Arc) -> anyhow::Result +pub async fn task_main( + url: String, + cache: Arc, + cancel_map: CancelMap, + region_id: String, +) -> anyhow::Result where C: ProjectInfoCache + Send + Sync + 'static, { cache.enable_ttl(); + let handler = MessageHandler::new( + cache, + Arc::new(CancellationHandler::new(cancel_map, None)), + region_id, + ); loop { - let redis = ConsoleRedisClient::new(&url)?; + let redis = RedisConsumerClient::new(&url)?; let conn = match redis.try_connect().await { Ok(conn) => { - cache.disable_ttl(); + handler.disable_ttl(); conn } Err(e) => { @@ -130,7 +210,7 @@ where }; let mut stream = conn.into_on_message(); while let Some(msg) = stream.next().await { - match handle_message(msg, cache.clone()) { + match handler.handle_message(msg).await { Ok(()) => {} Err(e) => { tracing::error!("failed to handle message: {e}, will try to reconnect"); @@ -138,7 +218,7 @@ where } } } - cache.enable_ttl(); + handler.enable_ttl(); } } @@ -198,6 +278,33 @@ mod tests { } ); + Ok(()) + } + #[test] + fn parse_cancel_session() -> anyhow::Result<()> { + let cancel_key_data = CancelKeyData { + backend_pid: 42, + cancel_key: 41, + }; + let uuid = uuid::Uuid::new_v4(); + let msg = Notification::Cancel(CancelSession { + cancel_key_data, + region_id: None, + session_id: uuid, + }); + let text = serde_json::to_string(&msg)?; + let result: Notification = serde_json::from_str(&text)?; + assert_eq!(msg, result); + + let msg = Notification::Cancel(CancelSession { + cancel_key_data, + region_id: Some("region".to_string()), + session_id: uuid, + }); + let text = serde_json::to_string(&msg)?; + let result: Notification = serde_json::from_str(&text)?; + assert_eq!(msg, result,); + Ok(()) } } diff --git a/proxy/src/redis/publisher.rs b/proxy/src/redis/publisher.rs new file mode 100644 index 0000000000..f85593afdd --- /dev/null +++ b/proxy/src/redis/publisher.rs @@ -0,0 +1,80 @@ +use pq_proto::CancelKeyData; +use redis::AsyncCommands; +use uuid::Uuid; + +use crate::rate_limiter::{RateBucketInfo, RedisRateLimiter}; + +use super::notifications::{CancelSession, Notification, PROXY_CHANNEL_NAME}; + +pub struct RedisPublisherClient { + client: redis::Client, + publisher: Option, + region_id: String, + limiter: RedisRateLimiter, +} + +impl RedisPublisherClient { + pub fn new( + url: &str, + region_id: String, + info: &'static [RateBucketInfo], + ) -> anyhow::Result { + let client = redis::Client::open(url)?; + Ok(Self { + client, + publisher: None, + region_id, + limiter: RedisRateLimiter::new(info), + }) + } + pub async fn try_publish( + &mut self, + cancel_key_data: CancelKeyData, + session_id: Uuid, + ) -> anyhow::Result<()> { + if !self.limiter.check() { + tracing::info!("Rate limit exceeded. Skipping cancellation message"); + return Err(anyhow::anyhow!("Rate limit exceeded")); + } + match self.publish(cancel_key_data, session_id).await { + Ok(()) => return Ok(()), + Err(e) => { + tracing::error!("failed to publish a message: {e}"); + self.publisher = None; + } + } + tracing::info!("Publisher is disconnected. Reconnectiong..."); + self.try_connect().await?; + self.publish(cancel_key_data, session_id).await + } + + async fn publish( + &mut self, + cancel_key_data: CancelKeyData, + session_id: Uuid, + ) -> anyhow::Result<()> { + let conn = self + .publisher + .as_mut() + .ok_or_else(|| anyhow::anyhow!("not connected"))?; + let payload = serde_json::to_string(&Notification::Cancel(CancelSession { + region_id: Some(self.region_id.clone()), + cancel_key_data, + session_id, + }))?; + conn.publish(PROXY_CHANNEL_NAME, payload).await?; + Ok(()) + } + pub async fn try_connect(&mut self) -> anyhow::Result<()> { + match self.client.get_async_connection().await { + Ok(conn) => { + self.publisher = Some(conn); + } + Err(e) => { + tracing::error!("failed to connect to redis: {e}"); + return Err(e.into()); + } + } + Ok(()) + } +} diff --git a/proxy/src/serverless.rs b/proxy/src/serverless.rs index a20600b94a..ee3e91495b 100644 --- a/proxy/src/serverless.rs +++ b/proxy/src/serverless.rs @@ -24,7 +24,7 @@ use crate::metrics::NUM_CLIENT_CONNECTION_GAUGE; use crate::protocol2::{ProxyProtocolAccept, WithClientIp}; use crate::rate_limiter::EndpointRateLimiter; use crate::serverless::backend::PoolingBackend; -use crate::{cancellation::CancelMap, config::ProxyConfig}; +use crate::{cancellation::CancellationHandler, config::ProxyConfig}; use futures::StreamExt; use hyper::{ server::{ @@ -50,6 +50,7 @@ pub async fn task_main( ws_listener: TcpListener, cancellation_token: CancellationToken, endpoint_rate_limiter: Arc, + cancellation_handler: Arc, ) -> anyhow::Result<()> { scopeguard::defer! { info!("websocket server has shut down"); @@ -115,7 +116,7 @@ pub async fn task_main( let backend = backend.clone(); let ws_connections = ws_connections.clone(); let endpoint_rate_limiter = endpoint_rate_limiter.clone(); - + let cancellation_handler = cancellation_handler.clone(); async move { let peer_addr = match client_addr { Some(addr) => addr, @@ -127,9 +128,9 @@ pub async fn task_main( let backend = backend.clone(); let ws_connections = ws_connections.clone(); let endpoint_rate_limiter = endpoint_rate_limiter.clone(); + let cancellation_handler = cancellation_handler.clone(); async move { - let cancel_map = Arc::new(CancelMap::default()); let session_id = uuid::Uuid::new_v4(); request_handler( @@ -137,7 +138,7 @@ pub async fn task_main( config, backend, ws_connections, - cancel_map, + cancellation_handler, session_id, peer_addr.ip(), endpoint_rate_limiter, @@ -205,7 +206,7 @@ async fn request_handler( config: &'static ProxyConfig, backend: Arc, ws_connections: TaskTracker, - cancel_map: Arc, + cancellation_handler: Arc, session_id: uuid::Uuid, peer_addr: IpAddr, endpoint_rate_limiter: Arc, @@ -232,7 +233,7 @@ async fn request_handler( config, ctx, websocket, - cancel_map, + cancellation_handler, host, endpoint_rate_limiter, ) diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index 062dd440b2..24f2bb7e8c 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -1,5 +1,5 @@ use crate::{ - cancellation::CancelMap, + cancellation::CancellationHandler, config::ProxyConfig, context::RequestMonitoring, error::{io_error, ReportableError}, @@ -133,7 +133,7 @@ pub async fn serve_websocket( config: &'static ProxyConfig, mut ctx: RequestMonitoring, websocket: HyperWebsocket, - cancel_map: Arc, + cancellation_handler: Arc, hostname: Option, endpoint_rate_limiter: Arc, ) -> anyhow::Result<()> { @@ -141,7 +141,7 @@ pub async fn serve_websocket( let res = handle_client( config, &mut ctx, - cancel_map, + cancellation_handler, WebSocketRw::new(websocket), ClientMode::Websockets { hostname }, endpoint_rate_limiter, diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index ef41774289..418370c3ab 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -96,5 +96,6 @@ PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = ( "pageserver_evictions_total", "pageserver_evictions_with_low_residence_duration_total", *PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS, + # "pageserver_directory_entries_count", -- only used if above a certain threshold # "pageserver_broken_tenants_count" -- used only for broken ) diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 018fb05201..9dd9956c55 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 018fb052011081dc2733d3118d12e5c36df6eba1 +Subproject commit 9dd9956c55ffbbd9abe77d10382453757fedfcf5 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 6ee78a3c29..ca2def9993 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 6ee78a3c29e33cafd85ba09568b6b5eb031d29b9 +Subproject commit ca2def999368d9df098a637234ad5a9003189463 diff --git a/vendor/postgres-v16 b/vendor/postgres-v16 index 550cdd26d4..9c37a49884 160000 --- a/vendor/postgres-v16 +++ b/vendor/postgres-v16 @@ -1 +1 @@ -Subproject commit 550cdd26d445afdd26b15aa93c8c2f3dc52f8361 +Subproject commit 9c37a4988463a97d9cacb321acf3828b09823269 diff --git a/vendor/revisions.json b/vendor/revisions.json index 91ebb8cb34..72bc0d7e0d 100644 --- a/vendor/revisions.json +++ b/vendor/revisions.json @@ -1,5 +1,5 @@ { - "postgres-v16": "550cdd26d445afdd26b15aa93c8c2f3dc52f8361", - "postgres-v15": "6ee78a3c29e33cafd85ba09568b6b5eb031d29b9", - "postgres-v14": "018fb052011081dc2733d3118d12e5c36df6eba1" + "postgres-v16": "9c37a4988463a97d9cacb321acf3828b09823269", + "postgres-v15": "ca2def999368d9df098a637234ad5a9003189463", + "postgres-v14": "9dd9956c55ffbbd9abe77d10382453757fedfcf5" } diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index 8e9cc43152..e808fabbe7 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -38,7 +38,7 @@ futures-io = { version = "0.3" } futures-sink = { version = "0.3" } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } getrandom = { version = "0.2", default-features = false, features = ["std"] } -hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] } +hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["raw"] } hashbrown-594e8ee84c453af0 = { package = "hashbrown", version = "0.13", features = ["raw"] } hex = { version = "0.4", features = ["serde"] } hmac = { version = "0.12", default-features = false, features = ["reset"] } @@ -91,7 +91,7 @@ cc = { version = "1", default-features = false, features = ["parallel"] } chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "wasmbind"] } either = { version = "1" } getrandom = { version = "0.2", default-features = false, features = ["std"] } -hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", default-features = false, features = ["raw"] } +hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["raw"] } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits", "use_std"] }