diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index 57d24063bf..9eff483680 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -99,7 +99,7 @@ jobs: # Set --sparse-ordering option of pytest-order plugin # to ensure tests are running in order of appears in the file. # It's important for test_perf_pgbench.py::test_pgbench_remote_* tests - extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py + extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py --ignore test_runner/performance/test_perf_pgvector_queries.py env: BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }} VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}" @@ -410,14 +410,14 @@ jobs: PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}" BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }} - - name: Benchmark pgvector hnsw queries + - name: Benchmark pgvector queries uses: ./.github/actions/run-python-test-set with: build_type: ${{ env.BUILD_TYPE }} - test_selection: performance + test_selection: performance/test_perf_pgvector_queries.py run_in_parallel: false save_perf_report: ${{ env.SAVE_PERF_REPORT }} - extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector + extra_params: -m remote_cluster --timeout 21600 env: BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }} VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}" diff --git a/.github/workflows/build-build-tools-image.yml b/.github/workflows/build-build-tools-image.yml index 9aacb09d10..2c994b08ae 100644 --- a/.github/workflows/build-build-tools-image.yml +++ b/.github/workflows/build-build-tools-image.yml @@ -55,7 +55,7 @@ jobs: exit 1 fi - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 # Use custom DOCKER_CONFIG directory to avoid conflicts with default settings # The default value is ~/.docker diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 71ca7329ee..703fc8d145 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -858,7 +858,7 @@ jobs: cache-to: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }},mode=max tags: | neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }} - + - name: Build neon extensions test image if: matrix.version == 'v16' uses: docker/build-push-action@v5 @@ -965,7 +965,7 @@ jobs: steps: - name: Checkout - uses: actions/checkout@v1 + uses: actions/checkout@v4 with: fetch-depth: 0 diff --git a/Cargo.toml b/Cargo.toml index dc89c2341b..8fddaaef12 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ num_cpus = "1.15" num-traits = "0.2.15" once_cell = "1.13" opentelemetry = "0.20.0" -opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] } +opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] } opentelemetry-semantic-conventions = "0.12.0" parking_lot = "0.12" parquet = { version = "51.0.0", default-features = false, features = ["zstd"] } @@ -128,7 +128,7 @@ parquet_derive = "51.0.0" pbkdf2 = { version = "0.12.1", features = ["simple", "std"] } pin-project-lite = "0.2" procfs = "0.14" -prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency +prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency prost = "0.11" rand = "0.8" redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] } @@ -184,7 +184,7 @@ tower-service = "0.3.2" tracing = "0.1" tracing-error = "0.2.0" tracing-opentelemetry = "0.21.0" -tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] } +tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] } twox-hash = { version = "1.6.3", default-features = false } url = "2.2" urlencoding = "2.1" diff --git a/Dockerfile.build-tools b/Dockerfile.build-tools index 460b8c996d..e7c61ace0e 100644 --- a/Dockerfile.build-tools +++ b/Dockerfile.build-tools @@ -141,7 +141,7 @@ WORKDIR /home/nonroot # Rust # Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`) -ENV RUSTC_VERSION=1.78.0 +ENV RUSTC_VERSION=1.79.0 ENV RUSTUP_HOME="/home/nonroot/.rustup" ENV PATH="/home/nonroot/.cargo/bin:${PATH}" RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \ diff --git a/Dockerfile.compute-node b/Dockerfile.compute-node index a86fdd0bc3..3a73ac71b0 100644 --- a/Dockerfile.compute-node +++ b/Dockerfile.compute-node @@ -246,8 +246,8 @@ COPY patches/pgvector.patch /pgvector.patch # By default, pgvector Makefile uses `-march=native`. We don't want that, # because we build the images on different machines than where we run them. # Pass OPTFLAGS="" to remove it. -RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.1.tar.gz -O pgvector.tar.gz && \ - echo "fe6c8cb4e0cd1a8cb60f5badf9e1701e0fcabcfc260931c26d01e155c4dd21d1 pgvector.tar.gz" | sha256sum --check && \ +RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \ + echo "617fba855c9bcb41a2a9bc78a78567fd2e147c72afd5bf9d37b31b9591632b30 pgvector.tar.gz" | sha256sum --check && \ mkdir pgvector-src && cd pgvector-src && tar xzf ../pgvector.tar.gz --strip-components=1 -C . && \ patch -p1 < /pgvector.patch && \ make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \ @@ -979,7 +979,7 @@ RUN cd /ext-src/ && for f in *.tar.gz; \ do echo $f; dname=$(echo $f | sed 's/\.tar.*//')-src; \ rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \ || exit 1; rm -f $f; done -RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch +RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch # cmake is required for the h3 test RUN apt-get update && apt-get install -y cmake RUN patch -p1 < /ext-src/pg_hintplan.patch diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs index 9295f091d5..7bf5db5a57 100644 --- a/compute_tools/src/bin/compute_ctl.rs +++ b/compute_tools/src/bin/compute_ctl.rs @@ -735,7 +735,7 @@ fn cli() -> clap::Command { Arg::new("filecache-connstr") .long("filecache-connstr") .default_value( - "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable", + "host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor", ) .value_name("FILECACHE_CONNSTR"), ) diff --git a/libs/pageserver_api/src/keyspace.rs b/libs/pageserver_api/src/keyspace.rs index 12c6dc3a6d..9a61f2ad81 100644 --- a/libs/pageserver_api/src/keyspace.rs +++ b/libs/pageserver_api/src/keyspace.rs @@ -558,6 +558,12 @@ impl KeySpaceRandomAccum { self.ranges.push(range); } + pub fn add_keyspace(&mut self, keyspace: KeySpace) { + for range in keyspace.ranges { + self.add_range(range); + } + } + pub fn to_keyspace(mut self) -> KeySpace { let mut ranges = Vec::new(); if !self.ranges.is_empty() { diff --git a/libs/tracing-utils/Cargo.toml b/libs/tracing-utils/Cargo.toml index b285c9b5b0..512a748124 100644 --- a/libs/tracing-utils/Cargo.toml +++ b/libs/tracing-utils/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true [dependencies] hyper.workspace = true opentelemetry = { workspace = true, features=["rt-tokio"] } -opentelemetry-otlp = { workspace = true, default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] } +opentelemetry-otlp = { workspace = true, default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] } opentelemetry-semantic-conventions.workspace = true reqwest = { workspace = true, default-features = false, features = ["rustls-tls"] } tokio = { workspace = true, features = ["rt", "rt-multi-thread"] } diff --git a/pageserver/src/consumption_metrics.rs b/pageserver/src/consumption_metrics.rs index 540d0d2e8c..18c1a6cd9b 100644 --- a/pageserver/src/consumption_metrics.rs +++ b/pageserver/src/consumption_metrics.rs @@ -2,10 +2,9 @@ //! and push them to a HTTP endpoint. use crate::context::{DownloadBehavior, RequestContext}; use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME}; +use crate::tenant::size::CalculateSyntheticSizeError; use crate::tenant::tasks::BackgroundLoopKind; -use crate::tenant::{ - mgr::TenantManager, LogicalSizeCalculationCause, PageReconstructError, Tenant, -}; +use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant}; use camino::Utf8PathBuf; use consumption_metrics::EventType; use pageserver_api::models::TenantState; @@ -350,19 +349,12 @@ async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &Re // Same for the loop that fetches computed metrics. // By using the same limiter, we centralize metrics collection for "start" and "finished" counters, // which turns out is really handy to understand the system. - let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else { - return; - }; - - // this error can be returned if timeline is shutting down, but it does not - // mean the synthetic size worker should terminate. - let shutting_down = matches!( - e.downcast_ref::(), - Some(PageReconstructError::Cancelled) - ); - - if !shutting_down { - let tenant_shard_id = tenant.tenant_shard_id(); - error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}"); + match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await { + Ok(_) => {} + Err(CalculateSyntheticSizeError::Cancelled) => {} + Err(e) => { + let tenant_shard_id = tenant.tenant_shard_id(); + error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}"); + } } } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 12d02c52fe..657708c0d6 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -1135,7 +1135,10 @@ async fn tenant_size_handler( &ctx, ) .await - .map_err(ApiError::InternalServerError)?; + .map_err(|e| match e { + crate::tenant::size::CalculateSyntheticSizeError::Cancelled => ApiError::ShuttingDown, + other => ApiError::InternalServerError(anyhow::anyhow!(other)), + })?; let mut sizes = None; let accepts_html = headers @@ -1143,9 +1146,7 @@ async fn tenant_size_handler( .map(|v| v == "text/html") .unwrap_or_default(); if !inputs_only.unwrap_or(false) { - let storage_model = inputs - .calculate_model() - .map_err(ApiError::InternalServerError)?; + let storage_model = inputs.calculate_model(); let size = storage_model.calculate(); // If request header expects html, return html diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 538b64aa60..b505c09d07 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -510,11 +510,24 @@ pub(crate) enum GcError { #[error(transparent)] Remote(anyhow::Error), + // An error reading while calculating GC cutoffs + #[error(transparent)] + GcCutoffs(PageReconstructError), + // If GC was invoked for a particular timeline, this error means it didn't exist #[error("timeline not found")] TimelineNotFound, } +impl From for GcError { + fn from(value: PageReconstructError) -> Self { + match value { + PageReconstructError::Cancelled => Self::TimelineCancelled, + other => Self::GcCutoffs(other), + } + } +} + impl Tenant { /// Yet another helper for timeline initialization. /// @@ -1034,7 +1047,6 @@ impl Tenant { remote_metadata, TimelineResources { remote_client, - deletion_queue_client: self.deletion_queue_client.clone(), timeline_get_throttle: self.timeline_get_throttle.clone(), }, ctx, @@ -1060,7 +1072,6 @@ impl Tenant { timeline_id, &index_part.metadata, remote_timeline_client, - self.deletion_queue_client.clone(), ) .instrument(tracing::info_span!("timeline_delete", %timeline_id)) .await @@ -2922,17 +2933,9 @@ impl Tenant { .checked_sub(horizon) .unwrap_or(Lsn(0)); - let res = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await; - - match res { - Ok(cutoffs) => { - let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs); - assert!(old.is_none()); - } - Err(e) => { - tracing::warn!(timeline_id = %timeline.timeline_id, "ignoring failure to find gc cutoffs: {e:#}"); - } - } + let cutoffs = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await?; + let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs); + assert!(old.is_none()); } if !self.is_active() || self.cancel.is_cancelled() { @@ -3445,7 +3448,6 @@ impl Tenant { ); TimelineResources { remote_client, - deletion_queue_client: self.deletion_queue_client.clone(), timeline_get_throttle: self.timeline_get_throttle.clone(), } } @@ -3555,7 +3557,7 @@ impl Tenant { cause: LogicalSizeCalculationCause, cancel: &CancellationToken, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> Result { let logical_sizes_at_once = self .conf .concurrent_tenant_size_logical_size_queries @@ -3570,8 +3572,8 @@ impl Tenant { // See more for on the issue #2748 condenced out of the initial PR review. let mut shared_cache = tokio::select! { locked = self.cached_logical_sizes.lock() => locked, - _ = cancel.cancelled() => anyhow::bail!("cancelled"), - _ = self.cancel.cancelled() => anyhow::bail!("tenant is shutting down"), + _ = cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled), + _ = self.cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled), }; size::gather_inputs( @@ -3595,10 +3597,10 @@ impl Tenant { cause: LogicalSizeCalculationCause, cancel: &CancellationToken, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> Result { let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?; - let size = inputs.calculate()?; + let size = inputs.calculate(); self.set_cached_synthetic_size(size); @@ -4043,6 +4045,7 @@ mod tests { use crate::repository::{Key, Value}; use crate::tenant::harness::*; use crate::tenant::timeline::CompactFlags; + use crate::walrecord::NeonWalRecord; use crate::DEFAULT_PG_VERSION; use bytes::{Bytes, BytesMut}; use hex_literal::hex; @@ -6756,8 +6759,8 @@ mod tests { } #[tokio::test] - async fn test_simple_bottom_most_compaction() -> anyhow::Result<()> { - let harness = TenantHarness::create("test_simple_bottom_most_compaction")?; + async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_simple_bottom_most_compaction_images")?; let (tenant, ctx) = harness.load().await; fn get_key(id: u32) -> Key { @@ -6912,4 +6915,79 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_neon_test_record() -> anyhow::Result<()> { + let harness = TenantHarness::create("test_neon_test_record")?; + let (tenant, ctx) = harness.load().await; + + fn get_key(id: u32) -> Key { + // using aux key here b/c they are guaranteed to be inside `collect_keyspace`. + let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap(); + key.field6 = id; + key + } + + let delta1 = vec![ + ( + get_key(1), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append(",0x20")), + ), + ( + get_key(1), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append(",0x30")), + ), + (get_key(2), Lsn(0x10), Value::Image("0x10".into())), + ( + get_key(2), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_append(",0x20")), + ), + ( + get_key(2), + Lsn(0x30), + Value::WalRecord(NeonWalRecord::wal_append(",0x30")), + ), + (get_key(3), Lsn(0x10), Value::Image("0x10".into())), + ( + get_key(3), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_clear()), + ), + (get_key(4), Lsn(0x10), Value::Image("0x10".into())), + ( + get_key(4), + Lsn(0x20), + Value::WalRecord(NeonWalRecord::wal_init()), + ), + ]; + let image1 = vec![(get_key(1), "0x10".into())]; + + let tline = tenant + .create_test_timeline_with_layers( + TIMELINE_ID, + Lsn(0x10), + DEFAULT_PG_VERSION, + &ctx, + vec![delta1], // delta layers + vec![(Lsn(0x10), image1)], // image layers + Lsn(0x50), + ) + .await?; + + assert_eq!( + tline.get(get_key(1), Lsn(0x50), &ctx).await?, + Bytes::from_static(b"0x10,0x20,0x30") + ); + assert_eq!( + tline.get(get_key(2), Lsn(0x50), &ctx).await?, + Bytes::from_static(b"0x10,0x20,0x30") + ); + // assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new()); + // assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new()); + + Ok(()) + } } diff --git a/pageserver/src/tenant/secondary/downloader.rs b/pageserver/src/tenant/secondary/downloader.rs index 62803c7838..24176ecf19 100644 --- a/pageserver/src/tenant/secondary/downloader.rs +++ b/pageserver/src/tenant/secondary/downloader.rs @@ -513,7 +513,7 @@ impl<'a> TenantDownloader<'a> { // cover our access to local storage. let Ok(_guard) = self.secondary_state.gate.enter() else { // Shutting down - return Ok(()); + return Err(UpdateError::Cancelled); }; let tenant_shard_id = self.secondary_state.get_tenant_shard_id(); @@ -846,7 +846,7 @@ impl<'a> TenantDownloader<'a> { for layer in timeline.layers { if self.secondary_state.cancel.is_cancelled() { tracing::debug!("Cancelled -- dropping out of layer loop"); - return Ok(()); + return Err(UpdateError::Cancelled); } // Existing on-disk layers: just update their access time. diff --git a/pageserver/src/tenant/size.rs b/pageserver/src/tenant/size.rs index 64fff5536c..cdd5b0cbe7 100644 --- a/pageserver/src/tenant/size.rs +++ b/pageserver/src/tenant/size.rs @@ -3,7 +3,6 @@ use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use anyhow::{bail, Context}; use tokio::sync::oneshot::error::RecvError; use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; @@ -11,7 +10,7 @@ use tokio_util::sync::CancellationToken; use crate::context::RequestContext; use crate::pgdatadir_mapping::CalculateLogicalSizeError; -use super::{LogicalSizeCalculationCause, Tenant}; +use super::{GcError, LogicalSizeCalculationCause, Tenant}; use crate::tenant::Timeline; use utils::id::TimelineId; use utils::lsn::Lsn; @@ -43,6 +42,44 @@ pub struct SegmentMeta { pub kind: LsnKind, } +#[derive(thiserror::Error, Debug)] +pub(crate) enum CalculateSyntheticSizeError { + /// Something went wrong internally to the calculation of logical size at a particular branch point + #[error("Failed to calculated logical size on timeline {timeline_id} at {lsn}: {error}")] + LogicalSize { + timeline_id: TimelineId, + lsn: Lsn, + error: CalculateLogicalSizeError, + }, + + /// Something went wrong internally when calculating GC parameters at start of size calculation + #[error(transparent)] + GcInfo(GcError), + + /// Totally unexpected errors, like panics joining a task + #[error(transparent)] + Fatal(anyhow::Error), + + /// The LSN we are trying to calculate a size at no longer exists at the point we query it + #[error("Could not find size at {lsn} in timeline {timeline_id}")] + LsnNotFound { timeline_id: TimelineId, lsn: Lsn }, + + /// Tenant shut down while calculating size + #[error("Cancelled")] + Cancelled, +} + +impl From for CalculateSyntheticSizeError { + fn from(value: GcError) -> Self { + match value { + GcError::TenantCancelled | GcError::TimelineCancelled => { + CalculateSyntheticSizeError::Cancelled + } + other => CalculateSyntheticSizeError::GcInfo(other), + } + } +} + impl SegmentMeta { fn size_needed(&self) -> bool { match self.kind { @@ -116,12 +153,9 @@ pub(super) async fn gather_inputs( cause: LogicalSizeCalculationCause, cancel: &CancellationToken, ctx: &RequestContext, -) -> anyhow::Result { +) -> Result { // refresh is needed to update gc related pitr_cutoff and horizon_cutoff - tenant - .refresh_gc_info(cancel, ctx) - .await - .context("Failed to refresh gc_info before gathering inputs")?; + tenant.refresh_gc_info(cancel, ctx).await?; // Collect information about all the timelines let mut timelines = tenant.list_timelines(); @@ -327,6 +361,12 @@ pub(super) async fn gather_inputs( ) .await?; + if tenant.cancel.is_cancelled() { + // If we're shutting down, return an error rather than a sparse result that might include some + // timelines from before we started shutting down + return Err(CalculateSyntheticSizeError::Cancelled); + } + Ok(ModelInputs { segments, timeline_inputs, @@ -345,7 +385,7 @@ async fn fill_logical_sizes( logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>, cause: LogicalSizeCalculationCause, ctx: &RequestContext, -) -> anyhow::Result<()> { +) -> Result<(), CalculateSyntheticSizeError> { let timeline_hash: HashMap> = HashMap::from_iter( timelines .iter() @@ -387,7 +427,7 @@ async fn fill_logical_sizes( } // Perform the size lookups - let mut have_any_error = false; + let mut have_any_error = None; while let Some(res) = joinset.join_next().await { // each of these come with Result, JoinError> // because of spawn + spawn_blocking @@ -398,21 +438,36 @@ async fn fill_logical_sizes( Err(join_error) => { // cannot really do anything, as this panic is likely a bug error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}"); - have_any_error = true; + + have_any_error = Some(CalculateSyntheticSizeError::Fatal( + anyhow::anyhow!(join_error) + .context("task that calls spawn_ondemand_logical_size_calculation"), + )); } Ok(Err(recv_result_error)) => { // cannot really do anything, as this panic is likely a bug error!("failed to receive logical size query result: {recv_result_error:#}"); - have_any_error = true; + have_any_error = Some(CalculateSyntheticSizeError::Fatal( + anyhow::anyhow!(recv_result_error) + .context("Receiving logical size query result"), + )); } Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => { - if !matches!(error, CalculateLogicalSizeError::Cancelled) { + if matches!(error, CalculateLogicalSizeError::Cancelled) { + // Skip this: it's okay if one timeline among many is shutting down while we + // calculate inputs for the overall tenant. + continue; + } else { warn!( timeline_id=%timeline.timeline_id, "failed to calculate logical size at {lsn}: {error:#}" ); + have_any_error = Some(CalculateSyntheticSizeError::LogicalSize { + timeline_id: timeline.timeline_id, + lsn, + error, + }); } - have_any_error = true; } Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => { debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated"); @@ -426,10 +481,10 @@ async fn fill_logical_sizes( // prune any keys not needed anymore; we record every used key and added key. logical_size_cache.retain(|key, _| sizes_needed.contains_key(key)); - if have_any_error { + if let Some(error) = have_any_error { // we cannot complete this round, because we are missing data. // we have however cached all we were able to request calculation on. - anyhow::bail!("failed to calculate some logical_sizes"); + return Err(error); } // Insert the looked up sizes to the Segments @@ -444,32 +499,29 @@ async fn fill_logical_sizes( if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) { seg.segment.size = Some(*size); } else { - bail!("could not find size at {} in timeline {}", lsn, timeline_id); + return Err(CalculateSyntheticSizeError::LsnNotFound { timeline_id, lsn }); } } Ok(()) } impl ModelInputs { - pub fn calculate_model(&self) -> anyhow::Result { + pub fn calculate_model(&self) -> tenant_size_model::StorageModel { // Convert SegmentMetas into plain Segments - let storage = StorageModel { + StorageModel { segments: self .segments .iter() .map(|seg| seg.segment.clone()) .collect(), - }; - - Ok(storage) + } } // calculate total project size - pub fn calculate(&self) -> anyhow::Result { - let storage = self.calculate_model()?; + pub fn calculate(&self) -> u64 { + let storage = self.calculate_model(); let sizes = storage.calculate(); - - Ok(sizes.total_size) + sizes.total_size } } @@ -656,7 +708,7 @@ fn verify_size_for_multiple_branches() { "#; let inputs: ModelInputs = serde_json::from_str(doc).unwrap(); - assert_eq!(inputs.calculate().unwrap(), 37_851_408); + assert_eq!(inputs.calculate(), 37_851_408); } #[test] @@ -711,7 +763,7 @@ fn verify_size_for_one_branch() { let model: ModelInputs = serde_json::from_str(doc).unwrap(); - let res = model.calculate_model().unwrap().calculate(); + let res = model.calculate_model().calculate(); println!("calculated synthetic size: {}", res.total_size); println!("result: {:?}", serde_json::to_string(&res.segments)); diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 0b3f841ccf..9607546ce0 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -318,7 +318,7 @@ pub(crate) struct LayerFringe { #[derive(Debug)] struct LayerKeyspace { layer: ReadableLayer, - target_keyspace: Vec, + target_keyspace: KeySpaceRandomAccum, } impl LayerFringe { @@ -342,17 +342,13 @@ impl LayerFringe { _, LayerKeyspace { layer, - target_keyspace, + mut target_keyspace, }, - )) => { - let mut keyspace = KeySpaceRandomAccum::new(); - for ks in target_keyspace { - for part in ks.ranges { - keyspace.add_range(part); - } - } - Some((layer, keyspace.consume_keyspace(), read_desc.lsn_range)) - } + )) => Some(( + layer, + target_keyspace.consume_keyspace(), + read_desc.lsn_range, + )), None => unreachable!("fringe internals are always consistent"), } } @@ -367,16 +363,18 @@ impl LayerFringe { let entry = self.layers.entry(layer_id.clone()); match entry { Entry::Occupied(mut entry) => { - entry.get_mut().target_keyspace.push(keyspace); + entry.get_mut().target_keyspace.add_keyspace(keyspace); } Entry::Vacant(entry) => { self.planned_reads_by_lsn.push(ReadDesc { lsn_range, layer_id: layer_id.clone(), }); + let mut accum = KeySpaceRandomAccum::new(); + accum.add_keyspace(keyspace); entry.insert(LayerKeyspace { layer, - target_keyspace: vec![keyspace], + target_keyspace: accum, }); } } diff --git a/pageserver/src/tenant/storage_layer/delta_layer.rs b/pageserver/src/tenant/storage_layer/delta_layer.rs index eb7cf81643..5e01ecd71d 100644 --- a/pageserver/src/tenant/storage_layer/delta_layer.rs +++ b/pageserver/src/tenant/storage_layer/delta_layer.rs @@ -219,7 +219,6 @@ pub struct DeltaLayerInner { // values copied from summary index_start_blk: u32, index_root_blk: u32, - lsn_range: Range, file: VirtualFile, file_id: FileId, @@ -785,7 +784,6 @@ impl DeltaLayerInner { file_id, index_start_blk: actual_summary.index_start_blk, index_root_blk: actual_summary.index_root_blk, - lsn_range: actual_summary.lsn_range, max_vectored_read_bytes, })) } @@ -911,7 +909,7 @@ impl DeltaLayerInner { let reads = Self::plan_reads( &keyspace, - lsn_range, + lsn_range.clone(), data_end_offset, index_reader, planner, @@ -924,7 +922,7 @@ impl DeltaLayerInner { self.do_reads_and_update_state(reads, reconstruct_state, ctx) .await; - reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start); + reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start); Ok(()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 18b3bb61f9..323a972a73 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -62,6 +62,7 @@ use std::{ ops::ControlFlow, }; +use crate::metrics::GetKind; use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS; use crate::{ aux_file::AuxFileSizeEstimator, @@ -75,7 +76,6 @@ use crate::{ disk_usage_eviction_task::DiskUsageEvictionInfo, pgdatadir_mapping::CollectKeySpaceError, }; -use crate::{deletion_queue::DeletionQueueClient, metrics::GetKind}; use crate::{ disk_usage_eviction_task::finite_f32, tenant::storage_layer::{ @@ -205,7 +205,6 @@ fn drop_wlock(rlock: tokio::sync::RwLockWriteGuard<'_, T>) { /// The outward-facing resources required to build a Timeline pub struct TimelineResources { pub remote_client: RemoteTimelineClient, - pub deletion_queue_client: DeletionQueueClient, pub timeline_get_throttle: Arc< crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>, >, @@ -4843,7 +4842,7 @@ impl Timeline { pitr: Duration, cancel: &CancellationToken, ctx: &RequestContext, - ) -> anyhow::Result { + ) -> Result { let _timer = self .metrics .find_gc_cutoffs_histo diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 5ca8544d49..441298f3e9 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -11,7 +11,6 @@ use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint}; use crate::{ config::PageServerConf, - deletion_queue::DeletionQueueClient, task_mgr::{self, TaskKind}, tenant::{ metadata::TimelineMetadata, @@ -263,7 +262,6 @@ impl DeleteTimelineFlow { timeline_id: TimelineId, local_metadata: &TimelineMetadata, remote_client: RemoteTimelineClient, - deletion_queue_client: DeletionQueueClient, ) -> anyhow::Result<()> { // Note: here we even skip populating layer map. Timeline is essentially uninitialized. // RemoteTimelineClient is the only functioning part. @@ -274,7 +272,6 @@ impl DeleteTimelineFlow { None, // Ancestor is not needed for deletion. TimelineResources { remote_client, - deletion_queue_client, timeline_get_throttle: tenant.timeline_get_throttle.clone(), }, // Important. We dont pass ancestor above because it can be missing. diff --git a/pageserver/src/walrecord.rs b/pageserver/src/walrecord.rs index 205f8dee4d..62a3a91b0b 100644 --- a/pageserver/src/walrecord.rs +++ b/pageserver/src/walrecord.rs @@ -49,6 +49,19 @@ pub enum NeonWalRecord { file_path: String, content: Option, }, + + /// A testing record for unit testing purposes. It supports append data to an existing image, or clear it. + #[cfg(test)] + Test { + /// Append a string to the image. + append: String, + /// Clear the image before appending. + clear: bool, + /// Treat this record as an init record. `clear` should be set to true if this field is set + /// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and + /// its references in `timeline.rs`. + will_init: bool, + }, } impl NeonWalRecord { @@ -58,11 +71,39 @@ impl NeonWalRecord { // If you change this function, you'll also need to change ValueBytes::will_init match self { NeonWalRecord::Postgres { will_init, rec: _ } => *will_init, - + #[cfg(test)] + NeonWalRecord::Test { will_init, .. } => *will_init, // None of the special neon record types currently initialize the page _ => false, } } + + #[cfg(test)] + pub(crate) fn wal_append(s: impl AsRef) -> Self { + Self::Test { + append: s.as_ref().to_string(), + clear: false, + will_init: false, + } + } + + #[cfg(test)] + pub(crate) fn wal_clear() -> Self { + Self::Test { + append: "".to_string(), + clear: true, + will_init: false, + } + } + + #[cfg(test)] + pub(crate) fn wal_init() -> Self { + Self::Test { + append: "".to_string(), + clear: true, + will_init: true, + } + } } /// DecodedBkpBlock represents per-page data contained in a WAL record. diff --git a/pageserver/src/walredo/apply_neon.rs b/pageserver/src/walredo/apply_neon.rs index 24e8d8b01c..facf01004c 100644 --- a/pageserver/src/walredo/apply_neon.rs +++ b/pageserver/src/walredo/apply_neon.rs @@ -244,6 +244,20 @@ pub(crate) fn apply_in_neon( let mut writer = page.writer(); dir.ser_into(&mut writer)?; } + #[cfg(test)] + NeonWalRecord::Test { + append, + clear, + will_init, + } => { + if *will_init { + assert!(*clear, "init record must be clear to ensure correctness"); + } + if *clear { + page.clear(); + } + page.put_slice(append.as_bytes()); + } } Ok(()) } diff --git a/patches/pgvector.patch b/patches/pgvector.patch index 84ac6644c5..3e1ffcaaaf 100644 --- a/patches/pgvector.patch +++ b/patches/pgvector.patch @@ -1,19 +1,8 @@ -From 0b0194a57bd0f3598bd57dbedd0df3932330169d Mon Sep 17 00:00:00 2001 -From: Heikki Linnakangas -Date: Fri, 2 Feb 2024 22:26:45 +0200 -Subject: [PATCH 1/1] Make v0.6.0 work with Neon - -Now that the WAL-logging happens as a separate step at the end of the -build, we need a few neon-specific hints to make it work. ---- - src/hnswbuild.c | 36 ++++++++++++++++++++++++++++++++++++ - 1 file changed, 36 insertions(+) - diff --git a/src/hnswbuild.c b/src/hnswbuild.c -index 680789b..ec54dea 100644 +index dcfb2bd..d5189ee 100644 --- a/src/hnswbuild.c +++ b/src/hnswbuild.c -@@ -840,9 +840,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc) +@@ -860,9 +860,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc) hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false); @@ -31,7 +20,7 @@ index 680789b..ec54dea 100644 /* Close relations within worker */ index_close(indexRel, indexLockmode); table_close(heapRel, heapLockmode); -@@ -1089,13 +1097,41 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo, +@@ -1117,12 +1125,38 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo, SeedRandom(42); #endif @@ -43,14 +32,13 @@ index 680789b..ec54dea 100644 BuildGraph(buildstate, forkNum); +- if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM) +#ifdef NEON_SMGR + smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index)); +#endif + - if (RelationNeedsWAL(index)) -+ { - log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true); - ++ if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM) { + log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocksInFork(index, forkNum), true); +#ifdef NEON_SMGR + { +#if PG_VERSION_NUM >= 160000 @@ -60,7 +48,7 @@ index 680789b..ec54dea 100644 +#endif + + SetLastWrittenLSNForBlockRange(XactLastRecEnd, rlocator, -+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index)); ++ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index)); + SetLastWrittenLSNForRelation(XactLastRecEnd, rlocator, MAIN_FORKNUM); + } +#endif @@ -69,10 +57,6 @@ index 680789b..ec54dea 100644 +#ifdef NEON_SMGR + smgr_end_unlogged_build(RelationGetSmgr(index)); +#endif -+ + FreeBuildState(buildstate); } - --- -2.39.2 - diff --git a/proxy/src/rate_limiter/limit_algorithm/aimd.rs b/proxy/src/rate_limiter/limit_algorithm/aimd.rs index ccc9c42420..b39740bb21 100644 --- a/proxy/src/rate_limiter/limit_algorithm/aimd.rs +++ b/proxy/src/rate_limiter/limit_algorithm/aimd.rs @@ -1,5 +1,3 @@ -use std::usize; - use super::{LimitAlgorithm, Outcome, Sample}; /// Loss-based congestion avoidance. diff --git a/proxy/src/scram/messages.rs b/proxy/src/scram/messages.rs index f9372540ca..cf677a3334 100644 --- a/proxy/src/scram/messages.rs +++ b/proxy/src/scram/messages.rs @@ -32,8 +32,6 @@ pub struct ClientFirstMessage<'a> { pub bare: &'a str, /// Channel binding mode. pub cbind_flag: ChannelBinding<&'a str>, - /// (Client username)[]. - pub username: &'a str, /// Client nonce. pub nonce: &'a str, } @@ -58,6 +56,14 @@ impl<'a> ClientFirstMessage<'a> { // In theory, these might be preceded by "reserved-mext" (i.e. "m=") let username = parts.next()?.strip_prefix("n=")?; + + // https://github.com/postgres/postgres/blob/f83908798f78c4cafda217ca875602c88ea2ae28/src/backend/libpq/auth-scram.c#L13-L14 + if !username.is_empty() { + tracing::warn!(username, "scram username provided, but is not expected") + // TODO(conrad): + // return None; + } + let nonce = parts.next()?.strip_prefix("r=")?; // Validate but ignore auth extensions @@ -66,7 +72,6 @@ impl<'a> ClientFirstMessage<'a> { Some(Self { bare, cbind_flag, - username, nonce, }) } @@ -188,19 +193,18 @@ mod tests { // (Almost) real strings captured during debug sessions let cases = [ - (NotSupportedClient, "n,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"), - (NotSupportedServer, "y,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"), + (NotSupportedClient, "n,,n=,r=t8JwklwKecDLwSsA72rHmVju"), + (NotSupportedServer, "y,,n=,r=t8JwklwKecDLwSsA72rHmVju"), ( Required("tls-server-end-point"), - "p=tls-server-end-point,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju", + "p=tls-server-end-point,,n=,r=t8JwklwKecDLwSsA72rHmVju", ), ]; for (cb, input) in cases { let msg = ClientFirstMessage::parse(input).unwrap(); - assert_eq!(msg.bare, "n=pepe,r=t8JwklwKecDLwSsA72rHmVju"); - assert_eq!(msg.username, "pepe"); + assert_eq!(msg.bare, "n=,r=t8JwklwKecDLwSsA72rHmVju"); assert_eq!(msg.nonce, "t8JwklwKecDLwSsA72rHmVju"); assert_eq!(msg.cbind_flag, cb); } @@ -208,14 +212,13 @@ mod tests { #[test] fn parse_client_first_message_with_invalid_gs2_authz() { - assert!(ClientFirstMessage::parse("n,authzid,n=user,r=nonce").is_none()) + assert!(ClientFirstMessage::parse("n,authzid,n=,r=nonce").is_none()) } #[test] fn parse_client_first_message_with_extra_params() { - let msg = ClientFirstMessage::parse("n,,n=user,r=nonce,a=foo,b=bar,c=baz").unwrap(); - assert_eq!(msg.bare, "n=user,r=nonce,a=foo,b=bar,c=baz"); - assert_eq!(msg.username, "user"); + let msg = ClientFirstMessage::parse("n,,n=,r=nonce,a=foo,b=bar,c=baz").unwrap(); + assert_eq!(msg.bare, "n=,r=nonce,a=foo,b=bar,c=baz"); assert_eq!(msg.nonce, "nonce"); assert_eq!(msg.cbind_flag, ChannelBinding::NotSupportedClient); } @@ -223,9 +226,9 @@ mod tests { #[test] fn parse_client_first_message_with_extra_params_invalid() { // must be of the form `=<...>` - assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,abc=foo").is_none()); - assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,1=foo").is_none()); - assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,a").is_none()); + assert!(ClientFirstMessage::parse("n,,n=,r=nonce,abc=foo").is_none()); + assert!(ClientFirstMessage::parse("n,,n=,r=nonce,1=foo").is_none()); + assert!(ClientFirstMessage::parse("n,,n=,r=nonce,a").is_none()); } #[test] diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 214de0a77d..dcae25a287 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.78.0" +channel = "1.79.0" profile = "default" # The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy. # https://rust-lang.github.io/rustup/concepts/profiles.html diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 1e81b5c5a2..cf6a95bf0b 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -2409,11 +2409,17 @@ impl Service { (detach_waiters, shard_ids, node.clone()) }; - if let Err(e) = self.await_waiters(detach_waiters, RECONCILE_TIMEOUT).await { - // Failing to detach shouldn't hold up deletion, e.g. if a node is offline we should be able - // to use some other node to run the remote deletion. - tracing::warn!("Failed to detach some locations: {e}"); - } + // This reconcile wait can fail in a few ways: + // A there is a very long queue for the reconciler semaphore + // B some pageserver is failing to handle a detach promptly + // C some pageserver goes offline right at the moment we send it a request. + // + // A and C are transient: the semaphore will eventually become available, and once a node is marked offline + // the next attempt to reconcile will silently skip detaches for an offline node and succeed. If B happens, + // it's a bug, and needs resolving at the pageserver level (we shouldn't just leave attachments behind while + // deleting the underlying data). + self.await_waiters(detach_waiters, RECONCILE_TIMEOUT) + .await?; let locations = shard_ids .into_iter() @@ -2431,13 +2437,11 @@ impl Service { for result in results { match result { Ok(StatusCode::ACCEPTED) => { - // This could happen if we failed detach above, and hit a pageserver where the tenant - // is still attached: it will accept the deletion in the background - tracing::warn!( - "Unexpectedly still attached on {}, client should retry", + // This should never happen: we waited for detaches to finish above + return Err(ApiError::InternalServerError(anyhow::anyhow!( + "Unexpectedly still attached on {}", node - ); - return Ok(StatusCode::ACCEPTED); + ))); } Ok(_) => {} Err(mgmt_api::Error::Cancelled) => { diff --git a/test_runner/fixtures/pageserver/allowed_errors.py b/test_runner/fixtures/pageserver/allowed_errors.py index ef412cade7..147d5705d3 100755 --- a/test_runner/fixtures/pageserver/allowed_errors.py +++ b/test_runner/fixtures/pageserver/allowed_errors.py @@ -94,8 +94,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = ( ".*WARN.*path=/v1/utilization .*request was dropped before completing", # Can happen during shutdown ".*scheduling deletion on drop failed: queue is in state Stopped.*", - # Can happen during shutdown - ".*ignoring failure to find gc cutoffs: timeline shutting down.*", ) diff --git a/test_runner/performance/pgvector/halfvec_build.sql b/test_runner/performance/pgvector/halfvec_build.sql new file mode 100644 index 0000000000..7e923e4bde --- /dev/null +++ b/test_runner/performance/pgvector/halfvec_build.sql @@ -0,0 +1,15 @@ +DROP TABLE IF EXISTS halfvec_test_table; + +CREATE TABLE halfvec_test_table ( + _id text NOT NULL, + title text, + text text, + embeddings halfvec(1536), + PRIMARY KEY (_id) +); + +INSERT INTO halfvec_test_table (_id, title, text, embeddings) +SELECT _id, title, text, embeddings::halfvec +FROM documents; + +CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128); \ No newline at end of file diff --git a/test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql b/test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql new file mode 100644 index 0000000000..70d0c18149 --- /dev/null +++ b/test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql @@ -0,0 +1,13 @@ +-- run with pooled connection +-- pgbench -T 300 -c 100 -j20 -f pgbench_halfvec_queries.sql -postgresql://neondb_owner:@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require" + +with x (x) as ( + select "embeddings" as x + from halfvec_test_table + TABLESAMPLE SYSTEM (1) + LIMIT 1 +) +SELECT title, "embeddings" <=> (select x from x) as distance +FROM halfvec_test_table +ORDER BY 2 +LIMIT 30; \ No newline at end of file diff --git a/test_runner/performance/pgvector/pgbench_hnsw_queries.sql b/test_runner/performance/pgvector/pgbench_hnsw_queries.sql deleted file mode 100644 index 5034063c1b..0000000000 --- a/test_runner/performance/pgvector/pgbench_hnsw_queries.sql +++ /dev/null @@ -1,13 +0,0 @@ --- run with pooled connection --- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql -postgresql://neondb_owner:@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require" - -with x (x) as ( - select "embeddings" as x - from hnsw_test_table - TABLESAMPLE SYSTEM (1) - LIMIT 1 -) -SELECT title, "embeddings" <=> (select x from x) as distance -FROM hnsw_test_table -ORDER BY 2 -LIMIT 30; diff --git a/test_runner/performance/test_perf_olap.py b/test_runner/performance/test_perf_olap.py index 2367676e67..aaa2f8fec2 100644 --- a/test_runner/performance/test_perf_olap.py +++ b/test_runner/performance/test_perf_olap.py @@ -106,6 +106,7 @@ QUERIES: Tuple[LabelledQuery, ...] = ( # Disable auto formatting for the list of queries so that it's easier to read # fmt: off PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = ( + LabelledQuery("PGVPREP", r"ALTER EXTENSION VECTOR UPDATE;"), LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"), LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"), LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"), @@ -115,6 +116,10 @@ PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = ( LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"), LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"), LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"), + LabelledQuery("PGV9", r"DROP TABLE IF EXISTS halfvec_test_table;"), + LabelledQuery("PGV10", r"CREATE TABLE halfvec_test_table (_id text NOT NULL, title text, text text, embeddings halfvec(1536), PRIMARY KEY (_id));"), + LabelledQuery("PGV11", r"INSERT INTO halfvec_test_table (_id, title, text, embeddings) SELECT _id, title, text, embeddings::halfvec FROM documents;"), + LabelledQuery("PGV12", r"CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);"), ) # fmt: on diff --git a/test_runner/performance/test_perf_pgbench.py b/test_runner/performance/test_perf_pgbench.py index d756d6eeca..6eaa29e4f8 100644 --- a/test_runner/performance/test_perf_pgbench.py +++ b/test_runner/performance/test_perf_pgbench.py @@ -18,6 +18,7 @@ class PgBenchLoadType(enum.Enum): SIMPLE_UPDATE = "simple-update" SELECT_ONLY = "select-only" PGVECTOR_HNSW = "pgvector-hnsw" + PGVECTOR_HALFVEC = "pgvector-halfvec" def utc_now_timestamp() -> int: @@ -153,6 +154,26 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P password=password, ) + if workload_type == PgBenchLoadType.PGVECTOR_HALFVEC: + # Run simple-update workload + run_pgbench( + env, + "pgvector-halfvec", + [ + "pgbench", + "-f", + "test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql", + "-c100", + "-j20", + f"-T{duration}", + "-P2", + "--protocol=prepared", + "--progress-timestamp", + connstr, + ], + password=password, + ) + env.report_size() @@ -222,13 +243,3 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur @pytest.mark.remote_cluster def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int): run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY) - - -# The following test runs on an existing database that has pgvector extension installed -# and a table with 1 million embedding vectors loaded and indexed with HNSW. -# -# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup. -@pytest.mark.parametrize("duration", get_durations_matrix()) -@pytest.mark.remote_cluster -def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int): - run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW) diff --git a/test_runner/performance/test_perf_pgvector_queries.py b/test_runner/performance/test_perf_pgvector_queries.py new file mode 100644 index 0000000000..bb3db16305 --- /dev/null +++ b/test_runner/performance/test_perf_pgvector_queries.py @@ -0,0 +1,24 @@ +import pytest +from fixtures.compare_fixtures import PgCompare + +from performance.test_perf_pgbench import PgBenchLoadType, get_durations_matrix, run_test_pgbench + + +# The following test runs on an existing database that has pgvector extension installed +# and a table with 1 million embedding vectors loaded and indexed with HNSW. +# +# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup. +@pytest.mark.parametrize("duration", get_durations_matrix()) +@pytest.mark.remote_cluster +def test_pgbench_remote_pgvector_hnsw(remote_compare: PgCompare, duration: int): + run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW) + + +# The following test runs on an existing database that has pgvector extension installed +# and a table with 1 million embedding vectors loaded and indexed with halfvec. +# +# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup. +@pytest.mark.parametrize("duration", get_durations_matrix()) +@pytest.mark.remote_cluster +def test_pgbench_remote_pgvector_halfvec(remote_compare: PgCompare, duration: int): + run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HALFVEC) diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py index 1d1b2fb485..8edc8c554c 100644 --- a/test_runner/regress/test_hot_standby.py +++ b/test_runner/regress/test_hot_standby.py @@ -300,7 +300,7 @@ def test_replica_query_race(neon_simple_env: NeonEnv): p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter") standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby") - time.sleep(1) + wait_replica_caughtup(primary_ep, standby_ep) # In primary, run a lot of UPDATEs on a single page finished = False diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 2031feaa83..f41468210c 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -133,6 +133,9 @@ def test_storage_controller_smoke( wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id)) + # Let all the reconciliations after marking the node offline complete + env.storage_controller.reconcile_until_idle() + # Marking pageserver active should not migrate anything to it # immediately env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Active"}) diff --git a/test_runner/regress/test_tenant_size.py b/test_runner/regress/test_tenant_size.py index d3a228dbeb..a3dd422903 100644 --- a/test_runner/regress/test_tenant_size.py +++ b/test_runner/regress/test_tenant_size.py @@ -678,10 +678,6 @@ def test_synthetic_size_while_deleting(neon_env_builder: NeonEnvBuilder): with pytest.raises(PageserverApiException, match=matcher): completion.result() - # this happens on both cases - env.pageserver.allowed_errors.append( - ".*ignoring failure to find gc cutoffs: timeline shutting down.*" - ) # this happens only in the case of deletion (http response logging) env.pageserver.allowed_errors.append(".*Failed to refresh gc_info before gathering inputs.*") diff --git a/test_runner/regress/test_wal_acceptor_async.py b/test_runner/regress/test_wal_acceptor_async.py index 715d22eed8..971fad787a 100644 --- a/test_runner/regress/test_wal_acceptor_async.py +++ b/test_runner/regress/test_wal_acceptor_async.py @@ -601,13 +601,16 @@ async def run_segment_init_failure(env: NeonEnv): conn = await ep.connect_async() ep.safe_psql("select pg_switch_wal()") # jump to the segment boundary # next insertion should hang until failpoint is disabled. - asyncio.create_task(conn.execute("insert into t select generate_series(1,1), 'payload'")) + bg_query = asyncio.create_task( + conn.execute("insert into t select generate_series(1,1), 'payload'") + ) sleep_sec = 2 await asyncio.sleep(sleep_sec) - # also restart ep at segment boundary to make test more interesting - ep.stop() # it must still be not finished - # assert not bg_query.done() + assert not bg_query.done() + # Also restart ep at segment boundary to make test more interesting. Do it in immediate mode; + # fast will hang because it will try to gracefully finish sending WAL. + ep.stop(mode="immediate") # Without segment rename during init (#6402) previous statement created # partially initialized 16MB segment, so sk restart also triggers #6401. sk.stop().start() diff --git a/vm-image-spec.yaml b/vm-image-spec.yaml index 15f820bebd..99164645a7 100644 --- a/vm-image-spec.yaml +++ b/vm-image-spec.yaml @@ -18,7 +18,7 @@ commands: - name: postgres-exporter user: nobody sysvInitAction: respawn - shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter' + shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter' - name: sql-exporter user: nobody sysvInitAction: respawn @@ -93,7 +93,7 @@ files: target: # Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL) # the schema gets dropped or replaced to match the driver expected DSN format. - data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable' + data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter' # Collectors (referenced by name) to execute on the target. # Glob patterns are supported (see for syntax). @@ -128,7 +128,7 @@ files: target: # Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL) # the schema gets dropped or replaced to match the driver expected DSN format. - data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable' + data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling' # Collectors (referenced by name) to execute on the target. # Glob patterns are supported (see for syntax).