mirror of
https://github.com/neondatabase/neon.git
synced 2026-02-08 05:00:38 +00:00
Compare commits
21 Commits
problame/v
...
bodobolero
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a3475286a9 | ||
|
|
9fc67363a9 | ||
|
|
cf4cdd6cd5 | ||
|
|
b8940f1685 | ||
|
|
c5bc73fff0 | ||
|
|
46210035c5 | ||
|
|
81892199f6 | ||
|
|
83eb02b07a | ||
|
|
a71f58e69c | ||
|
|
e6eb0020a1 | ||
|
|
eb0ca9b648 | ||
|
|
6843fd8f89 | ||
|
|
edc900028e | ||
|
|
789196572e | ||
|
|
425eed24e8 | ||
|
|
f67010109f | ||
|
|
0c3e3a8667 | ||
|
|
82719542c6 | ||
|
|
d25f7e3dd5 | ||
|
|
fbccd1e676 | ||
|
|
dc2ab4407f |
8
.github/workflows/benchmarking.yml
vendored
8
.github/workflows/benchmarking.yml
vendored
@@ -99,7 +99,7 @@ jobs:
|
||||
# Set --sparse-ordering option of pytest-order plugin
|
||||
# to ensure tests are running in order of appears in the file.
|
||||
# It's important for test_perf_pgbench.py::test_pgbench_remote_* tests
|
||||
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py
|
||||
extra_params: -m remote_cluster --sparse-ordering --timeout 5400 --ignore test_runner/performance/test_perf_olap.py --ignore test_runner/performance/test_perf_pgvector_queries.py
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
@@ -410,14 +410,14 @@ jobs:
|
||||
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
|
||||
- name: Benchmark pgvector hnsw queries
|
||||
- name: Benchmark pgvector queries
|
||||
uses: ./.github/actions/run-python-test-set
|
||||
with:
|
||||
build_type: ${{ env.BUILD_TYPE }}
|
||||
test_selection: performance
|
||||
test_selection: performance/test_perf_pgvector_queries.py
|
||||
run_in_parallel: false
|
||||
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
|
||||
extra_params: -m remote_cluster --timeout 21600 -k test_pgbench_remote_pgvector
|
||||
extra_params: -m remote_cluster --timeout 21600
|
||||
env:
|
||||
BENCHMARK_CONNSTR: ${{ steps.set-up-connstr.outputs.connstr }}
|
||||
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
|
||||
|
||||
@@ -55,7 +55,7 @@ jobs:
|
||||
exit 1
|
||||
fi
|
||||
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
# Use custom DOCKER_CONFIG directory to avoid conflicts with default settings
|
||||
# The default value is ~/.docker
|
||||
|
||||
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -858,7 +858,7 @@ jobs:
|
||||
cache-to: type=registry,ref=neondatabase/compute-node-${{ matrix.version }}:cache-${{ matrix.arch }},mode=max
|
||||
tags: |
|
||||
neondatabase/compute-node-${{ matrix.version }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.arch }}
|
||||
|
||||
|
||||
- name: Build neon extensions test image
|
||||
if: matrix.version == 'v16'
|
||||
uses: docker/build-push-action@v5
|
||||
@@ -965,7 +965,7 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v1
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
fetch-depth: 0
|
||||
|
||||
|
||||
@@ -120,7 +120,7 @@ num_cpus = "1.15"
|
||||
num-traits = "0.2.15"
|
||||
once_cell = "1.13"
|
||||
opentelemetry = "0.20.0"
|
||||
opentelemetry-otlp = { version = "0.13.0", default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-otlp = { version = "0.13.0", default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions = "0.12.0"
|
||||
parking_lot = "0.12"
|
||||
parquet = { version = "51.0.0", default-features = false, features = ["zstd"] }
|
||||
@@ -128,7 +128,7 @@ parquet_derive = "51.0.0"
|
||||
pbkdf2 = { version = "0.12.1", features = ["simple", "std"] }
|
||||
pin-project-lite = "0.2"
|
||||
procfs = "0.14"
|
||||
prometheus = {version = "0.13", default_features=false, features = ["process"]} # removes protobuf dependency
|
||||
prometheus = {version = "0.13", default-features=false, features = ["process"]} # removes protobuf dependency
|
||||
prost = "0.11"
|
||||
rand = "0.8"
|
||||
redis = { version = "0.25.2", features = ["tokio-rustls-comp", "keep-alive"] }
|
||||
@@ -184,7 +184,7 @@ tower-service = "0.3.2"
|
||||
tracing = "0.1"
|
||||
tracing-error = "0.2.0"
|
||||
tracing-opentelemetry = "0.21.0"
|
||||
tracing-subscriber = { version = "0.3", default_features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
|
||||
tracing-subscriber = { version = "0.3", default-features = false, features = ["smallvec", "fmt", "tracing-log", "std", "env-filter", "json", "ansi"] }
|
||||
twox-hash = { version = "1.6.3", default-features = false }
|
||||
url = "2.2"
|
||||
urlencoding = "2.1"
|
||||
|
||||
@@ -141,7 +141,7 @@ WORKDIR /home/nonroot
|
||||
|
||||
# Rust
|
||||
# Please keep the version of llvm (installed above) in sync with rust llvm (`rustc --version --verbose | grep LLVM`)
|
||||
ENV RUSTC_VERSION=1.78.0
|
||||
ENV RUSTC_VERSION=1.79.0
|
||||
ENV RUSTUP_HOME="/home/nonroot/.rustup"
|
||||
ENV PATH="/home/nonroot/.cargo/bin:${PATH}"
|
||||
RUN curl -sSO https://static.rust-lang.org/rustup/dist/$(uname -m)-unknown-linux-gnu/rustup-init && whoami && \
|
||||
|
||||
@@ -246,12 +246,17 @@ COPY patches/pgvector.patch /pgvector.patch
|
||||
# By default, pgvector Makefile uses `-march=native`. We don't want that,
|
||||
# because we build the images on different machines than where we run them.
|
||||
# Pass OPTFLAGS="" to remove it.
|
||||
RUN wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.1.tar.gz -O pgvector.tar.gz && \
|
||||
echo "fe6c8cb4e0cd1a8cb60f5badf9e1701e0fcabcfc260931c26d01e155c4dd21d1 pgvector.tar.gz" | sha256sum --check && \
|
||||
RUN if [ "$(uname -m)" = "x86_64" ]; then \
|
||||
OPTFLAGS=" -march=x86-64 "; \
|
||||
elif [ "$(uname -m)" = "aarch64" ]; then \
|
||||
OPTFLAGS=""; \
|
||||
fi && \
|
||||
wget https://github.com/pgvector/pgvector/archive/refs/tags/v0.7.2.tar.gz -O pgvector.tar.gz && \
|
||||
echo "617fba855c9bcb41a2a9bc78a78567fd2e147c72afd5bf9d37b31b9591632b30 pgvector.tar.gz" | sha256sum --check && \
|
||||
mkdir pgvector-src && cd pgvector-src && tar xzf ../pgvector.tar.gz --strip-components=1 -C . && \
|
||||
patch -p1 < /pgvector.patch && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="" install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="$OPTFLAGS" PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
make -j $(getconf _NPROCESSORS_ONLN) OPTFLAGS="$OPTFLAGS" install PG_CONFIG=/usr/local/pgsql/bin/pg_config && \
|
||||
echo 'trusted = true' >> /usr/local/pgsql/share/extension/vector.control
|
||||
|
||||
#########################################################################################
|
||||
@@ -979,7 +984,7 @@ RUN cd /ext-src/ && for f in *.tar.gz; \
|
||||
do echo $f; dname=$(echo $f | sed 's/\.tar.*//')-src; \
|
||||
rm -rf $dname; mkdir $dname; tar xzf $f --strip-components=1 -C $dname \
|
||||
|| exit 1; rm -f $f; done
|
||||
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
|
||||
RUN cd /ext-src/pgvector-src && patch -p1 <../pgvector.patch
|
||||
# cmake is required for the h3 test
|
||||
RUN apt-get update && apt-get install -y cmake
|
||||
RUN patch -p1 < /ext-src/pg_hintplan.patch
|
||||
|
||||
@@ -735,7 +735,7 @@ fn cli() -> clap::Command {
|
||||
Arg::new("filecache-connstr")
|
||||
.long("filecache-connstr")
|
||||
.default_value(
|
||||
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable",
|
||||
"host=localhost port=5432 dbname=postgres user=cloud_admin sslmode=disable application_name=vm-monitor",
|
||||
)
|
||||
.value_name("FILECACHE_CONNSTR"),
|
||||
)
|
||||
|
||||
@@ -4,10 +4,6 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
testing = [ "pageserver_api/testing" ]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
async-trait.workspace = true
|
||||
|
||||
@@ -383,12 +383,6 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<AuxFilePolicy>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'switch_aux_file_policy'")?,
|
||||
#[cfg(feature = "testing")]
|
||||
test_vm_bit_debug_logging: settings
|
||||
.remove("test_vm_bit_debug_logging")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'test_vm_bit_debug_logging' as bool")?,
|
||||
};
|
||||
if !settings.is_empty() {
|
||||
bail!("Unrecognized tenant settings: {settings:?}")
|
||||
@@ -512,12 +506,6 @@ impl PageServerNode {
|
||||
.map(|x| x.parse::<AuxFilePolicy>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'switch_aux_file_policy'")?,
|
||||
#[cfg(feature = "testing")]
|
||||
test_vm_bit_debug_logging: settings
|
||||
.remove("test_vm_bit_debug_logging")
|
||||
.map(|x| x.parse::<bool>())
|
||||
.transpose()
|
||||
.context("Failed to parse 'test_vm_bit_debug_logging' as bool")?,
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -4,10 +4,6 @@ version = "0.1.0"
|
||||
edition.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[features]
|
||||
default = []
|
||||
testing = []
|
||||
|
||||
[dependencies]
|
||||
serde.workspace = true
|
||||
serde_with.workspace = true
|
||||
|
||||
@@ -558,6 +558,12 @@ impl KeySpaceRandomAccum {
|
||||
self.ranges.push(range);
|
||||
}
|
||||
|
||||
pub fn add_keyspace(&mut self, keyspace: KeySpace) {
|
||||
for range in keyspace.ranges {
|
||||
self.add_range(range);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn to_keyspace(mut self) -> KeySpace {
|
||||
let mut ranges = Vec::new();
|
||||
if !self.ranges.is_empty() {
|
||||
|
||||
@@ -322,8 +322,6 @@ pub struct TenantConfig {
|
||||
pub timeline_get_throttle: Option<ThrottleConfig>,
|
||||
pub image_layer_creation_check_threshold: Option<u8>,
|
||||
pub switch_aux_file_policy: Option<AuxFilePolicy>,
|
||||
#[cfg(feature = "testing")]
|
||||
pub test_vm_bit_debug_logging: Option<bool>,
|
||||
}
|
||||
|
||||
/// The policy for the aux file storage. It can be switched through `switch_aux_file_policy`
|
||||
|
||||
@@ -7,7 +7,7 @@ license.workspace = true
|
||||
[dependencies]
|
||||
hyper.workspace = true
|
||||
opentelemetry = { workspace = true, features=["rt-tokio"] }
|
||||
opentelemetry-otlp = { workspace = true, default_features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-otlp = { workspace = true, default-features=false, features = ["http-proto", "trace", "http", "reqwest-client"] }
|
||||
opentelemetry-semantic-conventions.workspace = true
|
||||
reqwest = { workspace = true, default-features = false, features = ["rustls-tls"] }
|
||||
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
|
||||
|
||||
@@ -8,7 +8,7 @@ license.workspace = true
|
||||
default = []
|
||||
# Enables test-only APIs, incuding failpoints. In particular, enables the `fail_point!` macro,
|
||||
# which adds some runtime cost to run tests on outage conditions
|
||||
testing = ["fail/failpoints", "pageserver_api/testing"]
|
||||
testing = ["fail/failpoints"]
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
|
||||
@@ -2,10 +2,9 @@
|
||||
//! and push them to a HTTP endpoint.
|
||||
use crate::context::{DownloadBehavior, RequestContext};
|
||||
use crate::task_mgr::{self, TaskKind, BACKGROUND_RUNTIME};
|
||||
use crate::tenant::size::CalculateSyntheticSizeError;
|
||||
use crate::tenant::tasks::BackgroundLoopKind;
|
||||
use crate::tenant::{
|
||||
mgr::TenantManager, LogicalSizeCalculationCause, PageReconstructError, Tenant,
|
||||
};
|
||||
use crate::tenant::{mgr::TenantManager, LogicalSizeCalculationCause, Tenant};
|
||||
use camino::Utf8PathBuf;
|
||||
use consumption_metrics::EventType;
|
||||
use pageserver_api::models::TenantState;
|
||||
@@ -350,19 +349,12 @@ async fn calculate_and_log(tenant: &Tenant, cancel: &CancellationToken, ctx: &Re
|
||||
// Same for the loop that fetches computed metrics.
|
||||
// By using the same limiter, we centralize metrics collection for "start" and "finished" counters,
|
||||
// which turns out is really handy to understand the system.
|
||||
let Err(e) = tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await else {
|
||||
return;
|
||||
};
|
||||
|
||||
// this error can be returned if timeline is shutting down, but it does not
|
||||
// mean the synthetic size worker should terminate.
|
||||
let shutting_down = matches!(
|
||||
e.downcast_ref::<PageReconstructError>(),
|
||||
Some(PageReconstructError::Cancelled)
|
||||
);
|
||||
|
||||
if !shutting_down {
|
||||
let tenant_shard_id = tenant.tenant_shard_id();
|
||||
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
|
||||
match tenant.calculate_synthetic_size(CAUSE, cancel, ctx).await {
|
||||
Ok(_) => {}
|
||||
Err(CalculateSyntheticSizeError::Cancelled) => {}
|
||||
Err(e) => {
|
||||
let tenant_shard_id = tenant.tenant_shard_id();
|
||||
error!("failed to calculate synthetic size for tenant {tenant_shard_id}: {e:#}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1135,7 +1135,10 @@ async fn tenant_size_handler(
|
||||
&ctx,
|
||||
)
|
||||
.await
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
.map_err(|e| match e {
|
||||
crate::tenant::size::CalculateSyntheticSizeError::Cancelled => ApiError::ShuttingDown,
|
||||
other => ApiError::InternalServerError(anyhow::anyhow!(other)),
|
||||
})?;
|
||||
|
||||
let mut sizes = None;
|
||||
let accepts_html = headers
|
||||
@@ -1143,9 +1146,7 @@ async fn tenant_size_handler(
|
||||
.map(|v| v == "text/html")
|
||||
.unwrap_or_default();
|
||||
if !inputs_only.unwrap_or(false) {
|
||||
let storage_model = inputs
|
||||
.calculate_model()
|
||||
.map_err(ApiError::InternalServerError)?;
|
||||
let storage_model = inputs.calculate_model();
|
||||
let size = storage_model.calculate();
|
||||
|
||||
// If request header expects html, return html
|
||||
|
||||
@@ -509,11 +509,24 @@ pub(crate) enum GcError {
|
||||
#[error(transparent)]
|
||||
Remote(anyhow::Error),
|
||||
|
||||
// An error reading while calculating GC cutoffs
|
||||
#[error(transparent)]
|
||||
GcCutoffs(PageReconstructError),
|
||||
|
||||
// If GC was invoked for a particular timeline, this error means it didn't exist
|
||||
#[error("timeline not found")]
|
||||
TimelineNotFound,
|
||||
}
|
||||
|
||||
impl From<PageReconstructError> for GcError {
|
||||
fn from(value: PageReconstructError) -> Self {
|
||||
match value {
|
||||
PageReconstructError::Cancelled => Self::TimelineCancelled,
|
||||
other => Self::GcCutoffs(other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Tenant {
|
||||
/// Yet another helper for timeline initialization.
|
||||
///
|
||||
@@ -1033,7 +1046,6 @@ impl Tenant {
|
||||
remote_metadata,
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
timeline_get_throttle: self.timeline_get_throttle.clone(),
|
||||
},
|
||||
ctx,
|
||||
@@ -1059,7 +1071,6 @@ impl Tenant {
|
||||
timeline_id,
|
||||
&index_part.metadata,
|
||||
remote_timeline_client,
|
||||
self.deletion_queue_client.clone(),
|
||||
)
|
||||
.instrument(tracing::info_span!("timeline_delete", %timeline_id))
|
||||
.await
|
||||
@@ -2921,17 +2932,9 @@ impl Tenant {
|
||||
.checked_sub(horizon)
|
||||
.unwrap_or(Lsn(0));
|
||||
|
||||
let res = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await;
|
||||
|
||||
match res {
|
||||
Ok(cutoffs) => {
|
||||
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
|
||||
assert!(old.is_none());
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(timeline_id = %timeline.timeline_id, "ignoring failure to find gc cutoffs: {e:#}");
|
||||
}
|
||||
}
|
||||
let cutoffs = timeline.find_gc_cutoffs(cutoff, pitr, cancel, ctx).await?;
|
||||
let old = gc_cutoffs.insert(timeline.timeline_id, cutoffs);
|
||||
assert!(old.is_none());
|
||||
}
|
||||
|
||||
if !self.is_active() || self.cancel.is_cancelled() {
|
||||
@@ -3443,7 +3446,6 @@ impl Tenant {
|
||||
);
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
deletion_queue_client: self.deletion_queue_client.clone(),
|
||||
timeline_get_throttle: self.timeline_get_throttle.clone(),
|
||||
}
|
||||
}
|
||||
@@ -3553,7 +3555,7 @@ impl Tenant {
|
||||
cause: LogicalSizeCalculationCause,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<size::ModelInputs> {
|
||||
) -> Result<size::ModelInputs, size::CalculateSyntheticSizeError> {
|
||||
let logical_sizes_at_once = self
|
||||
.conf
|
||||
.concurrent_tenant_size_logical_size_queries
|
||||
@@ -3568,8 +3570,8 @@ impl Tenant {
|
||||
// See more for on the issue #2748 condenced out of the initial PR review.
|
||||
let mut shared_cache = tokio::select! {
|
||||
locked = self.cached_logical_sizes.lock() => locked,
|
||||
_ = cancel.cancelled() => anyhow::bail!("cancelled"),
|
||||
_ = self.cancel.cancelled() => anyhow::bail!("tenant is shutting down"),
|
||||
_ = cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
|
||||
_ = self.cancel.cancelled() => return Err(size::CalculateSyntheticSizeError::Cancelled),
|
||||
};
|
||||
|
||||
size::gather_inputs(
|
||||
@@ -3593,10 +3595,10 @@ impl Tenant {
|
||||
cause: LogicalSizeCalculationCause,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<u64> {
|
||||
) -> Result<u64, size::CalculateSyntheticSizeError> {
|
||||
let inputs = self.gather_size_inputs(None, cause, cancel, ctx).await?;
|
||||
|
||||
let size = inputs.calculate()?;
|
||||
let size = inputs.calculate();
|
||||
|
||||
self.set_cached_synthetic_size(size);
|
||||
|
||||
@@ -3831,8 +3833,6 @@ pub(crate) mod harness {
|
||||
tenant_conf.image_layer_creation_check_threshold,
|
||||
),
|
||||
switch_aux_file_policy: Some(tenant_conf.switch_aux_file_policy),
|
||||
#[cfg(feature = "testing")]
|
||||
test_vm_bit_debug_logging: Some(tenant_conf.test_vm_bit_debug_logging),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4043,6 +4043,7 @@ mod tests {
|
||||
use crate::repository::{Key, Value};
|
||||
use crate::tenant::harness::*;
|
||||
use crate::tenant::timeline::CompactFlags;
|
||||
use crate::walrecord::NeonWalRecord;
|
||||
use crate::DEFAULT_PG_VERSION;
|
||||
use bytes::{Bytes, BytesMut};
|
||||
use hex_literal::hex;
|
||||
@@ -6366,7 +6367,7 @@ mod tests {
|
||||
.await?;
|
||||
Ok(res.pop_last().map(|(k, v)| {
|
||||
assert_eq!(k, key);
|
||||
v.unwrap().0
|
||||
v.unwrap()
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -6707,8 +6708,8 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simple_bottom_most_compaction() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_simple_bottom_most_compaction")?;
|
||||
async fn test_simple_bottom_most_compaction_images() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_simple_bottom_most_compaction_images")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
fn get_key(id: u32) -> Key {
|
||||
@@ -6863,4 +6864,79 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_neon_test_record() -> anyhow::Result<()> {
|
||||
let harness = TenantHarness::create("test_neon_test_record")?;
|
||||
let (tenant, ctx) = harness.load().await;
|
||||
|
||||
fn get_key(id: u32) -> Key {
|
||||
// using aux key here b/c they are guaranteed to be inside `collect_keyspace`.
|
||||
let mut key = Key::from_hex("620000000033333333444444445500000000").unwrap();
|
||||
key.field6 = id;
|
||||
key
|
||||
}
|
||||
|
||||
let delta1 = vec![
|
||||
(
|
||||
get_key(1),
|
||||
Lsn(0x20),
|
||||
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
|
||||
),
|
||||
(
|
||||
get_key(1),
|
||||
Lsn(0x30),
|
||||
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
|
||||
),
|
||||
(get_key(2), Lsn(0x10), Value::Image("0x10".into())),
|
||||
(
|
||||
get_key(2),
|
||||
Lsn(0x20),
|
||||
Value::WalRecord(NeonWalRecord::wal_append(",0x20")),
|
||||
),
|
||||
(
|
||||
get_key(2),
|
||||
Lsn(0x30),
|
||||
Value::WalRecord(NeonWalRecord::wal_append(",0x30")),
|
||||
),
|
||||
(get_key(3), Lsn(0x10), Value::Image("0x10".into())),
|
||||
(
|
||||
get_key(3),
|
||||
Lsn(0x20),
|
||||
Value::WalRecord(NeonWalRecord::wal_clear()),
|
||||
),
|
||||
(get_key(4), Lsn(0x10), Value::Image("0x10".into())),
|
||||
(
|
||||
get_key(4),
|
||||
Lsn(0x20),
|
||||
Value::WalRecord(NeonWalRecord::wal_init()),
|
||||
),
|
||||
];
|
||||
let image1 = vec![(get_key(1), "0x10".into())];
|
||||
|
||||
let tline = tenant
|
||||
.create_test_timeline_with_layers(
|
||||
TIMELINE_ID,
|
||||
Lsn(0x10),
|
||||
DEFAULT_PG_VERSION,
|
||||
&ctx,
|
||||
vec![delta1], // delta layers
|
||||
vec![(Lsn(0x10), image1)], // image layers
|
||||
Lsn(0x50),
|
||||
)
|
||||
.await?;
|
||||
|
||||
assert_eq!(
|
||||
tline.get(get_key(1), Lsn(0x50), &ctx).await?,
|
||||
Bytes::from_static(b"0x10,0x20,0x30")
|
||||
);
|
||||
assert_eq!(
|
||||
tline.get(get_key(2), Lsn(0x50), &ctx).await?,
|
||||
Bytes::from_static(b"0x10,0x20,0x30")
|
||||
);
|
||||
// assert_eq!(tline.get(get_key(3), Lsn(0x50), &ctx).await?, Bytes::new());
|
||||
// assert_eq!(tline.get(get_key(4), Lsn(0x50), &ctx).await?, Bytes::new());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -377,9 +377,6 @@ pub struct TenantConf {
|
||||
/// There is a `last_aux_file_policy` flag which gets persisted in `index_part.json` once the first aux
|
||||
/// file is written.
|
||||
pub switch_aux_file_policy: AuxFilePolicy,
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
pub test_vm_bit_debug_logging: bool,
|
||||
}
|
||||
|
||||
/// Same as TenantConf, but this struct preserves the information about
|
||||
@@ -479,11 +476,6 @@ pub struct TenantConfOpt {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub switch_aux_file_policy: Option<AuxFilePolicy>,
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(default)]
|
||||
pub test_vm_bit_debug_logging: Option<bool>,
|
||||
}
|
||||
|
||||
impl TenantConfOpt {
|
||||
@@ -546,10 +538,6 @@ impl TenantConfOpt {
|
||||
switch_aux_file_policy: self
|
||||
.switch_aux_file_policy
|
||||
.unwrap_or(global_conf.switch_aux_file_policy),
|
||||
#[cfg(feature = "testing")]
|
||||
test_vm_bit_debug_logging: self
|
||||
.test_vm_bit_debug_logging
|
||||
.unwrap_or(global_conf.test_vm_bit_debug_logging),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -594,8 +582,6 @@ impl Default for TenantConf {
|
||||
timeline_get_throttle: crate::tenant::throttle::Config::disabled(),
|
||||
image_layer_creation_check_threshold: DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD,
|
||||
switch_aux_file_policy: AuxFilePolicy::default_tenant_config(),
|
||||
#[cfg(feature = "testing")]
|
||||
test_vm_bit_debug_logging: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -671,8 +657,6 @@ impl From<TenantConfOpt> for models::TenantConfig {
|
||||
timeline_get_throttle: value.timeline_get_throttle.map(ThrottleConfig::from),
|
||||
image_layer_creation_check_threshold: value.image_layer_creation_check_threshold,
|
||||
switch_aux_file_policy: value.switch_aux_file_policy,
|
||||
#[cfg(feature = "testing")]
|
||||
test_vm_bit_debug_logging: value.test_vm_bit_debug_logging,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -513,7 +513,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
// cover our access to local storage.
|
||||
let Ok(_guard) = self.secondary_state.gate.enter() else {
|
||||
// Shutting down
|
||||
return Ok(());
|
||||
return Err(UpdateError::Cancelled);
|
||||
};
|
||||
|
||||
let tenant_shard_id = self.secondary_state.get_tenant_shard_id();
|
||||
@@ -846,7 +846,7 @@ impl<'a> TenantDownloader<'a> {
|
||||
for layer in timeline.layers {
|
||||
if self.secondary_state.cancel.is_cancelled() {
|
||||
tracing::debug!("Cancelled -- dropping out of layer loop");
|
||||
return Ok(());
|
||||
return Err(UpdateError::Cancelled);
|
||||
}
|
||||
|
||||
// Existing on-disk layers: just update their access time.
|
||||
|
||||
@@ -3,7 +3,6 @@ use std::collections::hash_map::Entry;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
use tokio::sync::oneshot::error::RecvError;
|
||||
use tokio::sync::Semaphore;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
@@ -11,7 +10,7 @@ use tokio_util::sync::CancellationToken;
|
||||
use crate::context::RequestContext;
|
||||
use crate::pgdatadir_mapping::CalculateLogicalSizeError;
|
||||
|
||||
use super::{LogicalSizeCalculationCause, Tenant};
|
||||
use super::{GcError, LogicalSizeCalculationCause, Tenant};
|
||||
use crate::tenant::Timeline;
|
||||
use utils::id::TimelineId;
|
||||
use utils::lsn::Lsn;
|
||||
@@ -43,6 +42,44 @@ pub struct SegmentMeta {
|
||||
pub kind: LsnKind,
|
||||
}
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub(crate) enum CalculateSyntheticSizeError {
|
||||
/// Something went wrong internally to the calculation of logical size at a particular branch point
|
||||
#[error("Failed to calculated logical size on timeline {timeline_id} at {lsn}: {error}")]
|
||||
LogicalSize {
|
||||
timeline_id: TimelineId,
|
||||
lsn: Lsn,
|
||||
error: CalculateLogicalSizeError,
|
||||
},
|
||||
|
||||
/// Something went wrong internally when calculating GC parameters at start of size calculation
|
||||
#[error(transparent)]
|
||||
GcInfo(GcError),
|
||||
|
||||
/// Totally unexpected errors, like panics joining a task
|
||||
#[error(transparent)]
|
||||
Fatal(anyhow::Error),
|
||||
|
||||
/// The LSN we are trying to calculate a size at no longer exists at the point we query it
|
||||
#[error("Could not find size at {lsn} in timeline {timeline_id}")]
|
||||
LsnNotFound { timeline_id: TimelineId, lsn: Lsn },
|
||||
|
||||
/// Tenant shut down while calculating size
|
||||
#[error("Cancelled")]
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
impl From<GcError> for CalculateSyntheticSizeError {
|
||||
fn from(value: GcError) -> Self {
|
||||
match value {
|
||||
GcError::TenantCancelled | GcError::TimelineCancelled => {
|
||||
CalculateSyntheticSizeError::Cancelled
|
||||
}
|
||||
other => CalculateSyntheticSizeError::GcInfo(other),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SegmentMeta {
|
||||
fn size_needed(&self) -> bool {
|
||||
match self.kind {
|
||||
@@ -116,12 +153,9 @@ pub(super) async fn gather_inputs(
|
||||
cause: LogicalSizeCalculationCause,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<ModelInputs> {
|
||||
) -> Result<ModelInputs, CalculateSyntheticSizeError> {
|
||||
// refresh is needed to update gc related pitr_cutoff and horizon_cutoff
|
||||
tenant
|
||||
.refresh_gc_info(cancel, ctx)
|
||||
.await
|
||||
.context("Failed to refresh gc_info before gathering inputs")?;
|
||||
tenant.refresh_gc_info(cancel, ctx).await?;
|
||||
|
||||
// Collect information about all the timelines
|
||||
let mut timelines = tenant.list_timelines();
|
||||
@@ -327,6 +361,12 @@ pub(super) async fn gather_inputs(
|
||||
)
|
||||
.await?;
|
||||
|
||||
if tenant.cancel.is_cancelled() {
|
||||
// If we're shutting down, return an error rather than a sparse result that might include some
|
||||
// timelines from before we started shutting down
|
||||
return Err(CalculateSyntheticSizeError::Cancelled);
|
||||
}
|
||||
|
||||
Ok(ModelInputs {
|
||||
segments,
|
||||
timeline_inputs,
|
||||
@@ -345,7 +385,7 @@ async fn fill_logical_sizes(
|
||||
logical_size_cache: &mut HashMap<(TimelineId, Lsn), u64>,
|
||||
cause: LogicalSizeCalculationCause,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<()> {
|
||||
) -> Result<(), CalculateSyntheticSizeError> {
|
||||
let timeline_hash: HashMap<TimelineId, Arc<Timeline>> = HashMap::from_iter(
|
||||
timelines
|
||||
.iter()
|
||||
@@ -387,7 +427,7 @@ async fn fill_logical_sizes(
|
||||
}
|
||||
|
||||
// Perform the size lookups
|
||||
let mut have_any_error = false;
|
||||
let mut have_any_error = None;
|
||||
while let Some(res) = joinset.join_next().await {
|
||||
// each of these come with Result<anyhow::Result<_>, JoinError>
|
||||
// because of spawn + spawn_blocking
|
||||
@@ -398,21 +438,36 @@ async fn fill_logical_sizes(
|
||||
Err(join_error) => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
error!("task that calls spawn_ondemand_logical_size_calculation panicked: {join_error:#}");
|
||||
have_any_error = true;
|
||||
|
||||
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
|
||||
anyhow::anyhow!(join_error)
|
||||
.context("task that calls spawn_ondemand_logical_size_calculation"),
|
||||
));
|
||||
}
|
||||
Ok(Err(recv_result_error)) => {
|
||||
// cannot really do anything, as this panic is likely a bug
|
||||
error!("failed to receive logical size query result: {recv_result_error:#}");
|
||||
have_any_error = true;
|
||||
have_any_error = Some(CalculateSyntheticSizeError::Fatal(
|
||||
anyhow::anyhow!(recv_result_error)
|
||||
.context("Receiving logical size query result"),
|
||||
));
|
||||
}
|
||||
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Err(error)))) => {
|
||||
if !matches!(error, CalculateLogicalSizeError::Cancelled) {
|
||||
if matches!(error, CalculateLogicalSizeError::Cancelled) {
|
||||
// Skip this: it's okay if one timeline among many is shutting down while we
|
||||
// calculate inputs for the overall tenant.
|
||||
continue;
|
||||
} else {
|
||||
warn!(
|
||||
timeline_id=%timeline.timeline_id,
|
||||
"failed to calculate logical size at {lsn}: {error:#}"
|
||||
);
|
||||
have_any_error = Some(CalculateSyntheticSizeError::LogicalSize {
|
||||
timeline_id: timeline.timeline_id,
|
||||
lsn,
|
||||
error,
|
||||
});
|
||||
}
|
||||
have_any_error = true;
|
||||
}
|
||||
Ok(Ok(TimelineAtLsnSizeResult(timeline, lsn, Ok(size)))) => {
|
||||
debug!(timeline_id=%timeline.timeline_id, %lsn, size, "size calculated");
|
||||
@@ -426,10 +481,10 @@ async fn fill_logical_sizes(
|
||||
// prune any keys not needed anymore; we record every used key and added key.
|
||||
logical_size_cache.retain(|key, _| sizes_needed.contains_key(key));
|
||||
|
||||
if have_any_error {
|
||||
if let Some(error) = have_any_error {
|
||||
// we cannot complete this round, because we are missing data.
|
||||
// we have however cached all we were able to request calculation on.
|
||||
anyhow::bail!("failed to calculate some logical_sizes");
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
// Insert the looked up sizes to the Segments
|
||||
@@ -444,32 +499,29 @@ async fn fill_logical_sizes(
|
||||
if let Some(Some(size)) = sizes_needed.get(&(timeline_id, lsn)) {
|
||||
seg.segment.size = Some(*size);
|
||||
} else {
|
||||
bail!("could not find size at {} in timeline {}", lsn, timeline_id);
|
||||
return Err(CalculateSyntheticSizeError::LsnNotFound { timeline_id, lsn });
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
impl ModelInputs {
|
||||
pub fn calculate_model(&self) -> anyhow::Result<tenant_size_model::StorageModel> {
|
||||
pub fn calculate_model(&self) -> tenant_size_model::StorageModel {
|
||||
// Convert SegmentMetas into plain Segments
|
||||
let storage = StorageModel {
|
||||
StorageModel {
|
||||
segments: self
|
||||
.segments
|
||||
.iter()
|
||||
.map(|seg| seg.segment.clone())
|
||||
.collect(),
|
||||
};
|
||||
|
||||
Ok(storage)
|
||||
}
|
||||
}
|
||||
|
||||
// calculate total project size
|
||||
pub fn calculate(&self) -> anyhow::Result<u64> {
|
||||
let storage = self.calculate_model()?;
|
||||
pub fn calculate(&self) -> u64 {
|
||||
let storage = self.calculate_model();
|
||||
let sizes = storage.calculate();
|
||||
|
||||
Ok(sizes.total_size)
|
||||
sizes.total_size
|
||||
}
|
||||
}
|
||||
|
||||
@@ -656,7 +708,7 @@ fn verify_size_for_multiple_branches() {
|
||||
"#;
|
||||
let inputs: ModelInputs = serde_json::from_str(doc).unwrap();
|
||||
|
||||
assert_eq!(inputs.calculate().unwrap(), 37_851_408);
|
||||
assert_eq!(inputs.calculate(), 37_851_408);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -711,7 +763,7 @@ fn verify_size_for_one_branch() {
|
||||
|
||||
let model: ModelInputs = serde_json::from_str(doc).unwrap();
|
||||
|
||||
let res = model.calculate_model().unwrap().calculate();
|
||||
let res = model.calculate_model().calculate();
|
||||
|
||||
println!("calculated synthetic size: {}", res.total_size);
|
||||
println!("result: {:?}", serde_json::to_string(&res.segments));
|
||||
|
||||
@@ -73,7 +73,7 @@ where
|
||||
/// the same ValueReconstructState struct in the next 'get_value_reconstruct_data'
|
||||
/// call, to collect more records.
|
||||
///
|
||||
#[derive(Debug, Default, Clone, PartialEq, Eq)]
|
||||
#[derive(Debug, Default)]
|
||||
pub struct ValueReconstructState {
|
||||
pub records: Vec<(Lsn, NeonWalRecord)>,
|
||||
pub img: Option<(Lsn, Bytes)>,
|
||||
@@ -318,7 +318,7 @@ pub(crate) struct LayerFringe {
|
||||
#[derive(Debug)]
|
||||
struct LayerKeyspace {
|
||||
layer: ReadableLayer,
|
||||
target_keyspace: Vec<KeySpace>,
|
||||
target_keyspace: KeySpaceRandomAccum,
|
||||
}
|
||||
|
||||
impl LayerFringe {
|
||||
@@ -342,17 +342,13 @@ impl LayerFringe {
|
||||
_,
|
||||
LayerKeyspace {
|
||||
layer,
|
||||
target_keyspace,
|
||||
mut target_keyspace,
|
||||
},
|
||||
)) => {
|
||||
let mut keyspace = KeySpaceRandomAccum::new();
|
||||
for ks in target_keyspace {
|
||||
for part in ks.ranges {
|
||||
keyspace.add_range(part);
|
||||
}
|
||||
}
|
||||
Some((layer, keyspace.consume_keyspace(), read_desc.lsn_range))
|
||||
}
|
||||
)) => Some((
|
||||
layer,
|
||||
target_keyspace.consume_keyspace(),
|
||||
read_desc.lsn_range,
|
||||
)),
|
||||
None => unreachable!("fringe internals are always consistent"),
|
||||
}
|
||||
}
|
||||
@@ -367,16 +363,18 @@ impl LayerFringe {
|
||||
let entry = self.layers.entry(layer_id.clone());
|
||||
match entry {
|
||||
Entry::Occupied(mut entry) => {
|
||||
entry.get_mut().target_keyspace.push(keyspace);
|
||||
entry.get_mut().target_keyspace.add_keyspace(keyspace);
|
||||
}
|
||||
Entry::Vacant(entry) => {
|
||||
self.planned_reads_by_lsn.push(ReadDesc {
|
||||
lsn_range,
|
||||
layer_id: layer_id.clone(),
|
||||
});
|
||||
let mut accum = KeySpaceRandomAccum::new();
|
||||
accum.add_keyspace(keyspace);
|
||||
entry.insert(LayerKeyspace {
|
||||
layer,
|
||||
target_keyspace: vec![keyspace],
|
||||
target_keyspace: accum,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -219,7 +219,6 @@ pub struct DeltaLayerInner {
|
||||
// values copied from summary
|
||||
index_start_blk: u32,
|
||||
index_root_blk: u32,
|
||||
lsn_range: Range<Lsn>,
|
||||
|
||||
file: VirtualFile,
|
||||
file_id: FileId,
|
||||
@@ -785,7 +784,6 @@ impl DeltaLayerInner {
|
||||
file_id,
|
||||
index_start_blk: actual_summary.index_start_blk,
|
||||
index_root_blk: actual_summary.index_root_blk,
|
||||
lsn_range: actual_summary.lsn_range,
|
||||
max_vectored_read_bytes,
|
||||
}))
|
||||
}
|
||||
@@ -822,7 +820,6 @@ impl DeltaLayerInner {
|
||||
if entry_lsn < lsn_range.start {
|
||||
return false;
|
||||
}
|
||||
assert!(entry_lsn <= search_key.lsn(), "certain because of how backwards visit direction works");
|
||||
offsets.push((entry_lsn, blob_ref.pos()));
|
||||
|
||||
!blob_ref.will_init()
|
||||
@@ -912,7 +909,7 @@ impl DeltaLayerInner {
|
||||
|
||||
let reads = Self::plan_reads(
|
||||
&keyspace,
|
||||
lsn_range,
|
||||
lsn_range.clone(),
|
||||
data_end_offset,
|
||||
index_reader,
|
||||
planner,
|
||||
@@ -925,7 +922,7 @@ impl DeltaLayerInner {
|
||||
self.do_reads_and_update_state(reads, reconstruct_state, ctx)
|
||||
.await;
|
||||
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, self.lsn_range.start);
|
||||
reconstruct_state.on_lsn_advanced(&keyspace, lsn_range.start);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -62,6 +62,7 @@ use std::{
|
||||
ops::ControlFlow,
|
||||
};
|
||||
|
||||
use crate::metrics::GetKind;
|
||||
use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS;
|
||||
use crate::{
|
||||
aux_file::AuxFileSizeEstimator,
|
||||
@@ -75,7 +76,6 @@ use crate::{
|
||||
disk_usage_eviction_task::DiskUsageEvictionInfo,
|
||||
pgdatadir_mapping::CollectKeySpaceError,
|
||||
};
|
||||
use crate::{deletion_queue::DeletionQueueClient, metrics::GetKind};
|
||||
use crate::{
|
||||
disk_usage_eviction_task::finite_f32,
|
||||
tenant::storage_layer::{
|
||||
@@ -205,7 +205,6 @@ fn drop_wlock<T>(rlock: tokio::sync::RwLockWriteGuard<'_, T>) {
|
||||
/// The outward-facing resources required to build a Timeline
|
||||
pub struct TimelineResources {
|
||||
pub remote_client: RemoteTimelineClient,
|
||||
pub deletion_queue_client: DeletionQueueClient,
|
||||
pub timeline_get_throttle: Arc<
|
||||
crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>,
|
||||
>,
|
||||
@@ -910,9 +909,7 @@ impl Timeline {
|
||||
img: cached_page_img,
|
||||
};
|
||||
|
||||
self.get_impl(key, lsn, reconstruct_state, ctx)
|
||||
.await
|
||||
.map(|v| v.0)
|
||||
self.get_impl(key, lsn, reconstruct_state, ctx).await
|
||||
}
|
||||
GetImpl::Vectored => {
|
||||
let keyspace = KeySpace {
|
||||
@@ -924,59 +921,16 @@ impl Timeline {
|
||||
let mut reconstruct_state = ValuesReconstructState::new();
|
||||
|
||||
// Only add the cached image to the reconstruct state when it exists.
|
||||
let cached_page_img_lsn = cached_page_img.as_ref().map(|(lsn, _)| *lsn);
|
||||
if cached_page_img.is_some() {
|
||||
let mut key_state = VectoredValueReconstructState::default();
|
||||
key_state.img = cached_page_img;
|
||||
reconstruct_state.keys.insert(key, Ok(key_state));
|
||||
}
|
||||
|
||||
let debug_log = {
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
self.get_test_vm_bit_debug_logging()
|
||||
}
|
||||
#[cfg(not(feature = "testing"))]
|
||||
{
|
||||
false
|
||||
}
|
||||
};
|
||||
|
||||
if debug_log {
|
||||
tracing::info!(%key, %lsn, ?cached_page_img_lsn, "debug-logging page reconstruction");
|
||||
}
|
||||
|
||||
if debug_log {
|
||||
tracing::info!(
|
||||
location = "before vectored get",
|
||||
"debug-logging page reconstruction"
|
||||
);
|
||||
self.layers
|
||||
.read()
|
||||
.await
|
||||
.layer_map()
|
||||
.dump(false, ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let vectored_res = self
|
||||
.get_vectored_impl(keyspace.clone(), lsn, &mut reconstruct_state, ctx)
|
||||
.await;
|
||||
|
||||
if debug_log {
|
||||
tracing::info!(
|
||||
location = "before validation",
|
||||
"debug-logging page reconstruction"
|
||||
);
|
||||
self.layers
|
||||
.read()
|
||||
.await
|
||||
.layer_map()
|
||||
.dump(false, ctx)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
if self.conf.validate_vectored_get {
|
||||
self.validate_get_vectored_impl(&vectored_res, keyspace, lsn, ctx)
|
||||
.await;
|
||||
@@ -994,7 +948,7 @@ impl Timeline {
|
||||
"Singular vectored get returned wrong key"
|
||||
)))
|
||||
} else {
|
||||
value.map(|v| v.0)
|
||||
value
|
||||
}
|
||||
}
|
||||
None => Err(PageReconstructError::MissingKey(MissingKeyError {
|
||||
@@ -1018,7 +972,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
mut reconstruct_state: ValueReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<(Bytes, ValueReconstructState), PageReconstructError> {
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
// XXX: structured stats collection for layer eviction here.
|
||||
trace!(
|
||||
"get page request for {}@{} from task kind {:?}",
|
||||
@@ -1157,12 +1111,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
let Ok(res) = res else {
|
||||
return Err(res.unwrap_err());
|
||||
};
|
||||
Ok(BTreeMap::from_iter(
|
||||
res.into_iter().map(|(k, v)| (k, v.map(|v| v.0))),
|
||||
))
|
||||
res
|
||||
}
|
||||
|
||||
/// Scan the keyspace and return all existing key-values in the keyspace. This currently uses vectored
|
||||
@@ -1226,12 +1175,7 @@ impl Timeline {
|
||||
recording.observe(throttled);
|
||||
}
|
||||
|
||||
let Ok(vectored_res) = vectored_res else {
|
||||
return Err(vectored_res.unwrap_err());
|
||||
};
|
||||
Ok(BTreeMap::from_iter(
|
||||
vectored_res.into_iter().map(|(k, v)| (k, v.map(|v| v.0))),
|
||||
))
|
||||
vectored_res
|
||||
}
|
||||
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
@@ -1240,10 +1184,7 @@ impl Timeline {
|
||||
keyspace: KeySpace,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<
|
||||
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
|
||||
GetVectoredError,
|
||||
> {
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
let mut values = BTreeMap::new();
|
||||
|
||||
for range in keyspace.ranges {
|
||||
@@ -1302,10 +1243,7 @@ impl Timeline {
|
||||
lsn: Lsn,
|
||||
reconstruct_state: &mut ValuesReconstructState,
|
||||
ctx: &RequestContext,
|
||||
) -> Result<
|
||||
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
|
||||
GetVectoredError,
|
||||
> {
|
||||
) -> Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError> {
|
||||
let get_kind = if keyspace.total_raw_size() == 1 {
|
||||
GetKind::Singular
|
||||
} else {
|
||||
@@ -1322,10 +1260,7 @@ impl Timeline {
|
||||
let reconstruct_timer = crate::metrics::RECONSTRUCT_TIME
|
||||
.for_get_kind(get_kind)
|
||||
.start_timer();
|
||||
let mut results: BTreeMap<
|
||||
Key,
|
||||
Result<(Bytes, ValueReconstructState), PageReconstructError>,
|
||||
> = BTreeMap::new();
|
||||
let mut results: BTreeMap<Key, Result<Bytes, PageReconstructError>> = BTreeMap::new();
|
||||
let layers_visited = reconstruct_state.get_layers_visited();
|
||||
|
||||
for (key, res) in std::mem::take(&mut reconstruct_state.keys) {
|
||||
@@ -1361,10 +1296,7 @@ impl Timeline {
|
||||
/// Not subject to [`Self::timeline_get_throttle`].
|
||||
pub(super) async fn validate_get_vectored_impl(
|
||||
&self,
|
||||
vectored_res: &Result<
|
||||
BTreeMap<Key, Result<(Bytes, ValueReconstructState), PageReconstructError>>,
|
||||
GetVectoredError,
|
||||
>,
|
||||
vectored_res: &Result<BTreeMap<Key, Result<Bytes, PageReconstructError>>, GetVectoredError>,
|
||||
keyspace: KeySpace,
|
||||
lsn: Lsn,
|
||||
ctx: &RequestContext,
|
||||
@@ -1437,8 +1369,8 @@ impl Timeline {
|
||||
key: &Key,
|
||||
keyspace: &KeySpace,
|
||||
lsn: Lsn,
|
||||
(seq, seq_reconstruct_state): &(Bytes, ValueReconstructState),
|
||||
(vec, vec_reconstruct_state): &(Bytes, ValueReconstructState),
|
||||
seq: &Bytes,
|
||||
vec: &Bytes,
|
||||
) {
|
||||
if *key == AUX_FILES_KEY {
|
||||
// The value reconstruct of AUX_FILES_KEY from records is not deterministic
|
||||
@@ -1461,16 +1393,10 @@ impl Timeline {
|
||||
}
|
||||
} else {
|
||||
// All other keys should reconstruct deterministically, so we simply compare the blobs.
|
||||
if seq != vec {
|
||||
assert_eq!(
|
||||
seq_reconstruct_state, vec_reconstruct_state,
|
||||
"Reconstruct state mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
|
||||
);
|
||||
assert_eq!(
|
||||
seq, vec,
|
||||
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
|
||||
);
|
||||
}
|
||||
assert_eq!(
|
||||
seq, vec,
|
||||
"Image mismatch for key {key} - keyspace={keyspace:?} lsn={lsn}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2239,15 +2165,6 @@ impl Timeline {
|
||||
.unwrap_or(default_tenant_conf.evictions_low_residence_duration_metric_threshold)
|
||||
}
|
||||
|
||||
#[cfg(feature = "testing")]
|
||||
fn get_test_vm_bit_debug_logging(&self) -> bool {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
.tenant_conf
|
||||
.test_vm_bit_debug_logging
|
||||
.unwrap_or(self.conf.default_tenant_conf.test_vm_bit_debug_logging)
|
||||
}
|
||||
|
||||
fn get_image_layer_creation_check_threshold(&self) -> u8 {
|
||||
let tenant_conf = self.tenant_conf.load();
|
||||
tenant_conf
|
||||
@@ -2263,10 +2180,6 @@ impl Timeline {
|
||||
pub(super) fn tenant_conf_updated(&self, new_conf: &TenantConfOpt) {
|
||||
// NB: Most tenant conf options are read by background loops, so,
|
||||
// changes will automatically be picked up.
|
||||
#[cfg(feature = "testing")]
|
||||
{
|
||||
info!(?new_conf.test_vm_bit_debug_logging, "updating tenant conf");
|
||||
}
|
||||
|
||||
// The threshold is embedded in the metric. So, we need to update it.
|
||||
{
|
||||
@@ -4442,7 +4355,7 @@ impl Timeline {
|
||||
let mut total_kb_retrieved = 0;
|
||||
let mut total_keys_retrieved = 0;
|
||||
for (k, v) in data {
|
||||
let (v, _) = v.map_err(CreateImageLayersError::PageReconstructError)?;
|
||||
let v = v.map_err(CreateImageLayersError::PageReconstructError)?;
|
||||
total_kb_retrieved += KEY_SIZE + v.len();
|
||||
total_keys_retrieved += 1;
|
||||
new_data.insert(k, v);
|
||||
@@ -4909,7 +4822,7 @@ impl Timeline {
|
||||
pitr: Duration,
|
||||
cancel: &CancellationToken,
|
||||
ctx: &RequestContext,
|
||||
) -> anyhow::Result<GcCutoffs> {
|
||||
) -> Result<GcCutoffs, PageReconstructError> {
|
||||
let _timer = self
|
||||
.metrics
|
||||
.find_gc_cutoffs_histo
|
||||
@@ -5237,7 +5150,7 @@ impl Timeline {
|
||||
key: Key,
|
||||
request_lsn: Lsn,
|
||||
mut data: ValueReconstructState,
|
||||
) -> Result<(Bytes, ValueReconstructState), PageReconstructError> {
|
||||
) -> Result<Bytes, PageReconstructError> {
|
||||
// Perform WAL redo if needed
|
||||
data.records.reverse();
|
||||
|
||||
@@ -5250,7 +5163,7 @@ impl Timeline {
|
||||
img_lsn,
|
||||
request_lsn,
|
||||
);
|
||||
Ok((img.clone(), data))
|
||||
Ok(img.clone())
|
||||
} else {
|
||||
Err(PageReconstructError::from(anyhow!(
|
||||
"base image for {key} at {request_lsn} not found"
|
||||
@@ -5282,8 +5195,6 @@ impl Timeline {
|
||||
|
||||
let last_rec_lsn = data.records.last().unwrap().0;
|
||||
|
||||
let ret_state = data.clone();
|
||||
|
||||
let img = match self
|
||||
.walredo_mgr
|
||||
.as_ref()
|
||||
@@ -5314,7 +5225,7 @@ impl Timeline {
|
||||
}
|
||||
}
|
||||
|
||||
Ok((img, ret_state))
|
||||
Ok(img)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1064,7 +1064,7 @@ impl Timeline {
|
||||
img: base_image,
|
||||
records: delta_above_base_image,
|
||||
};
|
||||
let (img, _) = tline.reconstruct_value(key, horizon, state).await?;
|
||||
let img = tline.reconstruct_value(key, horizon, state).await?;
|
||||
Ok((keys_above_horizon, img))
|
||||
}
|
||||
|
||||
|
||||
@@ -11,7 +11,6 @@ use utils::{crashsafe, fs_ext, id::TimelineId, pausable_failpoint};
|
||||
|
||||
use crate::{
|
||||
config::PageServerConf,
|
||||
deletion_queue::DeletionQueueClient,
|
||||
task_mgr::{self, TaskKind},
|
||||
tenant::{
|
||||
metadata::TimelineMetadata,
|
||||
@@ -263,7 +262,6 @@ impl DeleteTimelineFlow {
|
||||
timeline_id: TimelineId,
|
||||
local_metadata: &TimelineMetadata,
|
||||
remote_client: RemoteTimelineClient,
|
||||
deletion_queue_client: DeletionQueueClient,
|
||||
) -> anyhow::Result<()> {
|
||||
// Note: here we even skip populating layer map. Timeline is essentially uninitialized.
|
||||
// RemoteTimelineClient is the only functioning part.
|
||||
@@ -274,7 +272,6 @@ impl DeleteTimelineFlow {
|
||||
None, // Ancestor is not needed for deletion.
|
||||
TimelineResources {
|
||||
remote_client,
|
||||
deletion_queue_client,
|
||||
timeline_get_throttle: tenant.timeline_get_throttle.clone(),
|
||||
},
|
||||
// Important. We dont pass ancestor above because it can be missing.
|
||||
|
||||
@@ -49,6 +49,19 @@ pub enum NeonWalRecord {
|
||||
file_path: String,
|
||||
content: Option<Bytes>,
|
||||
},
|
||||
|
||||
/// A testing record for unit testing purposes. It supports append data to an existing image, or clear it.
|
||||
#[cfg(test)]
|
||||
Test {
|
||||
/// Append a string to the image.
|
||||
append: String,
|
||||
/// Clear the image before appending.
|
||||
clear: bool,
|
||||
/// Treat this record as an init record. `clear` should be set to true if this field is set
|
||||
/// to true. This record does not need the history WALs to reconstruct. See [`NeonWalRecord::will_init`] and
|
||||
/// its references in `timeline.rs`.
|
||||
will_init: bool,
|
||||
},
|
||||
}
|
||||
|
||||
impl NeonWalRecord {
|
||||
@@ -58,11 +71,39 @@ impl NeonWalRecord {
|
||||
// If you change this function, you'll also need to change ValueBytes::will_init
|
||||
match self {
|
||||
NeonWalRecord::Postgres { will_init, rec: _ } => *will_init,
|
||||
|
||||
#[cfg(test)]
|
||||
NeonWalRecord::Test { will_init, .. } => *will_init,
|
||||
// None of the special neon record types currently initialize the page
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn wal_append(s: impl AsRef<str>) -> Self {
|
||||
Self::Test {
|
||||
append: s.as_ref().to_string(),
|
||||
clear: false,
|
||||
will_init: false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn wal_clear() -> Self {
|
||||
Self::Test {
|
||||
append: "".to_string(),
|
||||
clear: true,
|
||||
will_init: false,
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn wal_init() -> Self {
|
||||
Self::Test {
|
||||
append: "".to_string(),
|
||||
clear: true,
|
||||
will_init: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// DecodedBkpBlock represents per-page data contained in a WAL record.
|
||||
|
||||
@@ -244,6 +244,20 @@ pub(crate) fn apply_in_neon(
|
||||
let mut writer = page.writer();
|
||||
dir.ser_into(&mut writer)?;
|
||||
}
|
||||
#[cfg(test)]
|
||||
NeonWalRecord::Test {
|
||||
append,
|
||||
clear,
|
||||
will_init,
|
||||
} => {
|
||||
if *will_init {
|
||||
assert!(*clear, "init record must be clear to ensure correctness");
|
||||
}
|
||||
if *clear {
|
||||
page.clear();
|
||||
}
|
||||
page.put_slice(append.as_bytes());
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,19 +1,8 @@
|
||||
From 0b0194a57bd0f3598bd57dbedd0df3932330169d Mon Sep 17 00:00:00 2001
|
||||
From: Heikki Linnakangas <heikki.linnakangas@iki.fi>
|
||||
Date: Fri, 2 Feb 2024 22:26:45 +0200
|
||||
Subject: [PATCH 1/1] Make v0.6.0 work with Neon
|
||||
|
||||
Now that the WAL-logging happens as a separate step at the end of the
|
||||
build, we need a few neon-specific hints to make it work.
|
||||
---
|
||||
src/hnswbuild.c | 36 ++++++++++++++++++++++++++++++++++++
|
||||
1 file changed, 36 insertions(+)
|
||||
|
||||
diff --git a/src/hnswbuild.c b/src/hnswbuild.c
|
||||
index 680789b..ec54dea 100644
|
||||
index dcfb2bd..d5189ee 100644
|
||||
--- a/src/hnswbuild.c
|
||||
+++ b/src/hnswbuild.c
|
||||
@@ -840,9 +840,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
|
||||
@@ -860,9 +860,17 @@ HnswParallelBuildMain(dsm_segment *seg, shm_toc *toc)
|
||||
|
||||
hnswarea = shm_toc_lookup(toc, PARALLEL_KEY_HNSW_AREA, false);
|
||||
|
||||
@@ -31,7 +20,7 @@ index 680789b..ec54dea 100644
|
||||
/* Close relations within worker */
|
||||
index_close(indexRel, indexLockmode);
|
||||
table_close(heapRel, heapLockmode);
|
||||
@@ -1089,13 +1097,41 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
|
||||
@@ -1117,12 +1125,38 @@ BuildIndex(Relation heap, Relation index, IndexInfo *indexInfo,
|
||||
SeedRandom(42);
|
||||
#endif
|
||||
|
||||
@@ -43,14 +32,13 @@ index 680789b..ec54dea 100644
|
||||
|
||||
BuildGraph(buildstate, forkNum);
|
||||
|
||||
- if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM)
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_finish_unlogged_build_phase_1(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
if (RelationNeedsWAL(index))
|
||||
+ {
|
||||
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocks(index), true);
|
||||
|
||||
+ if (RelationNeedsWAL(index) || forkNum == INIT_FORKNUM) {
|
||||
log_newpage_range(index, forkNum, 0, RelationGetNumberOfBlocksInFork(index, forkNum), true);
|
||||
+#ifdef NEON_SMGR
|
||||
+ {
|
||||
+#if PG_VERSION_NUM >= 160000
|
||||
@@ -60,7 +48,7 @@ index 680789b..ec54dea 100644
|
||||
+#endif
|
||||
+
|
||||
+ SetLastWrittenLSNForBlockRange(XactLastRecEnd, rlocator,
|
||||
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
|
||||
+ MAIN_FORKNUM, 0, RelationGetNumberOfBlocks(index));
|
||||
+ SetLastWrittenLSNForRelation(XactLastRecEnd, rlocator, MAIN_FORKNUM);
|
||||
+ }
|
||||
+#endif
|
||||
@@ -69,10 +57,6 @@ index 680789b..ec54dea 100644
|
||||
+#ifdef NEON_SMGR
|
||||
+ smgr_end_unlogged_build(RelationGetSmgr(index));
|
||||
+#endif
|
||||
+
|
||||
|
||||
FreeBuildState(buildstate);
|
||||
}
|
||||
|
||||
--
|
||||
2.39.2
|
||||
|
||||
|
||||
@@ -3112,12 +3112,12 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf
|
||||
request_lsn = UINT64_MAX;
|
||||
|
||||
/*
|
||||
* GetRedoStartLsn() returns LSN of basebackup. We know that the SLRU
|
||||
* GetRedoStartLsn() returns LSN of the basebackup. We know that the SLRU
|
||||
* segment has not changed since the basebackup, because in order to
|
||||
* modify it, we would have had to download it already. And once
|
||||
* downloaded, we never evict SLRU segments from local disk.
|
||||
*/
|
||||
not_modified_since = GetRedoStartLsn();
|
||||
not_modified_since = nm_adjust_lsn(GetRedoStartLsn());
|
||||
|
||||
SlruKind kind;
|
||||
|
||||
|
||||
@@ -1,16 +1,183 @@
|
||||
use measured::FixedCardinalityLabel;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt;
|
||||
use std::fmt::{self, Display};
|
||||
|
||||
use crate::auth::IpPattern;
|
||||
|
||||
use crate::intern::{BranchIdInt, EndpointIdInt, ProjectIdInt};
|
||||
use crate::proxy::retry::ShouldRetry;
|
||||
|
||||
/// Generic error response with human-readable description.
|
||||
/// Note that we can't always present it to user as is.
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ConsoleError {
|
||||
pub error: Box<str>,
|
||||
#[serde(skip)]
|
||||
pub http_status_code: http::StatusCode,
|
||||
pub status: Option<Status>,
|
||||
}
|
||||
|
||||
impl ConsoleError {
|
||||
pub fn get_reason(&self) -> Reason {
|
||||
self.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.details.error_info.as_ref())
|
||||
.map(|e| e.reason)
|
||||
.unwrap_or(Reason::Unknown)
|
||||
}
|
||||
pub fn get_user_facing_message(&self) -> String {
|
||||
use super::provider::errors::REQUEST_FAILED;
|
||||
self.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.details.user_facing_message.as_ref())
|
||||
.map(|m| m.message.clone().into())
|
||||
.unwrap_or_else(|| {
|
||||
// Ask @neondatabase/control-plane for review before adding more.
|
||||
match self.http_status_code {
|
||||
http::StatusCode::NOT_FOUND => {
|
||||
// Status 404: failed to get a project-related resource.
|
||||
format!("{REQUEST_FAILED}: endpoint cannot be found")
|
||||
}
|
||||
http::StatusCode::NOT_ACCEPTABLE => {
|
||||
// Status 406: endpoint is disabled (we don't allow connections).
|
||||
format!("{REQUEST_FAILED}: endpoint is disabled")
|
||||
}
|
||||
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
|
||||
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
|
||||
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
|
||||
}
|
||||
_ => REQUEST_FAILED.to_owned(),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ConsoleError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
let msg = self
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.details.user_facing_message.as_ref())
|
||||
.map(|m| m.message.as_ref())
|
||||
.unwrap_or_else(|| &self.error);
|
||||
write!(f, "{}", msg)
|
||||
}
|
||||
}
|
||||
|
||||
impl ShouldRetry for ConsoleError {
|
||||
fn could_retry(&self) -> bool {
|
||||
if self.status.is_none() || self.status.as_ref().unwrap().details.retry_info.is_none() {
|
||||
// retry some temporary failures because the compute was in a bad state
|
||||
// (bad request can be returned when the endpoint was in transition)
|
||||
return match &self {
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => true,
|
||||
// don't retry when quotas are exceeded
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref error,
|
||||
..
|
||||
} => !error.contains("compute time quota of non-primary branches is exceeded"),
|
||||
// locked can be returned when the endpoint was in transition
|
||||
// or when quotas are exceeded. don't retry when quotas are exceeded
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::LOCKED,
|
||||
ref error,
|
||||
..
|
||||
} => {
|
||||
!error.contains("quota exceeded")
|
||||
&& !error.contains("the limit for current plan reached")
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
}
|
||||
|
||||
// retry if the response has a retry delay
|
||||
if let Some(retry_info) = self
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.details.retry_info.as_ref())
|
||||
{
|
||||
retry_info.retry_delay_ms > 0
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Status {
|
||||
pub code: Box<str>,
|
||||
pub message: Box<str>,
|
||||
pub details: Details,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct Details {
|
||||
pub error_info: Option<ErrorInfo>,
|
||||
pub retry_info: Option<RetryInfo>,
|
||||
pub user_facing_message: Option<UserFacingMessage>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct ErrorInfo {
|
||||
pub reason: Reason,
|
||||
// Schema could also have `metadata` field, but it's not structured. Skip it for now.
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Deserialize, Default)]
|
||||
pub enum Reason {
|
||||
#[serde(rename = "ROLE_PROTECTED")]
|
||||
RoleProtected,
|
||||
#[serde(rename = "RESOURCE_NOT_FOUND")]
|
||||
ResourceNotFound,
|
||||
#[serde(rename = "PROJECT_NOT_FOUND")]
|
||||
ProjectNotFound,
|
||||
#[serde(rename = "ENDPOINT_NOT_FOUND")]
|
||||
EndpointNotFound,
|
||||
#[serde(rename = "BRANCH_NOT_FOUND")]
|
||||
BranchNotFound,
|
||||
#[serde(rename = "RATE_LIMIT_EXCEEDED")]
|
||||
RateLimitExceeded,
|
||||
#[serde(rename = "NON_PRIMARY_BRANCH_COMPUTE_TIME_EXCEEDED")]
|
||||
NonPrimaryBranchComputeTimeExceeded,
|
||||
#[serde(rename = "ACTIVE_TIME_QUOTA_EXCEEDED")]
|
||||
ActiveTimeQuotaExceeded,
|
||||
#[serde(rename = "COMPUTE_TIME_QUOTA_EXCEEDED")]
|
||||
ComputeTimeQuotaExceeded,
|
||||
#[serde(rename = "WRITTEN_DATA_QUOTA_EXCEEDED")]
|
||||
WrittenDataQuotaExceeded,
|
||||
#[serde(rename = "DATA_TRANSFER_QUOTA_EXCEEDED")]
|
||||
DataTransferQuotaExceeded,
|
||||
#[serde(rename = "LOGICAL_SIZE_QUOTA_EXCEEDED")]
|
||||
LogicalSizeQuotaExceeded,
|
||||
#[default]
|
||||
#[serde(other)]
|
||||
Unknown,
|
||||
}
|
||||
|
||||
impl Reason {
|
||||
pub fn is_not_found(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
Reason::ResourceNotFound
|
||||
| Reason::ProjectNotFound
|
||||
| Reason::EndpointNotFound
|
||||
| Reason::BranchNotFound
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct RetryInfo {
|
||||
pub retry_delay_ms: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
pub struct UserFacingMessage {
|
||||
pub message: Box<str>,
|
||||
}
|
||||
|
||||
/// Response which holds client's auth secret, e.g. [`crate::scram::ServerSecret`].
|
||||
|
||||
@@ -25,8 +25,8 @@ use tracing::info;
|
||||
|
||||
pub mod errors {
|
||||
use crate::{
|
||||
console::messages::{self, ConsoleError},
|
||||
error::{io_error, ReportableError, UserFacingError},
|
||||
http,
|
||||
proxy::retry::ShouldRetry,
|
||||
};
|
||||
use thiserror::Error;
|
||||
@@ -34,17 +34,14 @@ pub mod errors {
|
||||
use super::ApiLockError;
|
||||
|
||||
/// A go-to error message which doesn't leak any detail.
|
||||
const REQUEST_FAILED: &str = "Console request failed";
|
||||
pub const REQUEST_FAILED: &str = "Console request failed";
|
||||
|
||||
/// Common console API error.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ApiError {
|
||||
/// Error returned by the console itself.
|
||||
#[error("{REQUEST_FAILED} with {}: {}", .status, .text)]
|
||||
Console {
|
||||
status: http::StatusCode,
|
||||
text: Box<str>,
|
||||
},
|
||||
#[error("{REQUEST_FAILED} with {0}")]
|
||||
Console(ConsoleError),
|
||||
|
||||
/// Various IO errors like broken pipe or malformed payload.
|
||||
#[error("{REQUEST_FAILED}: {0}")]
|
||||
@@ -53,11 +50,11 @@ pub mod errors {
|
||||
|
||||
impl ApiError {
|
||||
/// Returns HTTP status code if it's the reason for failure.
|
||||
pub fn http_status_code(&self) -> Option<http::StatusCode> {
|
||||
pub fn get_reason(&self) -> messages::Reason {
|
||||
use ApiError::*;
|
||||
match self {
|
||||
Console { status, .. } => Some(*status),
|
||||
_ => None,
|
||||
Console(e) => e.get_reason(),
|
||||
_ => messages::Reason::Unknown,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -67,22 +64,7 @@ pub mod errors {
|
||||
use ApiError::*;
|
||||
match self {
|
||||
// To minimize risks, only select errors are forwarded to users.
|
||||
// Ask @neondatabase/control-plane for review before adding more.
|
||||
Console { status, .. } => match *status {
|
||||
http::StatusCode::NOT_FOUND => {
|
||||
// Status 404: failed to get a project-related resource.
|
||||
format!("{REQUEST_FAILED}: endpoint cannot be found")
|
||||
}
|
||||
http::StatusCode::NOT_ACCEPTABLE => {
|
||||
// Status 406: endpoint is disabled (we don't allow connections).
|
||||
format!("{REQUEST_FAILED}: endpoint is disabled")
|
||||
}
|
||||
http::StatusCode::LOCKED | http::StatusCode::UNPROCESSABLE_ENTITY => {
|
||||
// Status 423: project might be in maintenance mode (or bad state), or quotas exceeded.
|
||||
format!("{REQUEST_FAILED}: endpoint is temporarily unavailable. Check your quotas and/or contact our support.")
|
||||
}
|
||||
_ => REQUEST_FAILED.to_owned(),
|
||||
},
|
||||
Console(c) => c.get_user_facing_message(),
|
||||
_ => REQUEST_FAILED.to_owned(),
|
||||
}
|
||||
}
|
||||
@@ -91,29 +73,56 @@ pub mod errors {
|
||||
impl ReportableError for ApiError {
|
||||
fn get_error_kind(&self) -> crate::error::ErrorKind {
|
||||
match self {
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
|
||||
..
|
||||
} => crate::error::ErrorKind::User,
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
text,
|
||||
} if text.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
crate::error::ErrorKind::User
|
||||
ApiError::Console(e) => {
|
||||
use crate::error::ErrorKind::*;
|
||||
match e.get_reason() {
|
||||
crate::console::messages::Reason::RoleProtected => User,
|
||||
crate::console::messages::Reason::ResourceNotFound => User,
|
||||
crate::console::messages::Reason::ProjectNotFound => User,
|
||||
crate::console::messages::Reason::EndpointNotFound => User,
|
||||
crate::console::messages::Reason::BranchNotFound => User,
|
||||
crate::console::messages::Reason::RateLimitExceeded => ServiceRateLimit,
|
||||
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
|
||||
User
|
||||
}
|
||||
crate::console::messages::Reason::ActiveTimeQuotaExceeded => User,
|
||||
crate::console::messages::Reason::ComputeTimeQuotaExceeded => User,
|
||||
crate::console::messages::Reason::WrittenDataQuotaExceeded => User,
|
||||
crate::console::messages::Reason::DataTransferQuotaExceeded => User,
|
||||
crate::console::messages::Reason::LogicalSizeQuotaExceeded => User,
|
||||
crate::console::messages::Reason::Unknown => match &e {
|
||||
ConsoleError {
|
||||
http_status_code:
|
||||
http::StatusCode::NOT_FOUND | http::StatusCode::NOT_ACCEPTABLE,
|
||||
..
|
||||
} => crate::error::ErrorKind::User,
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
error,
|
||||
..
|
||||
} if error.contains(
|
||||
"compute time quota of non-primary branches is exceeded",
|
||||
) =>
|
||||
{
|
||||
crate::error::ErrorKind::User
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::LOCKED,
|
||||
error,
|
||||
..
|
||||
} if error.contains("quota exceeded")
|
||||
|| error.contains("the limit for current plan reached") =>
|
||||
{
|
||||
crate::error::ErrorKind::User
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: http::StatusCode::TOO_MANY_REQUESTS,
|
||||
..
|
||||
} => crate::error::ErrorKind::ServiceRateLimit,
|
||||
ConsoleError { .. } => crate::error::ErrorKind::ControlPlane,
|
||||
},
|
||||
}
|
||||
}
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
text,
|
||||
} if text.contains("quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
crate::error::ErrorKind::User
|
||||
}
|
||||
ApiError::Console {
|
||||
status: http::StatusCode::TOO_MANY_REQUESTS,
|
||||
..
|
||||
} => crate::error::ErrorKind::ServiceRateLimit,
|
||||
ApiError::Console { .. } => crate::error::ErrorKind::ControlPlane,
|
||||
ApiError::Transport(_) => crate::error::ErrorKind::ControlPlane,
|
||||
}
|
||||
}
|
||||
@@ -124,31 +133,7 @@ pub mod errors {
|
||||
match self {
|
||||
// retry some transport errors
|
||||
Self::Transport(io) => io.could_retry(),
|
||||
// retry some temporary failures because the compute was in a bad state
|
||||
// (bad request can be returned when the endpoint was in transition)
|
||||
Self::Console {
|
||||
status: http::StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => true,
|
||||
// don't retry when quotas are exceeded
|
||||
Self::Console {
|
||||
status: http::StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref text,
|
||||
} => !text.contains("compute time quota of non-primary branches is exceeded"),
|
||||
// locked can be returned when the endpoint was in transition
|
||||
// or when quotas are exceeded. don't retry when quotas are exceeded
|
||||
Self::Console {
|
||||
status: http::StatusCode::LOCKED,
|
||||
ref text,
|
||||
} => {
|
||||
// written data quota exceeded
|
||||
// data transfer quota exceeded
|
||||
// compute time quota exceeded
|
||||
// logical size quota exceeded
|
||||
!text.contains("quota exceeded")
|
||||
&& !text.contains("the limit for current plan reached")
|
||||
}
|
||||
_ => false,
|
||||
Self::Console(e) => e.could_retry(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -509,7 +494,7 @@ impl<K: Hash + Eq + Clone> ApiLocks<K> {
|
||||
self.metrics
|
||||
.semaphore_acquire_seconds
|
||||
.observe(now.elapsed().as_secs_f64());
|
||||
|
||||
info!("acquired permit {:?}", now.elapsed().as_secs_f64());
|
||||
Ok(WakeComputePermit { permit: permit? })
|
||||
}
|
||||
|
||||
|
||||
@@ -94,12 +94,14 @@ impl Api {
|
||||
let body = match parse_body::<GetRoleSecret>(response).await {
|
||||
Ok(body) => body,
|
||||
// Error 404 is special: it's ok not to have a secret.
|
||||
Err(e) => match e.http_status_code() {
|
||||
Some(http::StatusCode::NOT_FOUND) => {
|
||||
// TODO(anna): retry
|
||||
Err(e) => {
|
||||
if e.get_reason().is_not_found() {
|
||||
return Ok(AuthInfo::default());
|
||||
} else {
|
||||
return Err(e.into());
|
||||
}
|
||||
_otherwise => return Err(e.into()),
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
let secret = if body.role_secret.is_empty() {
|
||||
@@ -328,19 +330,24 @@ async fn parse_body<T: for<'a> serde::Deserialize<'a>>(
|
||||
info!("request succeeded, processing the body");
|
||||
return Ok(response.json().await?);
|
||||
}
|
||||
let s = response.bytes().await?;
|
||||
// Log plaintext to be able to detect, whether there are some cases not covered by the error struct.
|
||||
info!("response_error plaintext: {:?}", s);
|
||||
|
||||
// Don't throw an error here because it's not as important
|
||||
// as the fact that the request itself has failed.
|
||||
let body = response.json().await.unwrap_or_else(|e| {
|
||||
let mut body = serde_json::from_slice(&s).unwrap_or_else(|e| {
|
||||
warn!("failed to parse error body: {e}");
|
||||
ConsoleError {
|
||||
error: "reason unclear (malformed error message)".into(),
|
||||
http_status_code: status,
|
||||
status: None,
|
||||
}
|
||||
});
|
||||
body.http_status_code = status;
|
||||
|
||||
let text = body.error;
|
||||
error!("console responded with an error ({status}): {text}");
|
||||
Err(ApiError::Console { status, text })
|
||||
error!("console responded with an error ({status}): {body:?}");
|
||||
Err(ApiError::Console(body))
|
||||
}
|
||||
|
||||
fn parse_host_port(input: &str) -> Option<(&str, u16)> {
|
||||
|
||||
@@ -12,7 +12,7 @@ use crate::auth::backend::{
|
||||
};
|
||||
use crate::config::{CertResolver, RetryConfig};
|
||||
use crate::console::caches::NodeInfoCache;
|
||||
use crate::console::messages::MetricsAuxInfo;
|
||||
use crate::console::messages::{ConsoleError, MetricsAuxInfo};
|
||||
use crate::console::provider::{CachedAllowedIps, CachedRoleSecret, ConsoleBackend};
|
||||
use crate::console::{self, CachedNodeInfo, NodeInfo};
|
||||
use crate::error::ErrorKind;
|
||||
@@ -484,18 +484,20 @@ impl TestBackend for TestConnectMechanism {
|
||||
match action {
|
||||
ConnectAction::Wake => Ok(helper_create_cached_node_info(self.cache)),
|
||||
ConnectAction::WakeFail => {
|
||||
let err = console::errors::ApiError::Console {
|
||||
status: http::StatusCode::FORBIDDEN,
|
||||
text: "TEST".into(),
|
||||
};
|
||||
let err = console::errors::ApiError::Console(ConsoleError {
|
||||
http_status_code: http::StatusCode::FORBIDDEN,
|
||||
error: "TEST".into(),
|
||||
status: None,
|
||||
});
|
||||
assert!(!err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
ConnectAction::WakeRetry => {
|
||||
let err = console::errors::ApiError::Console {
|
||||
status: http::StatusCode::BAD_REQUEST,
|
||||
text: "TEST".into(),
|
||||
};
|
||||
let err = console::errors::ApiError::Console(ConsoleError {
|
||||
http_status_code: http::StatusCode::BAD_REQUEST,
|
||||
error: "TEST".into(),
|
||||
status: None,
|
||||
});
|
||||
assert!(err.could_retry());
|
||||
Err(console::errors::WakeComputeError::ApiError(err))
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::config::RetryConfig;
|
||||
use crate::console::messages::ConsoleError;
|
||||
use crate::console::{errors::WakeComputeError, provider::CachedNodeInfo};
|
||||
use crate::context::RequestMonitoring;
|
||||
use crate::metrics::{
|
||||
@@ -88,36 +89,76 @@ fn report_error(e: &WakeComputeError, retry: bool) {
|
||||
let kind = match e {
|
||||
WakeComputeError::BadComputeAddress(_) => WakeupFailureKind::BadComputeAddress,
|
||||
WakeComputeError::ApiError(ApiError::Transport(_)) => WakeupFailureKind::ApiTransportError,
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
ref text,
|
||||
}) if text.contains("written data quota exceeded")
|
||||
|| text.contains("the limit for current plan reached") =>
|
||||
{
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref text,
|
||||
}) if text.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::LOCKED,
|
||||
..
|
||||
}) => WakeupFailureKind::ApiConsoleLocked,
|
||||
WakeComputeError::ApiError(ApiError::Console {
|
||||
status: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
}) => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
WakeComputeError::ApiError(ApiError::Console { status, .. })
|
||||
if status.is_server_error() =>
|
||||
{
|
||||
WakeupFailureKind::ApiConsoleOtherServerError
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console { .. }) => {
|
||||
WakeupFailureKind::ApiConsoleOtherError
|
||||
}
|
||||
WakeComputeError::ApiError(ApiError::Console(e)) => match e.get_reason() {
|
||||
crate::console::messages::Reason::RoleProtected => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::ResourceNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::ProjectNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::EndpointNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::BranchNotFound => {
|
||||
WakeupFailureKind::ApiConsoleBadRequest
|
||||
}
|
||||
crate::console::messages::Reason::RateLimitExceeded => {
|
||||
WakeupFailureKind::ApiConsoleLocked
|
||||
}
|
||||
crate::console::messages::Reason::NonPrimaryBranchComputeTimeExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::ActiveTimeQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::ComputeTimeQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::WrittenDataQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::DataTransferQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::LogicalSizeQuotaExceeded => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
crate::console::messages::Reason::Unknown => match e {
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::LOCKED,
|
||||
ref error,
|
||||
..
|
||||
} if error.contains("written data quota exceeded")
|
||||
|| error.contains("the limit for current plan reached") =>
|
||||
{
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::UNPROCESSABLE_ENTITY,
|
||||
ref error,
|
||||
..
|
||||
} if error.contains("compute time quota of non-primary branches is exceeded") => {
|
||||
WakeupFailureKind::QuotaExceeded
|
||||
}
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::LOCKED,
|
||||
..
|
||||
} => WakeupFailureKind::ApiConsoleLocked,
|
||||
ConsoleError {
|
||||
http_status_code: StatusCode::BAD_REQUEST,
|
||||
..
|
||||
} => WakeupFailureKind::ApiConsoleBadRequest,
|
||||
ConsoleError {
|
||||
http_status_code, ..
|
||||
} if http_status_code.is_server_error() => {
|
||||
WakeupFailureKind::ApiConsoleOtherServerError
|
||||
}
|
||||
ConsoleError { .. } => WakeupFailureKind::ApiConsoleOtherError,
|
||||
},
|
||||
},
|
||||
WakeComputeError::TooManyConnections => WakeupFailureKind::ApiConsoleLocked,
|
||||
WakeComputeError::TooManyConnectionAttempts(_) => WakeupFailureKind::TimeoutError,
|
||||
};
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use std::usize;
|
||||
|
||||
use super::{LimitAlgorithm, Outcome, Sample};
|
||||
|
||||
/// Loss-based congestion avoidance.
|
||||
|
||||
@@ -32,8 +32,6 @@ pub struct ClientFirstMessage<'a> {
|
||||
pub bare: &'a str,
|
||||
/// Channel binding mode.
|
||||
pub cbind_flag: ChannelBinding<&'a str>,
|
||||
/// (Client username)[<https://github.com/postgres/postgres/blob/94226d4506e66d6e7cbf/src/backend/libpq/auth-scram.c#L13>].
|
||||
pub username: &'a str,
|
||||
/// Client nonce.
|
||||
pub nonce: &'a str,
|
||||
}
|
||||
@@ -58,6 +56,14 @@ impl<'a> ClientFirstMessage<'a> {
|
||||
|
||||
// In theory, these might be preceded by "reserved-mext" (i.e. "m=")
|
||||
let username = parts.next()?.strip_prefix("n=")?;
|
||||
|
||||
// https://github.com/postgres/postgres/blob/f83908798f78c4cafda217ca875602c88ea2ae28/src/backend/libpq/auth-scram.c#L13-L14
|
||||
if !username.is_empty() {
|
||||
tracing::warn!(username, "scram username provided, but is not expected")
|
||||
// TODO(conrad):
|
||||
// return None;
|
||||
}
|
||||
|
||||
let nonce = parts.next()?.strip_prefix("r=")?;
|
||||
|
||||
// Validate but ignore auth extensions
|
||||
@@ -66,7 +72,6 @@ impl<'a> ClientFirstMessage<'a> {
|
||||
Some(Self {
|
||||
bare,
|
||||
cbind_flag,
|
||||
username,
|
||||
nonce,
|
||||
})
|
||||
}
|
||||
@@ -188,19 +193,18 @@ mod tests {
|
||||
|
||||
// (Almost) real strings captured during debug sessions
|
||||
let cases = [
|
||||
(NotSupportedClient, "n,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(NotSupportedServer, "y,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(NotSupportedClient, "n,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(NotSupportedServer, "y,,n=,r=t8JwklwKecDLwSsA72rHmVju"),
|
||||
(
|
||||
Required("tls-server-end-point"),
|
||||
"p=tls-server-end-point,,n=pepe,r=t8JwklwKecDLwSsA72rHmVju",
|
||||
"p=tls-server-end-point,,n=,r=t8JwklwKecDLwSsA72rHmVju",
|
||||
),
|
||||
];
|
||||
|
||||
for (cb, input) in cases {
|
||||
let msg = ClientFirstMessage::parse(input).unwrap();
|
||||
|
||||
assert_eq!(msg.bare, "n=pepe,r=t8JwklwKecDLwSsA72rHmVju");
|
||||
assert_eq!(msg.username, "pepe");
|
||||
assert_eq!(msg.bare, "n=,r=t8JwklwKecDLwSsA72rHmVju");
|
||||
assert_eq!(msg.nonce, "t8JwklwKecDLwSsA72rHmVju");
|
||||
assert_eq!(msg.cbind_flag, cb);
|
||||
}
|
||||
@@ -208,14 +212,13 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn parse_client_first_message_with_invalid_gs2_authz() {
|
||||
assert!(ClientFirstMessage::parse("n,authzid,n=user,r=nonce").is_none())
|
||||
assert!(ClientFirstMessage::parse("n,authzid,n=,r=nonce").is_none())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_client_first_message_with_extra_params() {
|
||||
let msg = ClientFirstMessage::parse("n,,n=user,r=nonce,a=foo,b=bar,c=baz").unwrap();
|
||||
assert_eq!(msg.bare, "n=user,r=nonce,a=foo,b=bar,c=baz");
|
||||
assert_eq!(msg.username, "user");
|
||||
let msg = ClientFirstMessage::parse("n,,n=,r=nonce,a=foo,b=bar,c=baz").unwrap();
|
||||
assert_eq!(msg.bare, "n=,r=nonce,a=foo,b=bar,c=baz");
|
||||
assert_eq!(msg.nonce, "nonce");
|
||||
assert_eq!(msg.cbind_flag, ChannelBinding::NotSupportedClient);
|
||||
}
|
||||
@@ -223,9 +226,9 @@ mod tests {
|
||||
#[test]
|
||||
fn parse_client_first_message_with_extra_params_invalid() {
|
||||
// must be of the form `<ascii letter>=<...>`
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,abc=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,1=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=user,r=nonce,a").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,abc=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,1=foo").is_none());
|
||||
assert!(ClientFirstMessage::parse("n,,n=,r=nonce,a").is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[toolchain]
|
||||
channel = "1.78.0"
|
||||
channel = "1.79.0"
|
||||
profile = "default"
|
||||
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
|
||||
# https://rust-lang.github.io/rustup/concepts/profiles.html
|
||||
|
||||
@@ -2409,11 +2409,17 @@ impl Service {
|
||||
(detach_waiters, shard_ids, node.clone())
|
||||
};
|
||||
|
||||
if let Err(e) = self.await_waiters(detach_waiters, RECONCILE_TIMEOUT).await {
|
||||
// Failing to detach shouldn't hold up deletion, e.g. if a node is offline we should be able
|
||||
// to use some other node to run the remote deletion.
|
||||
tracing::warn!("Failed to detach some locations: {e}");
|
||||
}
|
||||
// This reconcile wait can fail in a few ways:
|
||||
// A there is a very long queue for the reconciler semaphore
|
||||
// B some pageserver is failing to handle a detach promptly
|
||||
// C some pageserver goes offline right at the moment we send it a request.
|
||||
//
|
||||
// A and C are transient: the semaphore will eventually become available, and once a node is marked offline
|
||||
// the next attempt to reconcile will silently skip detaches for an offline node and succeed. If B happens,
|
||||
// it's a bug, and needs resolving at the pageserver level (we shouldn't just leave attachments behind while
|
||||
// deleting the underlying data).
|
||||
self.await_waiters(detach_waiters, RECONCILE_TIMEOUT)
|
||||
.await?;
|
||||
|
||||
let locations = shard_ids
|
||||
.into_iter()
|
||||
@@ -2431,13 +2437,11 @@ impl Service {
|
||||
for result in results {
|
||||
match result {
|
||||
Ok(StatusCode::ACCEPTED) => {
|
||||
// This could happen if we failed detach above, and hit a pageserver where the tenant
|
||||
// is still attached: it will accept the deletion in the background
|
||||
tracing::warn!(
|
||||
"Unexpectedly still attached on {}, client should retry",
|
||||
// This should never happen: we waited for detaches to finish above
|
||||
return Err(ApiError::InternalServerError(anyhow::anyhow!(
|
||||
"Unexpectedly still attached on {}",
|
||||
node
|
||||
);
|
||||
return Ok(StatusCode::ACCEPTED);
|
||||
)));
|
||||
}
|
||||
Ok(_) => {}
|
||||
Err(mgmt_api::Error::Cancelled) => {
|
||||
|
||||
@@ -94,8 +94,6 @@ DEFAULT_PAGESERVER_ALLOWED_ERRORS = (
|
||||
".*WARN.*path=/v1/utilization .*request was dropped before completing",
|
||||
# Can happen during shutdown
|
||||
".*scheduling deletion on drop failed: queue is in state Stopped.*",
|
||||
# Can happen during shutdown
|
||||
".*ignoring failure to find gc cutoffs: timeline shutting down.*",
|
||||
)
|
||||
|
||||
|
||||
|
||||
15
test_runner/performance/pgvector/halfvec_build.sql
Normal file
15
test_runner/performance/pgvector/halfvec_build.sql
Normal file
@@ -0,0 +1,15 @@
|
||||
DROP TABLE IF EXISTS halfvec_test_table;
|
||||
|
||||
CREATE TABLE halfvec_test_table (
|
||||
_id text NOT NULL,
|
||||
title text,
|
||||
text text,
|
||||
embeddings halfvec(1536),
|
||||
PRIMARY KEY (_id)
|
||||
);
|
||||
|
||||
INSERT INTO halfvec_test_table (_id, title, text, embeddings)
|
||||
SELECT _id, title, text, embeddings::halfvec
|
||||
FROM documents;
|
||||
|
||||
CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);
|
||||
@@ -0,0 +1,13 @@
|
||||
-- run with pooled connection
|
||||
-- pgbench -T 300 -c 100 -j20 -f pgbench_halfvec_queries.sql -postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
|
||||
|
||||
with x (x) as (
|
||||
select "embeddings" as x
|
||||
from halfvec_test_table
|
||||
TABLESAMPLE SYSTEM (1)
|
||||
LIMIT 1
|
||||
)
|
||||
SELECT title, "embeddings" <=> (select x from x) as distance
|
||||
FROM halfvec_test_table
|
||||
ORDER BY 2
|
||||
LIMIT 30;
|
||||
@@ -1,13 +0,0 @@
|
||||
-- run with pooled connection
|
||||
-- pgbench -T 300 -c 100 -j20 -f pgbench_hnsw_queries.sql -postgresql://neondb_owner:<secret>@ep-floral-thunder-w1gzhaxi-pooler.eu-west-1.aws.neon.build/neondb?sslmode=require"
|
||||
|
||||
with x (x) as (
|
||||
select "embeddings" as x
|
||||
from hnsw_test_table
|
||||
TABLESAMPLE SYSTEM (1)
|
||||
LIMIT 1
|
||||
)
|
||||
SELECT title, "embeddings" <=> (select x from x) as distance
|
||||
FROM hnsw_test_table
|
||||
ORDER BY 2
|
||||
LIMIT 30;
|
||||
@@ -106,6 +106,7 @@ QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
# Disable auto formatting for the list of queries so that it's easier to read
|
||||
# fmt: off
|
||||
PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
LabelledQuery("PGVPREP", r"ALTER EXTENSION VECTOR UPDATE;"),
|
||||
LabelledQuery("PGV0", r"DROP TABLE IF EXISTS hnsw_test_table;"),
|
||||
LabelledQuery("PGV1", r"CREATE TABLE hnsw_test_table AS TABLE documents WITH NO DATA;"),
|
||||
LabelledQuery("PGV2", r"INSERT INTO hnsw_test_table SELECT * FROM documents;"),
|
||||
@@ -115,6 +116,10 @@ PGVECTOR_QUERIES: Tuple[LabelledQuery, ...] = (
|
||||
LabelledQuery("PGV6", r"CREATE INDEX ON hnsw_test_table USING hnsw (embeddings vector_l1_ops);"),
|
||||
LabelledQuery("PGV7", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_hamming_ops);"),
|
||||
LabelledQuery("PGV8", r"CREATE INDEX ON hnsw_test_table USING hnsw ((binary_quantize(embeddings)::bit(1536)) bit_jaccard_ops);"),
|
||||
LabelledQuery("PGV9", r"DROP TABLE IF EXISTS halfvec_test_table;"),
|
||||
LabelledQuery("PGV10", r"CREATE TABLE halfvec_test_table (_id text NOT NULL, title text, text text, embeddings halfvec(1536), PRIMARY KEY (_id));"),
|
||||
LabelledQuery("PGV11", r"INSERT INTO halfvec_test_table (_id, title, text, embeddings) SELECT _id, title, text, embeddings::halfvec FROM documents;"),
|
||||
LabelledQuery("PGV12", r"CREATE INDEX documents_half_precision_hnsw_idx ON halfvec_test_table USING hnsw (embeddings halfvec_cosine_ops) WITH (m = 64, ef_construction = 128);"),
|
||||
)
|
||||
# fmt: on
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ class PgBenchLoadType(enum.Enum):
|
||||
SIMPLE_UPDATE = "simple-update"
|
||||
SELECT_ONLY = "select-only"
|
||||
PGVECTOR_HNSW = "pgvector-hnsw"
|
||||
PGVECTOR_HALFVEC = "pgvector-halfvec"
|
||||
|
||||
|
||||
def utc_now_timestamp() -> int:
|
||||
@@ -153,6 +154,26 @@ def run_test_pgbench(env: PgCompare, scale: int, duration: int, workload_type: P
|
||||
password=password,
|
||||
)
|
||||
|
||||
if workload_type == PgBenchLoadType.PGVECTOR_HALFVEC:
|
||||
# Run simple-update workload
|
||||
run_pgbench(
|
||||
env,
|
||||
"pgvector-halfvec",
|
||||
[
|
||||
"pgbench",
|
||||
"-f",
|
||||
"test_runner/performance/pgvector/pgbench_custom_script_pgvector_halfvec_queries.sql",
|
||||
"-c100",
|
||||
"-j20",
|
||||
f"-T{duration}",
|
||||
"-P2",
|
||||
"--protocol=prepared",
|
||||
"--progress-timestamp",
|
||||
connstr,
|
||||
],
|
||||
password=password,
|
||||
)
|
||||
|
||||
env.report_size()
|
||||
|
||||
|
||||
@@ -222,13 +243,3 @@ def test_pgbench_remote_simple_update(remote_compare: PgCompare, scale: int, dur
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_select_only(remote_compare: PgCompare, scale: int, duration: int):
|
||||
run_test_pgbench(remote_compare, scale, duration, PgBenchLoadType.SELECT_ONLY)
|
||||
|
||||
|
||||
# The following test runs on an existing database that has pgvector extension installed
|
||||
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
|
||||
#
|
||||
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_pgvector(remote_compare: PgCompare, duration: int):
|
||||
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)
|
||||
|
||||
24
test_runner/performance/test_perf_pgvector_queries.py
Normal file
24
test_runner/performance/test_perf_pgvector_queries.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import pytest
|
||||
from fixtures.compare_fixtures import PgCompare
|
||||
|
||||
from performance.test_perf_pgbench import PgBenchLoadType, get_durations_matrix, run_test_pgbench
|
||||
|
||||
|
||||
# The following test runs on an existing database that has pgvector extension installed
|
||||
# and a table with 1 million embedding vectors loaded and indexed with HNSW.
|
||||
#
|
||||
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_pgvector_hnsw(remote_compare: PgCompare, duration: int):
|
||||
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HNSW)
|
||||
|
||||
|
||||
# The following test runs on an existing database that has pgvector extension installed
|
||||
# and a table with 1 million embedding vectors loaded and indexed with halfvec.
|
||||
#
|
||||
# Run this pgbench tests against an existing remote Postgres cluster with the necessary setup.
|
||||
@pytest.mark.parametrize("duration", get_durations_matrix())
|
||||
@pytest.mark.remote_cluster
|
||||
def test_pgbench_remote_pgvector_halfvec(remote_compare: PgCompare, duration: int):
|
||||
run_test_pgbench(remote_compare, 1, duration, PgBenchLoadType.PGVECTOR_HALFVEC)
|
||||
@@ -300,7 +300,7 @@ def test_replica_query_race(neon_simple_env: NeonEnv):
|
||||
p_cur.execute("CREATE TABLE test AS SELECT 0 AS counter")
|
||||
|
||||
standby_ep = env.endpoints.new_replica_start(origin=primary_ep, endpoint_id="standby")
|
||||
time.sleep(1)
|
||||
wait_replica_caughtup(primary_ep, standby_ep)
|
||||
|
||||
# In primary, run a lot of UPDATEs on a single page
|
||||
finished = False
|
||||
|
||||
@@ -129,3 +129,33 @@ def test_ondemand_download_replica(neon_env_builder: NeonEnvBuilder, shard_count
|
||||
cur_replica = conn_replica.cursor()
|
||||
cur_replica.execute("SELECT * FROM clogtest")
|
||||
assert cur_replica.fetchall() == [(1,), (3,)]
|
||||
|
||||
|
||||
def test_ondemand_download_after_wal_switch(neon_env_builder: NeonEnvBuilder):
|
||||
"""
|
||||
Test on-demand SLRU download on standby, when starting right after
|
||||
WAL segment switch.
|
||||
|
||||
This is a repro for a bug in how the LSN at WAL page/segment
|
||||
boundary was handled (https://github.com/neondatabase/neon/issues/8030)
|
||||
"""
|
||||
|
||||
tenant_conf = {
|
||||
"lazy_slru_download": "true",
|
||||
}
|
||||
env = neon_env_builder.init_start(initial_tenant_conf=tenant_conf)
|
||||
|
||||
endpoint = env.endpoints.create_start("main")
|
||||
pg_conn = endpoint.connect()
|
||||
cur = pg_conn.cursor()
|
||||
|
||||
# Create a test table
|
||||
cur.execute("CREATE TABLE clogtest (id integer)")
|
||||
cur.execute("INSERT INTO clogtest VALUES (1)")
|
||||
|
||||
# Start standby at WAL segment boundary
|
||||
cur.execute("SELECT pg_switch_wal()")
|
||||
lsn = Lsn(query_scalar(cur, "SELECT pg_current_wal_insert_lsn()"))
|
||||
_endpoint_at_lsn = env.endpoints.create_start(
|
||||
branch_name="main", endpoint_id="ep-at-lsn", lsn=lsn
|
||||
)
|
||||
|
||||
@@ -133,6 +133,9 @@ def test_storage_controller_smoke(
|
||||
|
||||
wait_until(10, 1, lambda: node_evacuated(env.pageservers[0].id))
|
||||
|
||||
# Let all the reconciliations after marking the node offline complete
|
||||
env.storage_controller.reconcile_until_idle()
|
||||
|
||||
# Marking pageserver active should not migrate anything to it
|
||||
# immediately
|
||||
env.storage_controller.node_configure(env.pageservers[0].id, {"availability": "Active"})
|
||||
|
||||
@@ -678,10 +678,6 @@ def test_synthetic_size_while_deleting(neon_env_builder: NeonEnvBuilder):
|
||||
with pytest.raises(PageserverApiException, match=matcher):
|
||||
completion.result()
|
||||
|
||||
# this happens on both cases
|
||||
env.pageserver.allowed_errors.append(
|
||||
".*ignoring failure to find gc cutoffs: timeline shutting down.*"
|
||||
)
|
||||
# this happens only in the case of deletion (http response logging)
|
||||
env.pageserver.allowed_errors.append(".*Failed to refresh gc_info before gathering inputs.*")
|
||||
|
||||
|
||||
@@ -287,13 +287,6 @@ def test_vm_bit_clear_on_heap_lock_blackbox(neon_env_builder: NeonEnvBuilder):
|
||||
# already truncated away.
|
||||
#
|
||||
# ERROR: could not access status of transaction 1027
|
||||
|
||||
# Debugging https://github.com/neondatabase/neon/issues/6967
|
||||
# the select() below fails occassionally at get_impl="vectored" validation
|
||||
env.pageserver.http_client().patch_tenant_config_client_side(
|
||||
tenant_id,
|
||||
{"test_vm_bit_debug_logging": True},
|
||||
)
|
||||
cur.execute("select xmin, xmax, * from vmtest_lock where id = 40000 for update")
|
||||
tup = cur.fetchall()
|
||||
log.info(f"tuple = {tup}")
|
||||
|
||||
@@ -601,13 +601,16 @@ async def run_segment_init_failure(env: NeonEnv):
|
||||
conn = await ep.connect_async()
|
||||
ep.safe_psql("select pg_switch_wal()") # jump to the segment boundary
|
||||
# next insertion should hang until failpoint is disabled.
|
||||
asyncio.create_task(conn.execute("insert into t select generate_series(1,1), 'payload'"))
|
||||
bg_query = asyncio.create_task(
|
||||
conn.execute("insert into t select generate_series(1,1), 'payload'")
|
||||
)
|
||||
sleep_sec = 2
|
||||
await asyncio.sleep(sleep_sec)
|
||||
# also restart ep at segment boundary to make test more interesting
|
||||
ep.stop()
|
||||
# it must still be not finished
|
||||
# assert not bg_query.done()
|
||||
assert not bg_query.done()
|
||||
# Also restart ep at segment boundary to make test more interesting. Do it in immediate mode;
|
||||
# fast will hang because it will try to gracefully finish sending WAL.
|
||||
ep.stop(mode="immediate")
|
||||
# Without segment rename during init (#6402) previous statement created
|
||||
# partially initialized 16MB segment, so sk restart also triggers #6401.
|
||||
sk.stop().start()
|
||||
|
||||
@@ -18,7 +18,7 @@ commands:
|
||||
- name: postgres-exporter
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres" /bin/postgres_exporter'
|
||||
shell: 'DATA_SOURCE_NAME="user=cloud_admin sslmode=disable dbname=postgres application_name=postgres-exporter" /bin/postgres_exporter'
|
||||
- name: sql-exporter
|
||||
user: nobody
|
||||
sysvInitAction: respawn
|
||||
@@ -93,7 +93,7 @@ files:
|
||||
target:
|
||||
# Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL)
|
||||
# the schema gets dropped or replaced to match the driver expected DSN format.
|
||||
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable'
|
||||
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter'
|
||||
|
||||
# Collectors (referenced by name) to execute on the target.
|
||||
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
|
||||
@@ -128,7 +128,7 @@ files:
|
||||
target:
|
||||
# Data source name always has a URI schema that matches the driver name. In some cases (e.g. MySQL)
|
||||
# the schema gets dropped or replaced to match the driver expected DSN format.
|
||||
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable'
|
||||
data_source_name: 'postgresql://cloud_admin@127.0.0.1:5432/postgres?sslmode=disable&application_name=sql_exporter_autoscaling'
|
||||
|
||||
# Collectors (referenced by name) to execute on the target.
|
||||
# Glob patterns are supported (see <https://pkg.go.dev/path/filepath#Match> for syntax).
|
||||
|
||||
Reference in New Issue
Block a user