Mirror of https://github.com/neondatabase/neon.git, synced 2026-02-17 09:30:38 +00:00

Compare commits: release-co… → problame/b… (7 commits)
Commits compared:

- 0c54e5fb83
- 8327f68043
- 846e8fdce4
- 70a3bf37a0
- 23c0748cdd
- b5d54ba52a
- 58332cb361
.github/workflows/build_and_test.yml (vendored) — 47 changed lines
@@ -728,30 +728,6 @@ jobs:
          tags: |
            neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{needs.tag.outputs.build-tag}}-${{ matrix.version.debian }}-${{ matrix.arch }}

-      - name: Build compute-tools image
-        # compute-tools are Postgres independent, so build it only once
-        # We pick 16, because that builds on debian 11 with older glibc (and is
-        # thus compatible with newer glibc), rather than 17 on Debian 12, as
-        # that isn't guaranteed to be compatible with Debian 11
-        if: matrix.version.pg == 'v16'
-        uses: docker/build-push-action@v6
-        with:
-          target: compute-tools-image
-          context: .
-          build-args: |
-            GIT_VERSION=${{ github.event.pull_request.head.sha || github.sha }}
-            BUILD_TAG=${{ needs.tag.outputs.build-tag }}
-            TAG=${{ needs.build-build-tools-image.outputs.image-tag }}-${{ matrix.version.debian }}
-            DEBIAN_VERSION=${{ matrix.version.debian }}
-          provenance: false
-          push: true
-          pull: true
-          file: compute/compute-node.Dockerfile
-          cache-from: type=registry,ref=cache.neon.build/compute-node-${{ matrix.version.pg }}:cache-${{ matrix.version.debian }}-${{ matrix.arch }}
-          cache-to: ${{ github.ref_name == 'main' && format('type=registry,ref=cache.neon.build/compute-tools-{0}:cache-{1}-{2},mode=max', matrix.version.pg, matrix.version.debian, matrix.arch) || '' }}
-          tags: |
-            neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-${{ matrix.arch }}
-
  compute-node-image:
    needs: [ compute-node-image-arch, tag ]
    permissions:
@@ -794,14 +770,6 @@ jobs:
            neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
            neondatabase/neon-test-extensions-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64

-      - name: Create multi-arch compute-tools image
-        if: matrix.version.pg == 'v16'
-        run: |
-          docker buildx imagetools create -t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }} \
-            -t neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }} \
-            neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-x64 \
-            neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}-${{ matrix.version.debian }}-arm64
-
      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
@@ -817,12 +785,6 @@ jobs:
          docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }} \
            neondatabase/compute-node-${{ matrix.version.pg }}:${{ needs.tag.outputs.build-tag }}

-      - name: Push multi-arch compute-tools image to ECR
-        if: matrix.version.pg == 'v16'
-        run: |
-          docker buildx imagetools create -t 369495373322.dkr.ecr.eu-central-1.amazonaws.com/compute-tools:${{ needs.tag.outputs.build-tag }} \
-            neondatabase/compute-tools:${{ needs.tag.outputs.build-tag }}
-
  vm-compute-node-image:
    needs: [ check-permissions, tag, compute-node-image ]
    runs-on: [ self-hosted, large ]
@@ -1001,9 +963,6 @@ jobs:
          docker buildx imagetools create -t $repo/neon:latest \
            $repo/neon:${{ needs.tag.outputs.build-tag }}

-          docker buildx imagetools create -t $repo/compute-tools:latest \
-            $repo/compute-tools:${{ needs.tag.outputs.build-tag }}
-
          for version in ${VERSIONS}; do
            docker buildx imagetools create -t $repo/compute-node-${version}:latest \
              $repo/compute-node-${version}:${{ needs.tag.outputs.build-tag }}
@@ -1032,7 +991,7 @@ jobs:
      - name: Copy all images to prod ECR
        if: github.ref_name == 'release' || github.ref_name == 'release-proxy' || github.ref_name == 'release-compute'
        run: |
-          for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16,v17}; do
+          for image in neon {vm-,}compute-node-{v14,v15,v16,v17}; do
            docker buildx imagetools create -t 093970136003.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }} \
              369495373322.dkr.ecr.eu-central-1.amazonaws.com/${image}:${{ needs.tag.outputs.build-tag }}
          done
@@ -1044,7 +1003,7 @@ jobs:
        with:
          client_id: ${{ vars.AZURE_DEV_CLIENT_ID }}
          image_tag: ${{ needs.tag.outputs.build-tag }}
-          images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
+          images: neon vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
          registry_name: ${{ vars.AZURE_DEV_REGISTRY_NAME }}
          subscription_id: ${{ vars.AZURE_DEV_SUBSCRIPTION_ID }}
          tenant_id: ${{ vars.AZURE_TENANT_ID }}
@@ -1056,7 +1015,7 @@ jobs:
        with:
          client_id: ${{ vars.AZURE_PROD_CLIENT_ID }}
          image_tag: ${{ needs.tag.outputs.build-tag }}
-          images: neon compute-tools vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
+          images: neon vm-compute-node-v14 vm-compute-node-v15 vm-compute-node-v16 vm-compute-node-v17 compute-node-v14 compute-node-v15 compute-node-v16 compute-node-v17
          registry_name: ${{ vars.AZURE_PROD_REGISTRY_NAME }}
          subscription_id: ${{ vars.AZURE_PROD_SUBSCRIPTION_ID }}
          tenant_id: ${{ vars.AZURE_TENANT_ID }}

@@ -976,22 +976,9 @@ RUN apt update && apt install --no-install-recommends --no-install-suggests -y p

 FROM rust-extensions-build-pgrx12 AS pg-jsonschema-pg-build
 ARG PG_VERSION
-# version 0.3.3 supports v17
-# last release v0.3.3 - Oct 16, 2024
-#
-# there were no breaking changes
-# so we can use the same version for all postgres versions
-RUN case "${PG_VERSION}" in \
-    "v14" | "v15" | "v16" | "v17") \
-        export PG_JSONSCHEMA_VERSION=0.3.3 \
-        export PG_JSONSCHEMA_CHECKSUM=40c2cffab4187e0233cb8c3bde013be92218c282f95f4469c5282f6b30d64eac \
-        ;; \
-    *) \
-        echo "unexpected PostgreSQL version" && exit 1 \
-        ;; \
-    esac && \
-    wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v${PG_JSONSCHEMA_VERSION}.tar.gz -O pg_jsonschema.tar.gz && \
-    echo "${PG_JSONSCHEMA_CHECKSUM}  pg_jsonschema.tar.gz" | sha256sum --check && \
+RUN wget https://github.com/supabase/pg_jsonschema/archive/refs/tags/v0.3.3.tar.gz -O pg_jsonschema.tar.gz && \
+    echo "40c2cffab4187e0233cb8c3bde013be92218c282f95f4469c5282f6b30d64eac  pg_jsonschema.tar.gz" | sha256sum --check && \
     mkdir pg_jsonschema-src && cd pg_jsonschema-src && tar xzf ../pg_jsonschema.tar.gz --strip-components=1 -C . && \
     # see commit 252b3685a27a0f4c31a0f91e983c6314838e89e8
     # `unsafe-postgres` feature allows to build pgx extensions
@@ -1012,22 +999,9 @@ RUN case "${PG_VERSION}" in \
 FROM rust-extensions-build-pgrx12 AS pg-graphql-pg-build
 ARG PG_VERSION

-# version 1.5.9 supports v17
-# last release v1.5.9 - Oct 16, 2024
-#
-# there were no breaking changes
-# so we can use the same version for all postgres versions
-RUN case "${PG_VERSION}" in \
-    "v14" | "v15" | "v16" | "v17") \
-        export PG_GRAPHQL_VERSION=1.5.9 \
-        export PG_GRAPHQL_CHECKSUM=cf768385a41278be1333472204fc0328118644ae443182cf52f7b9b23277e497 \
-        ;; \
-    *) \
-        echo "unexpected PostgreSQL version" && exit 1 \
-        ;; \
-    esac && \
-    wget https://github.com/supabase/pg_graphql/archive/refs/tags/v${PG_GRAPHQL_VERSION}.tar.gz -O pg_graphql.tar.gz && \
-    echo "${PG_GRAPHQL_CHECKSUM}  pg_graphql.tar.gz" | sha256sum --check && \
+RUN wget https://github.com/supabase/pg_graphql/archive/refs/tags/v1.5.9.tar.gz -O pg_graphql.tar.gz && \
+    echo "cf768385a41278be1333472204fc0328118644ae443182cf52f7b9b23277e497  pg_graphql.tar.gz" | sha256sum --check && \
     mkdir pg_graphql-src && cd pg_graphql-src && tar xzf ../pg_graphql.tar.gz --strip-components=1 -C . && \
     sed -i 's/pgrx = "=0.12.6"/pgrx = { version = "0.12.6", features = [ "unsafe-postgres" ] }/g' Cargo.toml && \
     cargo pgrx install --release && \
@@ -1258,20 +1232,6 @@ RUN make -j $(getconf _NPROCESSORS_ONLN) \
     make -j $(getconf _NPROCESSORS_ONLN) \
         PG_CONFIG=/usr/local/pgsql/bin/pg_config \
         -C pgxn/neon_rmgr \
-        -s install && \
-    case "${PG_VERSION}" in \
-    "v14" | "v15") \
-        ;; \
-    "v16" | "v17") \
-        echo "Skipping HNSW for PostgreSQL ${PG_VERSION}" && exit 0 \
-        ;; \
-    *) \
-        echo "unexpected PostgreSQL version" && exit 1 \
-        ;; \
-    esac && \
-    make -j $(getconf _NPROCESSORS_ONLN) \
-        PG_CONFIG=/usr/local/pgsql/bin/pg_config \
-        -C pgxn/hnsw \
+        -s install

 #########################################################################################
@@ -1288,17 +1248,6 @@ USER nonroot
 COPY --chown=nonroot . .
 RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin compute_ctl --bin fast_import --bin local_proxy

-#########################################################################################
-#
-# Final compute-tools image
-#
-#########################################################################################
-
-FROM debian:$DEBIAN_FLAVOR AS compute-tools-image
-
-COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/compute_ctl /usr/local/bin/compute_ctl
-COPY --from=compute-tools /home/nonroot/target/release-line-debug-size-lto/fast_import /usr/local/bin/fast_import
-
 #########################################################################################
 #
 # Layer "pgbouncer"
@@ -17,7 +17,7 @@
 //!
 //! # Local Testing
 //!
-//! - Comment out most of the pgxns in The Dockerfile.compute-tools to speed up the build.
+//! - Comment out most of the pgxns in compute-node.Dockerfile to speed up the build.
 //! - Build the image with the following command:
 //!
 //! ```bash
@@ -7,15 +7,11 @@ Currently we build two main images:

 - [neondatabase/neon](https://hub.docker.com/repository/docker/neondatabase/neon) — image with pre-built `pageserver`, `safekeeper` and `proxy` binaries and all the required runtime dependencies. Built from [/Dockerfile](/Dockerfile).
 - [neondatabase/compute-node-v16](https://hub.docker.com/repository/docker/neondatabase/compute-node-v16) — compute node image with pre-built Postgres binaries from [neondatabase/postgres](https://github.com/neondatabase/postgres). Similar images exist for v15 and v14. Built from [/compute-node/Dockerfile](/compute/compute-node.Dockerfile).

-And additional intermediate image:
-
-- [neondatabase/compute-tools](https://hub.docker.com/repository/docker/neondatabase/compute-tools) — compute node configuration management tools.
-
 ## Build pipeline

 We build all images after a successful `release` tests run and push them automatically to Docker Hub with two parallel CI jobs:

-1. `neondatabase/compute-tools` and `neondatabase/compute-node-v16` (and -v15 and -v14)
+1. `neondatabase/compute-node-v17` (and -v16, -v15, -v14)

 2. `neondatabase/neon`
@@ -1,6 +1,6 @@
 # Storage broker

-Storage broker targets two issues:
+Storage broker targets two issues
 - Allowing safekeepers and pageservers to learn which nodes also hold their
   timelines, and timeline statuses there.
 - Avoiding O(n^2) connections between storage nodes while doing so.
@@ -19,7 +19,7 @@ Currently, the only message is `SafekeeperTimelineInfo`. Each safekeeper, for
 each active timeline, once in a while pushes timeline status to the broker.
 Other nodes subscribe and receive this info, using it per above.

 Broker serves /metrics on the same port as the grpc service.

 grpcurl can be used to check which values are currently being pushed:
 ```
@@ -272,6 +272,8 @@ pub struct CompactInfoResponse {
     pub compact_key_range: Option<CompactKeyRange>,
     pub compact_lsn_range: Option<CompactLsnRange>,
     pub sub_compaction: bool,
+    pub running: bool,
+    pub job_id: usize,
 }

 #[derive(Serialize, Deserialize, Clone)]
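The two added fields let the compaction API report queue state: `running` marks the job currently executing and `job_id` is a queue-assigned identifier. As an illustration only — with stand-in tuple types for `CompactKeyRange` and `CompactLsnRange` (which are not shown in this diff), and assuming `serde`/`serde_json` are available — the extended response serializes like this:

```rust
// Minimal standalone sketch, not the real pageserver_api types: mirrors the
// extended response shape to show what the two new fields add to the JSON.
use serde::Serialize;

#[derive(Serialize)]
struct CompactInfoResponse {
    compact_key_range: Option<(u64, u64)>, // stand-in for CompactKeyRange
    compact_lsn_range: Option<(u64, u64)>, // stand-in for CompactLsnRange
    sub_compaction: bool,
    running: bool, // new: whether this job is the one currently executing
    job_id: usize, // new: queue-assigned id, monotonically increasing
}

fn main() {
    let resp = CompactInfoResponse {
        compact_key_range: None,
        compact_lsn_range: None,
        sub_compaction: false,
        running: true,
        job_id: 0,
    };
    println!("{}", serde_json::to_string_pretty(&resp).unwrap());
}
```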
@@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError};
 use crate::DEFAULT_PG_VERSION;
 use crate::{disk_usage_eviction_task, tenant};
 use pageserver_api::models::{
-    CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest,
-    TimelineGcRequest, TimelineInfo,
+    StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest,
+    TimelineInfo,
 };
 use utils::{
     auth::SwappableJwtAuth,
@@ -2052,15 +2052,7 @@ async fn timeline_compact_info_handler(
         let tenant = state
             .tenant_manager
             .get_attached_tenant_shard(tenant_shard_id)?;
-        let res = tenant.get_scheduled_compaction_tasks(timeline_id);
-        let mut resp = Vec::new();
-        for item in res {
-            resp.push(CompactInfoResponse {
-                compact_key_range: item.compact_key_range,
-                compact_lsn_range: item.compact_lsn_range,
-                sub_compaction: item.sub_compaction,
-            });
-        }
+        let resp = tenant.get_scheduled_compaction_tasks(timeline_id);
         json_response(StatusCode::OK, resp)
     }
     .instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id))
@@ -91,15 +91,6 @@ pub(crate) static STORAGE_TIME_GLOBAL: Lazy<HistogramVec> = Lazy::new(|| {
     .expect("failed to define a metric")
 });

-pub(crate) static READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
-    register_histogram!(
-        "pageserver_layers_visited_per_read_global",
-        "Number of layers visited to reconstruct one key",
-        vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 128.0, 256.0, 512.0, 1024.0],
-    )
-    .expect("failed to define a metric")
-});
-
 pub(crate) static VEC_READ_NUM_LAYERS_VISITED: Lazy<Histogram> = Lazy::new(|| {
     register_histogram!(
         "pageserver_layers_visited_per_vectored_read_global",
@@ -3894,7 +3885,6 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) {

     // histograms
     [
-        &READ_NUM_LAYERS_VISITED,
         &VEC_READ_NUM_LAYERS_VISITED,
         &WAIT_LSN_TIME,
         &WAL_REDO_TIME,
@@ -21,6 +21,7 @@ use enumset::EnumSet;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
 use pageserver_api::models;
+use pageserver_api::models::CompactInfoResponse;
 use pageserver_api::models::LsnLease;
 use pageserver_api::models::TimelineArchivalState;
 use pageserver_api::models::TimelineState;
@@ -37,21 +38,17 @@ use remote_timeline_client::manifest::{
 };
 use remote_timeline_client::UploadQueueNotReadyError;
 use std::collections::BTreeMap;
-use std::collections::VecDeque;
 use std::fmt;
 use std::future::Future;
 use std::sync::atomic::AtomicBool;
 use std::sync::Weak;
 use std::time::SystemTime;
 use storage_broker::BrokerClientChannel;
-use timeline::compaction::GcCompactJob;
-use timeline::compaction::ScheduledCompactionTask;
+use timeline::compaction::GcCompactionQueue;
 use timeline::import_pgdata;
 use timeline::offload::offload_timeline;
 use timeline::offload::OffloadError;
 use timeline::CompactFlags;
 use timeline::CompactOptions;
 use timeline::CompactionError;
 use timeline::ShutdownMode;
 use tokio::io::BufReader;
 use tokio::sync::watch;
@@ -347,10 +344,8 @@ pub struct Tenant {
     /// Overhead of mutex is acceptable because compaction is done with a multi-second period.
     compaction_circuit_breaker: std::sync::Mutex<CircuitBreaker>,

-    /// Scheduled compaction tasks. Currently, this can only be populated by triggering
-    /// a manual gc-compaction from the manual compaction API.
-    scheduled_compaction_tasks:
-        std::sync::Mutex<HashMap<TimelineId, VecDeque<ScheduledCompactionTask>>>,
+    /// Scheduled gc-compaction tasks.
+    scheduled_compaction_tasks: std::sync::Mutex<HashMap<TimelineId, Arc<GcCompactionQueue>>>,

     /// If the tenant is in Activating state, notify this to encourage it
     /// to proceed to Active as soon as possible, rather than waiting for lazy
@@ -2997,104 +2992,18 @@ impl Tenant {
         if has_pending_l0_compaction_task {
             Some(true)
         } else {
-            let mut has_pending_scheduled_compaction_task;
-            let next_scheduled_compaction_task = {
-                let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
-                if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) {
-                    if !tline_pending_tasks.is_empty() {
-                        info!(
-                            "{} tasks left in the compaction schedule queue",
-                            tline_pending_tasks.len()
-                        );
-                    }
-                    let next_task = tline_pending_tasks.pop_front();
-                    has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty();
-                    next_task
-                } else {
-                    has_pending_scheduled_compaction_task = false;
-                    None
-                }
-            };
+            let queue = {
+                let guard = self.scheduled_compaction_tasks.lock().unwrap();
+                guard.get(timeline_id).cloned()
+            };
-            if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task
-            {
-                if !next_scheduled_compaction_task
-                    .options
-                    .flags
-                    .contains(CompactFlags::EnhancedGcBottomMostCompaction)
-                {
-                    warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
-                } else if next_scheduled_compaction_task.options.sub_compaction {
-                    info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
-                    let jobs: Vec<GcCompactJob> = timeline
-                        .gc_compaction_split_jobs(
-                            GcCompactJob::from_compact_options(
-                                next_scheduled_compaction_task.options.clone(),
-                            ),
-                            next_scheduled_compaction_task
-                                .options
-                                .sub_compaction_max_job_size_mb,
-                        )
-                        .await
-                        .map_err(CompactionError::Other)?;
-                    if jobs.is_empty() {
-                        info!("no jobs to run, skipping scheduled compaction task");
-                    } else {
-                        has_pending_scheduled_compaction_task = true;
-                        let jobs_len = jobs.len();
-                        let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
-                        let tline_pending_tasks = guard.entry(*timeline_id).or_default();
-                        for (idx, job) in jobs.into_iter().enumerate() {
-                            // Unfortunately we need to convert the `GcCompactJob` back to `CompactOptions`
-                            // until we do further refactors to allow calling `compact_with_gc` directly.
-                            let mut flags: EnumSet<CompactFlags> = EnumSet::default();
-                            flags |= CompactFlags::EnhancedGcBottomMostCompaction;
-                            if job.dry_run {
-                                flags |= CompactFlags::DryRun;
-                            }
-                            let options = CompactOptions {
-                                flags,
-                                sub_compaction: false,
-                                compact_key_range: Some(job.compact_key_range.into()),
-                                compact_lsn_range: Some(job.compact_lsn_range.into()),
-                                sub_compaction_max_job_size_mb: None,
-                            };
-                            tline_pending_tasks.push_back(if idx == jobs_len - 1 {
-                                ScheduledCompactionTask {
-                                    options,
-                                    // The last job in the queue sends the signal and releases the gc guard
-                                    result_tx: next_scheduled_compaction_task
-                                        .result_tx
-                                        .take(),
-                                    gc_block: next_scheduled_compaction_task
-                                        .gc_block
-                                        .take(),
-                                }
-                            } else {
-                                ScheduledCompactionTask {
-                                    options,
-                                    result_tx: None,
-                                    gc_block: None,
-                                }
-                            });
-                        }
-                        info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
-                    }
-                } else {
-                    let _ = timeline
-                        .compact_with_options(
-                            cancel,
-                            next_scheduled_compaction_task.options,
-                            ctx,
-                        )
-                        .instrument(info_span!("scheduled_compact_timeline", %timeline_id))
-                        .await?;
-                    if let Some(tx) = next_scheduled_compaction_task.result_tx.take() {
-                        // TODO: we can send compaction statistics in the future
-                        tx.send(()).ok();
-                    }
-                }
-            }
+            if let Some(queue) = queue {
+                let has_pending_tasks = queue
+                    .iteration(cancel, ctx, &self.gc_block, timeline)
+                    .await?;
+                Some(has_pending_tasks)
+            } else {
+                Some(false)
+            }
-            Some(has_pending_scheduled_compaction_task)
         }
     } else {
         None
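Note the locking pattern in the new code: the `std::sync::Mutex` is held only long enough to clone the `Arc<GcCompactionQueue>` out of the map, so no lock is held across the `.await` on `iteration`. A minimal self-contained sketch of that pattern (simplified stand-in types, tokio runtime assumed):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

struct Queue;
impl Queue {
    async fn iteration(&self) -> bool {
        false // stand-in for one unit of compaction work
    }
}

struct Tenant {
    scheduled: Mutex<HashMap<u64, Arc<Queue>>>,
}

impl Tenant {
    async fn compaction_pass(&self, timeline_id: u64) -> Option<bool> {
        // Hold the std mutex only to clone the Arc out — never across .await.
        let queue = self.scheduled.lock().unwrap().get(&timeline_id).cloned();
        match queue {
            Some(q) => Some(q.iteration().await), // await with no lock held
            None => Some(false),
        }
    }
}

#[tokio::main]
async fn main() {
    let t = Tenant { scheduled: Mutex::new(HashMap::new()) };
    assert_eq!(t.compaction_pass(42).await, Some(false));
}
```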
@@ -3124,34 +3033,32 @@ impl Tenant {
     }

     /// Cancel scheduled compaction tasks
-    pub(crate) fn cancel_scheduled_compaction(
-        &self,
-        timeline_id: TimelineId,
-    ) -> Vec<ScheduledCompactionTask> {
+    pub(crate) fn cancel_scheduled_compaction(&self, timeline_id: TimelineId) {
         let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
-        if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) {
-            let current_tline_pending_tasks = std::mem::take(tline_pending_tasks);
-            current_tline_pending_tasks.into_iter().collect()
-        } else {
-            Vec::new()
+        if let Some(q) = guard.get_mut(&timeline_id) {
+            q.cancel_scheduled();
         }
     }

     pub(crate) fn get_scheduled_compaction_tasks(
         &self,
         timeline_id: TimelineId,
-    ) -> Vec<CompactOptions> {
-        use itertools::Itertools;
-        let guard = self.scheduled_compaction_tasks.lock().unwrap();
-        guard
-            .get(&timeline_id)
-            .map(|tline_pending_tasks| {
-                tline_pending_tasks
-                    .iter()
-                    .map(|x| x.options.clone())
-                    .collect_vec()
-            })
-            .unwrap_or_default()
+    ) -> Vec<CompactInfoResponse> {
+        let res = {
+            let guard = self.scheduled_compaction_tasks.lock().unwrap();
+            guard.get(&timeline_id).map(|q| q.remaining_jobs())
+        };
+        let Some((running, remaining)) = res else {
+            return Vec::new();
+        };
+        let mut result = Vec::new();
+        if let Some((id, running)) = running {
+            result.extend(running.into_compact_info_resp(id, true));
+        }
+        for (id, job) in remaining {
+            result.extend(job.into_compact_info_resp(id, false));
+        }
+        result
     }

     /// Schedule a compaction task for a timeline.
@@ -3160,20 +3067,12 @@ impl Tenant {
         timeline_id: TimelineId,
         options: CompactOptions,
     ) -> anyhow::Result<tokio::sync::oneshot::Receiver<()>> {
-        let gc_guard = match self.gc_block.start().await {
-            Ok(guard) => guard,
-            Err(e) => {
-                bail!("cannot run gc-compaction because gc is blocked: {}", e);
-            }
-        };
         let (tx, rx) = tokio::sync::oneshot::channel();
         let mut guard = self.scheduled_compaction_tasks.lock().unwrap();
-        let tline_pending_tasks = guard.entry(timeline_id).or_default();
-        tline_pending_tasks.push_back(ScheduledCompactionTask {
-            options,
-            result_tx: Some(tx),
-            gc_block: Some(gc_guard),
-        });
+        let q = guard
+            .entry(timeline_id)
+            .or_insert_with(|| Arc::new(GcCompactionQueue::new()));
+        q.schedule_manual_compaction(options, Some(tx));
         Ok(rx)
     }

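A caller-side sketch of the oneshot contract (hypothetical, self-contained stand-ins; the real `schedule_compaction` also takes a `TimelineId` and `CompactOptions` as shown above): the receiver resolves once the queue fires the stored sender for the job.

```rust
use tokio::sync::oneshot;

struct Tenant;

impl Tenant {
    // Simplified stand-in for Tenant::schedule_compaction: enqueue a job and
    // hand back a receiver that fires when the queue finishes it. In the real
    // code the sender is stored in GcCompactionQueue's notify map and fired
    // by notify_and_unblock(); here we fire it immediately.
    fn schedule_compaction(&self) -> anyhow::Result<oneshot::Receiver<()>> {
        let (tx, rx) = oneshot::channel();
        tx.send(()).ok();
        Ok(rx)
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let tenant = Tenant;
    let rx = tenant.schedule_compaction()?;
    rx.await?; // blocks until the job (or its Notify sentinel) completes
    println!("gc-compaction finished");
    Ok(())
}
```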
@@ -4,7 +4,7 @@
 //!
 //! The old legacy algorithm is implemented directly in `timeline.rs`.

-use std::collections::{BinaryHeap, HashMap, HashSet};
+use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
 use std::ops::{Deref, Range};
 use std::sync::Arc;
@@ -16,10 +16,12 @@ use super::{

 use anyhow::{anyhow, bail, Context};
 use bytes::Bytes;
 use enumset::EnumSet;
 use fail::fail_point;
 use itertools::Itertools;
 use pageserver_api::key::KEY_SIZE;
 use pageserver_api::keyspace::ShardedRange;
+use pageserver_api::models::CompactInfoResponse;
 use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId};
 use serde::Serialize;
 use tokio_util::sync::CancellationToken;
@@ -30,6 +32,7 @@ use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder}
 use crate::page_cache;
 use crate::statvfs::Statvfs;
 use crate::tenant::checks::check_valid_layermap;
+use crate::tenant::gc_block::GcBlock;
 use crate::tenant::remote_timeline_client::WaitCompletionError;
 use crate::tenant::storage_layer::batch_split_writer::{
     BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter,
@@ -63,16 +66,284 @@ use super::CompactionError;

 /// Maximum number of deltas before generating an image layer in bottom-most compaction.
 const COMPACTION_DELTA_THRESHOLD: usize = 5;

-/// A scheduled compaction task.
-pub(crate) struct ScheduledCompactionTask {
-    /// It's unfortunate that we need to store a compact options struct here because the only outer
-    /// API we can call here is `compact_with_options` which does a few setup calls before starting the
-    /// actual compaction job... We should refactor this to store `GcCompactionJob` in the future.
-    pub options: CompactOptions,
-    /// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender.
-    pub result_tx: Option<tokio::sync::oneshot::Sender<()>>,
-    /// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard.
-    pub gc_block: Option<gc_block::Guard>,
-}
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub struct GcCompactionJobId(pub usize);
+
+impl std::fmt::Display for GcCompactionJobId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+#[derive(Debug, Clone)]
+pub enum GcCompactionQueueItem {
+    Manual(CompactOptions),
+    SubCompactionJob(CompactOptions),
+    #[allow(dead_code)]
+    UpdateL2Lsn(Lsn),
+    Notify(GcCompactionJobId),
+}
+
+impl GcCompactionQueueItem {
+    pub fn into_compact_info_resp(
+        self,
+        id: GcCompactionJobId,
+        running: bool,
+    ) -> Option<CompactInfoResponse> {
+        match self {
+            GcCompactionQueueItem::Manual(options) => Some(CompactInfoResponse {
+                compact_key_range: options.compact_key_range,
+                compact_lsn_range: options.compact_lsn_range,
+                sub_compaction: options.sub_compaction,
+                running,
+                job_id: id.0,
+            }),
+            GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse {
+                compact_key_range: options.compact_key_range,
+                compact_lsn_range: options.compact_lsn_range,
+                sub_compaction: options.sub_compaction,
+                running,
+                job_id: id.0,
+            }),
+            GcCompactionQueueItem::UpdateL2Lsn(_) => None,
+            GcCompactionQueueItem::Notify(_) => None,
+        }
+    }
+}
+
+struct GcCompactionQueueInner {
+    running: Option<(GcCompactionJobId, GcCompactionQueueItem)>,
+    queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
+    notify: HashMap<GcCompactionJobId, tokio::sync::oneshot::Sender<()>>,
+    gc_guards: HashMap<GcCompactionJobId, gc_block::Guard>,
+    last_id: GcCompactionJobId,
+}
+
+impl GcCompactionQueueInner {
+    fn next_id(&mut self) -> GcCompactionJobId {
+        let id = self.last_id;
+        self.last_id = GcCompactionJobId(id.0 + 1);
+        id
+    }
+}
+
+/// A structure to store gc_compaction jobs.
+pub struct GcCompactionQueue {
+    /// All items in the queue, and the currently-running job.
+    inner: std::sync::Mutex<GcCompactionQueueInner>,
+    /// Ensure only one thread is consuming the queue.
+    consumer_lock: tokio::sync::Mutex<()>,
+}
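The queue pairs two different locks: a synchronous `std::sync::Mutex` guarding the job list for short, non-async critical sections, and an async `tokio::sync::Mutex` that may be held across `.await` points to guarantee a single consumer. A minimal sketch of the same split (assumptions: tokio runtime, toy `u32` jobs — not the pageserver's actual types):

```rust
use std::collections::VecDeque;
use std::sync::Mutex; // quick, non-async critical sections only

struct Queue {
    // Protects the job list; held only for push/pop, never across .await.
    inner: Mutex<VecDeque<u32>>,
    // Held for the whole duration of processing one job, so at most one
    // consumer runs at a time; an async Mutex is safe to hold across .await.
    consumer_lock: tokio::sync::Mutex<()>,
}

impl Queue {
    async fn iteration(&self) -> bool {
        let _consumer = self.consumer_lock.lock().await; // single consumer
        let job = self.inner.lock().unwrap().pop_front(); // short critical section
        match job {
            Some(job) => {
                // ...long-running async work would happen here, inner lock released...
                println!("processed job {job}");
                !self.inner.lock().unwrap().is_empty()
            }
            None => false,
        }
    }
}

#[tokio::main]
async fn main() {
    let q = Queue {
        inner: Mutex::new(VecDeque::from([1, 2, 3])),
        consumer_lock: tokio::sync::Mutex::new(()),
    };
    while q.iteration().await {} // drive until the queue drains
}
```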
+impl GcCompactionQueue {
+    pub fn new() -> Self {
+        GcCompactionQueue {
+            inner: std::sync::Mutex::new(GcCompactionQueueInner {
+                running: None,
+                queued: VecDeque::new(),
+                notify: HashMap::new(),
+                gc_guards: HashMap::new(),
+                last_id: GcCompactionJobId(0),
+            }),
+            consumer_lock: tokio::sync::Mutex::new(()),
+        }
+    }
+
+    pub fn cancel_scheduled(&self) {
+        let mut guard = self.inner.lock().unwrap();
+        guard.queued.clear();
+        guard.notify.clear();
+        guard.gc_guards.clear();
+    }
+
+    /// Schedule a manual compaction job.
+    pub fn schedule_manual_compaction(
+        &self,
+        options: CompactOptions,
+        notify: Option<tokio::sync::oneshot::Sender<()>>,
+    ) -> GcCompactionJobId {
+        let mut guard = self.inner.lock().unwrap();
+        let id = guard.next_id();
+        guard
+            .queued
+            .push_back((id, GcCompactionQueueItem::Manual(options)));
+        if let Some(notify) = notify {
+            guard.notify.insert(id, notify);
+        }
+        info!("scheduled compaction job id={}", id);
+        id
+    }
+
+    /// Trigger an auto compaction.
+    #[allow(dead_code)]
+    pub fn trigger_auto_compaction(&self, _: &Arc<Timeline>) {}
+
+    /// Notify the caller the job has finished and unblock GC.
+    fn notify_and_unblock(&self, id: GcCompactionJobId) {
+        info!("compaction job id={} finished", id);
+        let mut guard = self.inner.lock().unwrap();
+        if let Some(blocking) = guard.gc_guards.remove(&id) {
+            drop(blocking)
+        }
+        if let Some(tx) = guard.notify.remove(&id) {
+            let _ = tx.send(());
+        }
+    }
+
+    async fn handle_sub_compaction(
+        &self,
+        id: GcCompactionJobId,
+        options: CompactOptions,
+        timeline: &Arc<Timeline>,
+        gc_block: &GcBlock,
+    ) -> Result<(), CompactionError> {
+        info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
+        let jobs: Vec<GcCompactJob> = timeline
+            .gc_compaction_split_jobs(
+                GcCompactJob::from_compact_options(options.clone()),
+                options.sub_compaction_max_job_size_mb,
+            )
+            .await
+            .map_err(CompactionError::Other)?;
+        if jobs.is_empty() {
+            info!("no jobs to run, skipping scheduled compaction task");
+            self.notify_and_unblock(id);
+        } else {
+            let gc_guard = match gc_block.start().await {
+                Ok(guard) => guard,
+                Err(e) => {
+                    return Err(CompactionError::Other(anyhow!(
+                        "cannot run gc-compaction because gc is blocked: {}",
+                        e
+                    )));
+                }
+            };
+
+            let jobs_len = jobs.len();
+            let mut pending_tasks = Vec::new();
+            for job in jobs {
+                // Unfortunately we need to convert the `GcCompactJob` back to `CompactOptions`
+                // until we do further refactors to allow calling `compact_with_gc` directly.
+                let mut flags: EnumSet<CompactFlags> = EnumSet::default();
+                flags |= CompactFlags::EnhancedGcBottomMostCompaction;
+                if job.dry_run {
+                    flags |= CompactFlags::DryRun;
+                }
+                let options = CompactOptions {
+                    flags,
+                    sub_compaction: false,
+                    compact_key_range: Some(job.compact_key_range.into()),
+                    compact_lsn_range: Some(job.compact_lsn_range.into()),
+                    sub_compaction_max_job_size_mb: None,
+                };
+                pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options));
+            }
+            pending_tasks.push(GcCompactionQueueItem::Notify(id));
+            {
+                let mut guard = self.inner.lock().unwrap();
+                guard.gc_guards.insert(id, gc_guard);
+                let mut tasks = Vec::new();
+                for task in pending_tasks {
+                    let id = guard.next_id();
+                    tasks.push((id, task));
+                }
+                tasks.reverse();
+                for item in tasks {
+                    guard.queued.push_front(item);
+                }
+            }
+            info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len);
+        }
+        Ok(())
+    }
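One subtlety in `handle_sub_compaction` above: the expanded sub-jobs (plus the trailing `Notify` sentinel, which releases the gc guard and fires the caller's oneshot) are pushed to the *front* of the queue so they run before anything scheduled later, and the list is reversed first because repeated `push_front` would otherwise invert their order. A tiny standalone demonstration:

```rust
use std::collections::VecDeque;

fn main() {
    // The queue already holds a later job; expanded sub-jobs must run first.
    let mut queued: VecDeque<&str> = VecDeque::from(["later-job"]);

    // Expansion of one manual job: sub-jobs in order, then a notify sentinel.
    let mut tasks = vec!["sub-0", "sub-1", "sub-2", "notify"];

    // push_front reverses order, so reverse once beforehand to cancel it out.
    tasks.reverse();
    for t in tasks {
        queued.push_front(t);
    }

    assert_eq!(
        queued,
        VecDeque::from(["sub-0", "sub-1", "sub-2", "notify", "later-job"])
    );
    println!("{queued:?}");
}
```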
+    /// Take a job from the queue and process it. Returns whether there are still pending tasks.
+    pub async fn iteration(
+        &self,
+        cancel: &CancellationToken,
+        ctx: &RequestContext,
+        gc_block: &GcBlock,
+        timeline: &Arc<Timeline>,
+    ) -> Result<bool, CompactionError> {
+        let _one_op_at_a_time_guard = self.consumer_lock.lock().await;
+        let has_pending_tasks;
+        let (id, item) = {
+            let mut guard = self.inner.lock().unwrap();
+            let Some((id, item)) = guard.queued.pop_front() else {
+                return Ok(false);
+            };
+            guard.running = Some((id, item.clone()));
+            has_pending_tasks = !guard.queued.is_empty();
+            (id, item)
+        };
+
+        match item {
+            GcCompactionQueueItem::Manual(options) => {
+                if !options
+                    .flags
+                    .contains(CompactFlags::EnhancedGcBottomMostCompaction)
+                {
+                    warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", options);
+                } else if options.sub_compaction {
+                    self.handle_sub_compaction(id, options, timeline, gc_block)
+                        .await?;
+                } else {
+                    let gc_guard = match gc_block.start().await {
+                        Ok(guard) => guard,
+                        Err(e) => {
+                            return Err(CompactionError::Other(anyhow!(
+                                "cannot run gc-compaction because gc is blocked: {}",
+                                e
+                            )));
+                        }
+                    };
+                    {
+                        let mut guard = self.inner.lock().unwrap();
+                        guard.gc_guards.insert(id, gc_guard);
+                    }
+                    let _ = timeline
+                        .compact_with_options(cancel, options, ctx)
+                        .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
+                        .await?;
+                    self.notify_and_unblock(id);
+                }
+            }
+            GcCompactionQueueItem::SubCompactionJob(options) => {
+                let _ = timeline
+                    .compact_with_options(cancel, options, ctx)
+                    .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id))
+                    .await?;
+            }
+            GcCompactionQueueItem::Notify(id) => {
+                self.notify_and_unblock(id);
+            }
+            GcCompactionQueueItem::UpdateL2Lsn(_) => {
+                unreachable!()
+            }
+        }
+        {
+            let mut guard = self.inner.lock().unwrap();
+            guard.running = None;
+        }
+        Ok(has_pending_tasks)
+    }
+
+    #[allow(clippy::type_complexity)]
+    pub fn remaining_jobs(
+        &self,
+    ) -> (
+        Option<(GcCompactionJobId, GcCompactionQueueItem)>,
+        VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>,
+    ) {
+        let guard = self.inner.lock().unwrap();
+        (guard.running.clone(), guard.queued.clone())
+    }
+
+    #[allow(dead_code)]
+    pub fn remaining_jobs_num(&self) -> usize {
+        let guard = self.inner.lock().unwrap();
+        guard.queued.len() + if guard.running.is_some() { 1 } else { 0 }
+    }
+}

 /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will
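The `notify` map ties job ids to oneshot senders; `notify_and_unblock` removes and fires the sender (ignoring a dropped receiver) when a job or its `Notify` sentinel completes. A self-contained sketch of that handshake (tokio assumed; the ids and flow are illustrative):

```rust
use std::collections::HashMap;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    // Sketch of the notify map: job id -> oneshot sender, fired on completion.
    let mut notify: HashMap<usize, oneshot::Sender<()>> = HashMap::new();

    let (tx, rx) = oneshot::channel();
    notify.insert(7, tx); // registered by schedule_manual_compaction

    // Later, when job 7 (or its trailing Notify sentinel) completes:
    if let Some(tx) = notify.remove(&7) {
        let _ = tx.send(()); // ignore a dropped receiver, like notify_and_unblock
    }

    rx.await.expect("job 7 signalled");
    println!("caller unblocked");
}
```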
@@ -1,26 +0,0 @@
EXTENSION = hnsw
EXTVERSION = 0.1.0

MODULE_big = hnsw
DATA = $(wildcard *--*.sql)
OBJS = hnsw.o hnswalg.o

TESTS = $(wildcard test/sql/*.sql)
REGRESS = $(patsubst test/sql/%.sql,%,$(TESTS))
REGRESS_OPTS = --inputdir=test --load-extension=hnsw

# For auto-vectorization:
# - GCC (needs -ftree-vectorize OR -O3) - https://gcc.gnu.org/projects/tree-ssa/vectorization.html
PG_CFLAGS += -O3
PG_CXXFLAGS += -O3 -std=c++11
PG_LDFLAGS += -lstdc++

all: $(EXTENSION)--$(EXTVERSION).sql

PG_CONFIG ?= pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)

dist:
	mkdir -p dist
	git archive --format zip --prefix=$(EXTENSION)-$(EXTVERSION)/ --output dist/$(EXTENSION)-$(EXTVERSION).zip master
@@ -1,25 +0,0 @@
# Revisiting the Inverted Indices for Billion-Scale Approximate Nearest Neighbors

This ANN extension of Postgres is based
on the [ivf-hnsw](https://github.com/dbaranchuk/ivf-hnsw.git) implementation of [HNSW](https://www.pinecone.io/learn/hnsw),
the code for the current state-of-the-art billion-scale nearest neighbor search system presented in the paper:

[Revisiting the Inverted Indices for Billion-Scale Approximate Nearest Neighbors](http://openaccess.thecvf.com/content_ECCV_2018/html/Dmitry_Baranchuk_Revisiting_the_Inverted_ECCV_2018_paper.html),
<br>
Dmitry Baranchuk, Artem Babenko, Yury Malkov

# Postgres extension

The HNSW index is held in memory (built on demand) and its maximal size is limited
by the `maxelements` index parameter. Another required parameter is the number of dimensions (if it is not specified in the column type).
The optional parameter `ef` specifies the number of neighbors considered during index construction and search (corresponding to the `efConstruction` and `efSearch` parameters
described in the article).

# Example of usage:

```
create extension hnsw;
create table embeddings(id integer primary key, payload real[]);
create index on embeddings using hnsw(payload) with (maxelements=1000000, dims=100, m=32);
select id from embeddings order by payload <-> array[1.0, 2.0,...] limit 100;
```
@@ -1,29 +0,0 @@
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
\echo Use "CREATE EXTENSION hnsw" to load this file. \quit

-- functions

CREATE FUNCTION l2_distance(real[], real[]) RETURNS real
	AS 'MODULE_PATHNAME' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;

-- operators

CREATE OPERATOR <-> (
	LEFTARG = real[], RIGHTARG = real[], PROCEDURE = l2_distance,
	COMMUTATOR = '<->'
);

-- access method

CREATE FUNCTION hnsw_handler(internal) RETURNS index_am_handler
	AS 'MODULE_PATHNAME' LANGUAGE C;

CREATE ACCESS METHOD hnsw TYPE INDEX HANDLER hnsw_handler;

COMMENT ON ACCESS METHOD hnsw IS 'hnsw index access method';

-- opclasses

CREATE OPERATOR CLASS knn_ops
	DEFAULT FOR TYPE real[] USING hnsw AS
	OPERATOR 1 <-> (real[], real[]) FOR ORDER BY float_ops;
pgxn/hnsw/hnsw.c — 590 deleted lines
@@ -1,590 +0,0 @@
#include "postgres.h"

#include "access/amapi.h"
#include "access/generic_xlog.h"
#include "access/relation.h"
#include "access/reloptions.h"
#include "access/tableam.h"
#include "catalog/index.h"
#include "commands/vacuum.h"
#include "nodes/execnodes.h"
#include "storage/bufmgr.h"
#include "utils/guc.h"
#include "utils/selfuncs.h"

#include <math.h>
#include <float.h>

#include "hnsw.h"

PG_MODULE_MAGIC;

typedef struct {
    int32 vl_len_;        /* varlena header (do not touch directly!) */
    int   dims;
    int   maxelements;
    int   efConstruction;
    int   efSearch;
    int   M;
} HnswOptions;

static relopt_kind hnsw_relopt_kind;

typedef struct {
    HierarchicalNSW* hnsw;
    size_t           curr;
    size_t           n_results;
    ItemPointer      results;
} HnswScanOpaqueData;

typedef HnswScanOpaqueData* HnswScanOpaque;

typedef struct {
    Oid              relid;
    uint32           status;
    HierarchicalNSW* hnsw;
} HnswHashEntry;

#define SH_PREFIX        hnsw_index
#define SH_ELEMENT_TYPE  HnswHashEntry
#define SH_KEY_TYPE      Oid
#define SH_KEY           relid
#define SH_STORE_HASH
#define SH_GET_HASH(tb, a) ((a)->relid)
#define SH_HASH_KEY(tb, key) (key)
#define SH_EQUAL(tb, a, b) ((a) == (b))
#define SH_SCOPE         static inline
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"

#define INDEX_HASH_SIZE 11

#define DEFAULT_EF_SEARCH 64

PGDLLEXPORT void _PG_init(void);

static hnsw_index_hash *hnsw_indexes;

/*
 * Initialize index options and variables
 */
void
_PG_init(void)
{
    hnsw_relopt_kind = add_reloption_kind();
    add_int_reloption(hnsw_relopt_kind, "dims", "Number of dimensions",
                      0, 0, INT_MAX, AccessExclusiveLock);
    add_int_reloption(hnsw_relopt_kind, "maxelements", "Maximal number of elements",
                      0, 0, INT_MAX, AccessExclusiveLock);
    add_int_reloption(hnsw_relopt_kind, "m", "Number of neighbors of each vertex",
                      100, 0, INT_MAX, AccessExclusiveLock);
    add_int_reloption(hnsw_relopt_kind, "efconstruction", "Number of inspected neighbors during index construction",
                      16, 1, INT_MAX, AccessExclusiveLock);
    add_int_reloption(hnsw_relopt_kind, "efsearch", "Number of inspected neighbors during index search",
                      64, 1, INT_MAX, AccessExclusiveLock);
    hnsw_indexes = hnsw_index_create(TopMemoryContext, INDEX_HASH_SIZE, NULL);
}

static void
hnsw_build_callback(Relation index, ItemPointer tid, Datum *values,
                    bool *isnull, bool tupleIsAlive, void *state)
{
    HierarchicalNSW* hnsw = (HierarchicalNSW*) state;
    ArrayType* array;
    int        n_items;
    label_t    label = 0;

    /* Skip nulls */
    if (isnull[0])
        return;

    array = DatumGetArrayTypeP(values[0]);
    n_items = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
    if (n_items != hnsw_dimensions(hnsw))
    {
        elog(ERROR, "Wrong number of dimensions: %d instead of %d expected",
             n_items, hnsw_dimensions(hnsw));
    }

    memcpy(&label, tid, sizeof(*tid));
    hnsw_add_point(hnsw, (coord_t*)ARR_DATA_PTR(array), label);
}

static void
hnsw_populate(HierarchicalNSW* hnsw, Relation indexRel, Relation heapRel)
{
    IndexInfo* indexInfo = BuildIndexInfo(indexRel);
    Assert(indexInfo->ii_NumIndexAttrs == 1);
    table_index_build_scan(heapRel, indexRel, indexInfo,
                           true, true, hnsw_build_callback, (void *) hnsw, NULL);
}

#ifdef __APPLE__

#include <sys/types.h>
#include <sys/sysctl.h>

static void
hnsw_check_available_memory(Size requested)
{
    size_t total;
    if (sysctlbyname("hw.memsize", NULL, &total, NULL, 0) < 0)
        elog(ERROR, "Failed to get amount of RAM: %m");

    if ((Size)NBuffers*BLCKSZ + requested >= total)
        elog(ERROR, "HNSW index requires %ld bytes while only %ld are available",
             requested, total - (Size)NBuffers*BLCKSZ);
}

#else

#include <sys/sysinfo.h>

static void
hnsw_check_available_memory(Size requested)
{
    struct sysinfo si;
    Size total;
    if (sysinfo(&si) < 0)
        elog(ERROR, "Failed to get amount of RAM: %m");

    total = si.totalram*si.mem_unit;
    if ((Size)NBuffers*BLCKSZ + requested >= total)
        elog(ERROR, "HNSW index requires %ld bytes while only %ld are available",
             requested, total - (Size)NBuffers*BLCKSZ);
}

#endif

static HierarchicalNSW*
hnsw_get_index(Relation indexRel, Relation heapRel)
{
    HierarchicalNSW* hnsw;
    Oid indexoid = RelationGetRelid(indexRel);
    HnswHashEntry* entry = hnsw_index_lookup(hnsw_indexes, indexoid);
    if (entry == NULL)
    {
        size_t dims, maxelements;
        size_t M;
        size_t maxM;
        size_t size_links_level0;
        size_t size_data_per_element;
        size_t data_size;
        dsm_handle handle = indexoid << 1; /* make it even */
        void*  impl_private = NULL;
        void*  mapped_address = NULL;
        Size   mapped_size = 0;
        Size   shmem_size;
        bool   exists = true;
        bool   found;
        HnswOptions *opts = (HnswOptions *) indexRel->rd_options;
        if (opts == NULL || opts->maxelements == 0 || opts->dims == 0) {
            elog(ERROR, "HNSW index requires 'maxelements' and 'dims' to be specified");
        }
        dims = opts->dims;
        maxelements = opts->maxelements;
        M = opts->M;
        maxM = M * 2;
        data_size = dims * sizeof(coord_t);
        size_links_level0 = (maxM + 1) * sizeof(idx_t);
        size_data_per_element = size_links_level0 + data_size + sizeof(label_t);
        shmem_size = hnsw_sizeof() + maxelements * size_data_per_element;

        hnsw_check_available_memory(shmem_size);

        /* first try to attach to an existing index */
        if (!dsm_impl_op(DSM_OP_ATTACH, handle, 0, &impl_private,
                         &mapped_address, &mapped_size, DEBUG1))
        {
            /* index doesn't exist: try to create it */
            if (!dsm_impl_op(DSM_OP_CREATE, handle, shmem_size, &impl_private,
                             &mapped_address, &mapped_size, DEBUG1))
            {
                /* We can do it under shared lock, so some other backend may
                 * try to initialize the index. If create fails because the
                 * index was already created by somebody else, then try to
                 * attach to it once again.
                 */
                if (!dsm_impl_op(DSM_OP_ATTACH, handle, 0, &impl_private,
                                 &mapped_address, &mapped_size, ERROR))
                {
                    return NULL;
                }
            }
            else
            {
                exists = false;
            }
        }
        Assert(mapped_size == shmem_size);
        hnsw = (HierarchicalNSW*)mapped_address;

        if (!exists)
        {
            hnsw_init(hnsw, dims, maxelements, M, maxM, opts->efConstruction);
            hnsw_populate(hnsw, indexRel, heapRel);
        }
        entry = hnsw_index_insert(hnsw_indexes, indexoid, &found);
        Assert(!found);
        entry->hnsw = hnsw;
    }
    else
    {
        hnsw = entry->hnsw;
    }
    return hnsw;
}
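The deleted `hnsw_get_index` handled racy initialization under a shared lock with an attach → create → re-attach sequence. The same pattern, sketched in Rust with hypothetical `attach`/`create` primitives standing in for the DSM operations (illustration only, not a real shared-memory API):

```rust
// Hypothetical stand-ins for the DSM attach/create operations.
fn attach(_key: u64) -> Option<Vec<u8>> {
    None // pretend the segment doesn't exist yet
}
fn create(_key: u64, size: usize) -> Option<Vec<u8>> {
    Some(vec![0; size]) // pretend creation succeeded
}

fn get_or_create_segment(key: u64, size: usize) -> Option<(Vec<u8>, bool)> {
    // 1. Fast path: the segment may already exist.
    if let Some(seg) = attach(key) {
        return Some((seg, /* created: */ false));
    }
    // 2. Try to create it ourselves.
    if let Some(seg) = create(key, size) {
        return Some((seg, true)); // we created it, so we must initialize it
    }
    // 3. Creation lost a race with another backend: attach to theirs.
    attach(key).map(|seg| (seg, false))
}

fn main() {
    let (seg, created) = get_or_create_segment(42, 16).expect("attach or create");
    println!("len={} created={}", seg.len(), created);
}
```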
/*
|
||||
* Start or restart an index scan
|
||||
*/
|
||||
static IndexScanDesc
|
||||
hnsw_beginscan(Relation index, int nkeys, int norderbys)
|
||||
{
|
||||
IndexScanDesc scan = RelationGetIndexScan(index, nkeys, norderbys);
|
||||
HnswScanOpaque so = (HnswScanOpaque) palloc(sizeof(HnswScanOpaqueData));
|
||||
Relation heap = relation_open(index->rd_index->indrelid, NoLock);
|
||||
so->hnsw = hnsw_get_index(index, heap);
|
||||
relation_close(heap, NoLock);
|
||||
so->curr = 0;
|
||||
so->n_results = 0;
|
||||
so->results = NULL;
|
||||
scan->opaque = so;
|
||||
return scan;
|
||||
}
|
||||
|
||||
/*
|
||||
* Start or restart an index scan
|
||||
*/
|
||||
static void
|
||||
hnsw_rescan(IndexScanDesc scan, ScanKey keys, int nkeys, ScanKey orderbys, int norderbys)
|
||||
{
|
||||
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
|
||||
if (so->results)
|
||||
{
|
||||
pfree(so->results);
|
||||
so->results = NULL;
|
||||
}
|
||||
so->curr = 0;
|
||||
if (orderbys && scan->numberOfOrderBys > 0)
|
||||
memmove(scan->orderByData, orderbys, scan->numberOfOrderBys * sizeof(ScanKeyData));
|
||||
}
|
||||
|
||||
/*
|
||||
* Fetch the next tuple in the given scan
|
||||
*/
|
||||
static bool
|
||||
hnsw_gettuple(IndexScanDesc scan, ScanDirection dir)
|
||||
{
|
||||
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
|
||||
|
||||
/*
|
||||
* Index can be used to scan backward, but Postgres doesn't support
|
||||
* backward scan on operators
|
||||
*/
|
||||
Assert(ScanDirectionIsForward(dir));
|
||||
|
||||
if (so->curr == 0)
|
||||
{
|
||||
Datum value;
|
||||
ArrayType* array;
|
||||
int n_items;
|
||||
size_t n_results;
|
||||
label_t* results;
|
||||
HnswOptions *opts = (HnswOptions *) scan->indexRelation->rd_options;
|
||||
size_t efSearch = opts ? opts->efSearch : DEFAULT_EF_SEARCH;
|
||||
|
||||
/* Safety check */
|
||||
if (scan->orderByData == NULL)
|
||||
elog(ERROR, "cannot scan HNSW index without order");
|
||||
|
||||
/* No items will match if null */
|
||||
if (scan->orderByData->sk_flags & SK_ISNULL)
|
||||
return false;
|
||||
|
||||
value = scan->orderByData->sk_argument;
|
||||
array = DatumGetArrayTypeP(value);
|
||||
n_items = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
|
||||
if (n_items != hnsw_dimensions(so->hnsw))
|
||||
{
|
||||
elog(ERROR, "Wrong number of dimensions: %d instead of %d expected",
|
||||
n_items, hnsw_dimensions(so->hnsw));
|
||||
}
|
||||
|
||||
if (!hnsw_search(so->hnsw, (coord_t*)ARR_DATA_PTR(array), efSearch, &n_results, &results))
|
||||
elog(ERROR, "HNSW index search failed");
|
||||
so->results = (ItemPointer)palloc(n_results*sizeof(ItemPointerData));
|
||||
so->n_results = n_results;
|
||||
for (size_t i = 0; i < n_results; i++)
|
||||
{
|
||||
memcpy(&so->results[i], &results[i], sizeof(so->results[i]));
|
||||
}
|
||||
free(results);
|
||||
}
|
||||
if (so->curr >= so->n_results)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
scan->xs_heaptid = so->results[so->curr++];
|
||||
scan->xs_recheckorderby = false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* End a scan and release resources
|
||||
*/
|
||||
static void
|
||||
hnsw_endscan(IndexScanDesc scan)
|
||||
{
|
||||
HnswScanOpaque so = (HnswScanOpaque) scan->opaque;
|
||||
if (so->results)
|
||||
pfree(so->results);
|
||||
pfree(so);
|
||||
scan->opaque = NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Estimate the cost of an index scan
|
||||
*/
|
||||
static void
|
||||
hnsw_costestimate(PlannerInfo *root, IndexPath *path, double loop_count,
|
||||
Cost *indexStartupCost, Cost *indexTotalCost,
|
||||
Selectivity *indexSelectivity, double *indexCorrelation
|
||||
,double *indexPages
|
||||
)
|
||||
{
|
||||
GenericCosts costs;
|
||||
|
||||
/* Never use index without order */
|
||||
if (path->indexorderbys == NULL)
|
||||
{
|
||||
*indexStartupCost = DBL_MAX;
|
||||
*indexTotalCost = DBL_MAX;
|
||||
*indexSelectivity = 0;
|
||||
*indexCorrelation = 0;
|
||||
*indexPages = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
MemSet(&costs, 0, sizeof(costs));
|
||||
|
||||
genericcostestimate(root, path, loop_count, &costs);
|
||||
|
||||
/* Startup cost and total cost are same */
|
||||
*indexStartupCost = costs.indexTotalCost;
|
||||
*indexTotalCost = costs.indexTotalCost;
|
||||
*indexSelectivity = costs.indexSelectivity;
|
||||
*indexCorrelation = costs.indexCorrelation;
|
||||
*indexPages = costs.numIndexPages;
|
||||
}
|
||||
|
||||
/*
|
||||
* Parse and validate the reloptions
|
||||
*/
|
||||
static bytea *
|
||||
hnsw_options(Datum reloptions, bool validate)
|
||||
{
|
||||
static const relopt_parse_elt tab[] = {
|
||||
{"dims", RELOPT_TYPE_INT, offsetof(HnswOptions, dims)},
|
||||
{"maxelements", RELOPT_TYPE_INT, offsetof(HnswOptions, maxelements)},
|
||||
{"efconstruction", RELOPT_TYPE_INT, offsetof(HnswOptions, efConstruction)},
|
||||
{"efsearch", RELOPT_TYPE_INT, offsetof(HnswOptions, efSearch)},
|
||||
{"m", RELOPT_TYPE_INT, offsetof(HnswOptions, M)}
|
||||
};
|
||||
|
||||
return (bytea *) build_reloptions(reloptions, validate,
|
||||
hnsw_relopt_kind,
|
||||
sizeof(HnswOptions),
|
||||
tab, lengthof(tab));
|
||||
}
|
||||
|
||||
/*
|
||||
* Validate catalog entries for the specified operator class
|
||||
*/
|
||||
static bool
|
||||
hnsw_validate(Oid opclassoid)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Build the index for a logged table
|
||||
*/
|
||||
static IndexBuildResult *
|
||||
hnsw_build(Relation heap, Relation index, IndexInfo *indexInfo)
|
||||
{
|
||||
HierarchicalNSW* hnsw = hnsw_get_index(index, heap);
|
||||
IndexBuildResult* result = (IndexBuildResult *) palloc(sizeof(IndexBuildResult));
|
||||
result->heap_tuples = result->index_tuples = hnsw_count(hnsw);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/*
|
||||
* Insert a tuple into the index
|
||||
*/
|
||||
static bool
|
||||
hnsw_insert(Relation index, Datum *values, bool *isnull, ItemPointer heap_tid,
|
||||
Relation heap, IndexUniqueCheck checkUnique,
|
||||
bool indexUnchanged,
|
||||
IndexInfo *indexInfo)
|
||||
{
|
||||
HierarchicalNSW* hnsw = hnsw_get_index(index, heap);
|
||||
Datum value;
|
||||
ArrayType* array;
|
||||
int n_items;
|
||||
label_t label = 0;
|
||||
|
||||
/* Skip nulls */
|
||||
if (isnull[0])
|
||||
return false;
|
||||
|
||||
/* Detoast value */
|
||||
value = PointerGetDatum(PG_DETOAST_DATUM(values[0]));
|
||||
array = DatumGetArrayTypeP(value);
|
||||
n_items = ArrayGetNItems(ARR_NDIM(array), ARR_DIMS(array));
|
||||
if (n_items != hnsw_dimensions(hnsw))
|
||||
{
|
||||
elog(ERROR, "Wrong number of dimensions: %d instead of %d expected",
|
||||
n_items, hnsw_dimensions(hnsw));
|
||||
}
|
||||
memcpy(&label, heap_tid, sizeof(*heap_tid));
|
||||
if (!hnsw_add_point(hnsw, (coord_t*)ARR_DATA_PTR(array), label))
|
||||
elog(ERROR, "HNSW index insert failed");
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Build the index for an unlogged table
|
||||
*/
|
||||
static void
|
||||
hnsw_buildempty(Relation index)
|
||||
{
|
||||
/* index will be constructed on dema nd when accessed */
|
||||
}
|
||||
|
||||
/*
|
||||
* Clean up after a VACUUM operation
|
||||
*/
|
||||
static IndexBulkDeleteResult *
|
||||
hnsw_vacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
|
||||
{
|
||||
Relation rel = info->index;
|
||||
|
||||
if (stats == NULL)
|
||||
return NULL;
|
||||
|
||||
stats->num_pages = RelationGetNumberOfBlocks(rel);
|
||||
|
||||
return stats;
|
||||
}
|
||||
|
||||
/*
|
||||
* Bulk delete tuples from the index
|
||||
*/
|
||||
static IndexBulkDeleteResult *
|
||||
hnsw_bulkdelete(IndexVacuumInfo *info, IndexBulkDeleteResult *stats,
|
||||
IndexBulkDeleteCallback callback, void *callback_state)
|
||||
{
|
||||
if (stats == NULL)
|
||||
stats = (IndexBulkDeleteResult *) palloc0(sizeof(IndexBulkDeleteResult));
|
||||
return stats;
|
||||
}

/*
 * Define index handler
 *
 * See https://www.postgresql.org/docs/current/index-api.html
 */
PGDLLEXPORT PG_FUNCTION_INFO_V1(hnsw_handler);
Datum
hnsw_handler(PG_FUNCTION_ARGS)
{
    IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);

    amroutine->amstrategies = 0;
    amroutine->amsupport = 0;
    amroutine->amoptsprocnum = 0;
    amroutine->amcanorder = false;
    amroutine->amcanorderbyop = true;
    amroutine->amcanbackward = false;   /* can change direction mid-scan */
    amroutine->amcanunique = false;
    amroutine->amcanmulticol = false;
    amroutine->amoptionalkey = true;
    amroutine->amsearcharray = false;
    amroutine->amsearchnulls = false;
    amroutine->amstorage = false;
    amroutine->amclusterable = false;
    amroutine->ampredlocks = false;
    amroutine->amcanparallel = false;
    amroutine->amcaninclude = false;
    amroutine->amusemaintenanceworkmem = false; /* not used during VACUUM */
    amroutine->amparallelvacuumoptions = VACUUM_OPTION_PARALLEL_BULKDEL;
    amroutine->amkeytype = InvalidOid;

    /* Interface functions */
    amroutine->ambuild = hnsw_build;
    amroutine->ambuildempty = hnsw_buildempty;
    amroutine->aminsert = hnsw_insert;
    amroutine->ambulkdelete = hnsw_bulkdelete;
    amroutine->amvacuumcleanup = hnsw_vacuumcleanup;
    amroutine->amcanreturn = NULL;  /* index-only scans not supported */
    amroutine->amcostestimate = hnsw_costestimate;
    amroutine->amoptions = hnsw_options;
    amroutine->amproperty = NULL;   /* TODO AMPROP_DISTANCE_ORDERABLE */
    amroutine->ambuildphasename = NULL;
    amroutine->amvalidate = hnsw_validate;
    amroutine->amadjustmembers = NULL;
    amroutine->ambeginscan = hnsw_beginscan;
    amroutine->amrescan = hnsw_rescan;
    amroutine->amgettuple = hnsw_gettuple;
    amroutine->amgetbitmap = NULL;
    amroutine->amendscan = hnsw_endscan;
    amroutine->ammarkpos = NULL;
    amroutine->amrestrpos = NULL;

    /* Interface functions to support parallel index scans */
    amroutine->amestimateparallelscan = NULL;
    amroutine->aminitparallelscan = NULL;
    amroutine->amparallelrescan = NULL;

    PG_RETURN_POINTER(amroutine);
}
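
How a handler of this shape gets wired up to the `USING hnsw` syntax seen in the tests below is a one-line registration in the extension's SQL script; a sketch, since the actual script is not shown here:

CREATE ACCESS METHOD hnsw TYPE INDEX HANDLER hnsw_handler;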

/*
 * Get the L2 distance between vectors
 */
PGDLLEXPORT PG_FUNCTION_INFO_V1(l2_distance);
Datum
l2_distance(PG_FUNCTION_ARGS)
{
    ArrayType  *a = PG_GETARG_ARRAYTYPE_P(0);
    ArrayType  *b = PG_GETARG_ARRAYTYPE_P(1);
    int         a_dim = ArrayGetNItems(ARR_NDIM(a), ARR_DIMS(a));
    int         b_dim = ArrayGetNItems(ARR_NDIM(b), ARR_DIMS(b));
    dist_t      distance = 0.0;
    dist_t      diff;
    coord_t    *ax = (coord_t*)ARR_DATA_PTR(a);
    coord_t    *bx = (coord_t*)ARR_DATA_PTR(b);

    if (a_dim != b_dim)
    {
        ereport(ERROR,
                (errcode(ERRCODE_DATA_EXCEPTION),
                 errmsg("different array dimensions %d and %d", a_dim, b_dim)));
    }

    for (int i = 0; i < a_dim; i++)
    {
        diff = ax[i] - bx[i];
        distance += diff * diff;
    }

    PG_RETURN_FLOAT4((dist_t)sqrt(distance));
}
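
In closed form this is the Euclidean distance

    d(a, b) = \sqrt{\sum_{i=1}^{n} (a_i - b_i)^2}

so, for example, d((0,0,0), (3,3,3)) = sqrt(27), approximately 5.196.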

@@ -1,4 +0,0 @@
comment = '** Deprecated ** Please use pg_embedding instead'
default_version = '0.1.0'
module_pathname = '$libdir/hnsw'
relocatable = true
@@ -1,15 +0,0 @@
#pragma once

typedef float    coord_t;
typedef float    dist_t;
typedef uint32_t idx_t;
typedef uint64_t label_t;

typedef struct HierarchicalNSW HierarchicalNSW;

bool   hnsw_search(HierarchicalNSW* hnsw, const coord_t *point, size_t efSearch, size_t* n_results, label_t** results);
bool   hnsw_add_point(HierarchicalNSW* hnsw, const coord_t *point, label_t label);
void   hnsw_init(HierarchicalNSW* hnsw, size_t dim, size_t maxelements, size_t M, size_t maxM, size_t efConstruction);
int    hnsw_dimensions(HierarchicalNSW* hnsw);
size_t hnsw_count(HierarchicalNSW* hnsw);
size_t hnsw_sizeof(void);
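
A hedged sketch of the call sequence this C API implies (inferred from the signatures above; the Postgres side sizes and zeroes the buffer):

/* hnsw_init(hnsw, dims, maxelements, M, maxM, efConstruction);
 * hnsw_add_point(hnsw, coords, label);
 *
 * size_t   n_results;
 * label_t *results;
 * if (hnsw_search(hnsw, query, efSearch, &n_results, &results))
 * {
 *     ... use results[0..n_results) ...
 *     free(results);    -- hnsw_search mallocs the array; caller frees
 * }
 */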

@@ -1,379 +0,0 @@
#include "hnswalg.h"

#if defined(__GNUC__)
#define PORTABLE_ALIGN32 __attribute__((aligned(32)))
#define PREFETCH(addr,hint) __builtin_prefetch(addr, 0, hint)
#else
#define PORTABLE_ALIGN32 __declspec(align(32))
#define PREFETCH(addr,hint)
#endif

HierarchicalNSW::HierarchicalNSW(size_t dim_, size_t maxelements_, size_t M_, size_t maxM_, size_t efConstruction_)
{
    dim = dim_;
    data_size = dim * sizeof(coord_t);

    efConstruction = efConstruction_;

    maxelements = maxelements_;
    M = M_;
    maxM = maxM_;
    size_links_level0 = (maxM + 1) * sizeof(idx_t);
    size_data_per_element = size_links_level0 + data_size + sizeof(label_t);
    offset_data = size_links_level0;
    offset_label = offset_data + data_size;

    enterpoint_node = 0;
    cur_element_count = 0;
#ifdef __x86_64__
    use_avx2 = __builtin_cpu_supports("avx2");
#endif
}

std::priority_queue<std::pair<dist_t, idx_t>> HierarchicalNSW::searchBaseLayer(const coord_t *point, size_t ef)
{
    /* One bit per element, so (cur_element_count + 31) / 32 words */
    std::vector<uint32_t> visited;
    visited.resize((cur_element_count + 31) >> 5);

    std::priority_queue<std::pair<dist_t, idx_t>> topResults;
    std::priority_queue<std::pair<dist_t, idx_t>> candidateSet;

    dist_t dist = fstdistfunc(point, getDataByInternalId(enterpoint_node));

    topResults.emplace(dist, enterpoint_node);
    candidateSet.emplace(-dist, enterpoint_node);
    visited[enterpoint_node >> 5] = 1 << (enterpoint_node & 31);
    dist_t lowerBound = dist;

    while (!candidateSet.empty())
    {
        std::pair<dist_t, idx_t> curr_el_pair = candidateSet.top();
        /* Candidates carry negated distances, so the top of the max-heap is
         * the closest unexpanded node; stop once it cannot improve the
         * current top-ef results. */
        if (-curr_el_pair.first > lowerBound)
            break;

        candidateSet.pop();
        idx_t curNodeNum = curr_el_pair.second;

        idx_t* data = get_linklist0(curNodeNum);
        size_t size = *data++;

        PREFETCH(getDataByInternalId(*data), 0);

        for (size_t j = 0; j < size; ++j) {
            size_t tnum = *(data + j);

            PREFETCH(getDataByInternalId(*(data + j + 1)), 0);

            if (!(visited[tnum >> 5] & (1 << (tnum & 31)))) {
                visited[tnum >> 5] |= 1 << (tnum & 31);

                dist = fstdistfunc(point, getDataByInternalId(tnum));

                if (topResults.top().first > dist || topResults.size() < ef) {
                    candidateSet.emplace(-dist, tnum);

                    PREFETCH(get_linklist0(candidateSet.top().second), 0);
                    topResults.emplace(dist, tnum);

                    if (topResults.size() > ef)
                        topResults.pop();

                    lowerBound = topResults.top().first;
                }
            }
        }
    }
    return topResults;
}
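
The negated-distance trick above is worth spelling out: std::priority_queue is a max-heap, so pushing -dist makes top() yield the smallest distance. A minimal standalone illustration:

#include <cstdio>
#include <queue>
#include <utility>

int main() {
    std::priority_queue<std::pair<float, int>> pq; // max-heap by first element
    pq.emplace(-3.0f, 1);                          // distance 3.0, node 1
    pq.emplace(-1.0f, 2);                          // distance 1.0, node 2
    pq.emplace(-2.0f, 3);                          // distance 2.0, node 3
    std::printf("%d\n", pq.top().second);          // prints 2: the closest node
    return 0;
}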

void HierarchicalNSW::getNeighborsByHeuristic(std::priority_queue<std::pair<dist_t, idx_t>> &topResults, size_t NN)
{
    if (topResults.size() < NN)
        return;

    std::priority_queue<std::pair<dist_t, idx_t>> resultSet;
    std::vector<std::pair<dist_t, idx_t>> returnlist;

    while (topResults.size() > 0) {
        resultSet.emplace(-topResults.top().first, topResults.top().second);
        topResults.pop();
    }

    while (resultSet.size()) {
        if (returnlist.size() >= NN)
            break;
        std::pair<dist_t, idx_t> curen = resultSet.top();
        dist_t dist_to_query = -curen.first;
        resultSet.pop();
        bool good = true;
        /* Keep a candidate only if it is closer to the query than to every
         * neighbor already kept, which spreads edges across directions. */
        for (std::pair<dist_t, idx_t> curen2 : returnlist) {
            dist_t curdist = fstdistfunc(getDataByInternalId(curen2.second),
                                         getDataByInternalId(curen.second));
            if (curdist < dist_to_query) {
                good = false;
                break;
            }
        }
        if (good) returnlist.push_back(curen);
    }
    for (std::pair<dist_t, idx_t> elem : returnlist)
        topResults.emplace(-elem.first, elem.second);
}

void HierarchicalNSW::mutuallyConnectNewElement(const coord_t *point, idx_t cur_c,
                                                std::priority_queue<std::pair<dist_t, idx_t>> topResults)
{
    getNeighborsByHeuristic(topResults, M);

    std::vector<idx_t> res;
    res.reserve(M);
    while (topResults.size() > 0) {
        res.push_back(topResults.top().second);
        topResults.pop();
    }
    {
        idx_t* data = get_linklist0(cur_c);
        if (*data)
            throw std::runtime_error("Should be blank");

        *data++ = res.size();

        for (size_t idx = 0; idx < res.size(); idx++) {
            if (data[idx])
                throw std::runtime_error("Should be blank");
            data[idx] = res[idx];
        }
    }
    for (size_t idx = 0; idx < res.size(); idx++) {
        if (res[idx] == cur_c)
            throw std::runtime_error("Connection to the same element");

        size_t resMmax = maxM;
        idx_t *ll_other = get_linklist0(res[idx]);
        idx_t sz_link_list_other = *ll_other;

        if (sz_link_list_other > resMmax || sz_link_list_other < 0)
            throw std::runtime_error("Bad sz_link_list_other");

        if (sz_link_list_other < resMmax) {
            idx_t *data = ll_other + 1;
            data[sz_link_list_other] = cur_c;
            *ll_other = sz_link_list_other + 1;
        } else {
            // finding the "weakest" element to replace it with the new one
            idx_t *data = ll_other + 1;
            dist_t d_max = fstdistfunc(getDataByInternalId(cur_c), getDataByInternalId(res[idx]));
            // Heuristic:
            std::priority_queue<std::pair<dist_t, idx_t>> candidates;
            candidates.emplace(d_max, cur_c);

            for (size_t j = 0; j < sz_link_list_other; j++)
                candidates.emplace(fstdistfunc(getDataByInternalId(data[j]), getDataByInternalId(res[idx])), data[j]);

            getNeighborsByHeuristic(candidates, resMmax);

            size_t indx = 0;
            while (!candidates.empty()) {
                data[indx] = candidates.top().second;
                candidates.pop();
                indx++;
            }
            *ll_other = indx;
        }
    }
}

void HierarchicalNSW::addPoint(const coord_t *point, label_t label)
{
    if (cur_element_count >= maxelements) {
        throw std::runtime_error("The number of elements exceeds the specified limit");
    }
    idx_t cur_c = cur_element_count++;
    memset((char *) get_linklist0(cur_c), 0, size_data_per_element);
    memcpy(getDataByInternalId(cur_c), point, data_size);
    memcpy(getExternalLabel(cur_c), &label, sizeof label);

    // Do nothing for the first element
    if (cur_c != 0) {
        std::priority_queue<std::pair<dist_t, idx_t>> topResults = searchBaseLayer(point, efConstruction);
        mutuallyConnectNewElement(point, cur_c, topResults);
    }
}

std::priority_queue<std::pair<dist_t, label_t>> HierarchicalNSW::searchKnn(const coord_t *query, size_t k)
{
    std::priority_queue<std::pair<dist_t, label_t>> topResults;
    auto topCandidates = searchBaseLayer(query, k);
    while (topCandidates.size() > k) {
        topCandidates.pop();
    }
    while (!topCandidates.empty()) {
        std::pair<dist_t, idx_t> rez = topCandidates.top();
        label_t label;
        memcpy(&label, getExternalLabel(rez.second), sizeof(label));
        topResults.push(std::pair<dist_t, label_t>(rez.first, label));
        topCandidates.pop();
    }

    return topResults;
}

dist_t fstdistfunc_scalar(const coord_t *x, const coord_t *y, size_t n)
{
    dist_t distance = 0.0;

    for (size_t i = 0; i < n; i++)
    {
        dist_t diff = x[i] - y[i];
        distance += diff * diff;
    }
    return distance;
}

#ifdef __x86_64__
#include <immintrin.h>

/* Processes 16 floats (two 8-wide AVX ops) per iteration; note that any tail
 * elements beyond a multiple of 16 are not accumulated. */
__attribute__((target("avx2")))
dist_t fstdistfunc_avx2(const coord_t *x, const coord_t *y, size_t n)
{
    const size_t TmpResSz = sizeof(__m256) / sizeof(float);
    float PORTABLE_ALIGN32 TmpRes[TmpResSz];
    size_t qty16 = n / 16;
    const float *pEnd1 = x + (qty16 * 16);
    __m256 diff, v1, v2;
    __m256 sum = _mm256_set1_ps(0);

    while (x < pEnd1) {
        v1 = _mm256_loadu_ps(x);
        x += 8;
        v2 = _mm256_loadu_ps(y);
        y += 8;
        diff = _mm256_sub_ps(v1, v2);
        sum = _mm256_add_ps(sum, _mm256_mul_ps(diff, diff));

        v1 = _mm256_loadu_ps(x);
        x += 8;
        v2 = _mm256_loadu_ps(y);
        y += 8;
        diff = _mm256_sub_ps(v1, v2);
        sum = _mm256_add_ps(sum, _mm256_mul_ps(diff, diff));
    }
    _mm256_store_ps(TmpRes, sum);
    float res = TmpRes[0] + TmpRes[1] + TmpRes[2] + TmpRes[3] + TmpRes[4] + TmpRes[5] + TmpRes[6] + TmpRes[7];
    return (res);
}

/* Same loop layout as the AVX2 path: 16 floats (four 4-wide SSE ops) per iteration. */
dist_t fstdistfunc_sse(const coord_t *x, const coord_t *y, size_t n)
{
    const size_t TmpResSz = sizeof(__m128) / sizeof(float);
    float PORTABLE_ALIGN32 TmpRes[TmpResSz];
    size_t qty16 = n / 16;
    const float *pEnd1 = x + (qty16 * 16);

    __m128 diff, v1, v2;
    __m128 sum = _mm_set1_ps(0);

    while (x < pEnd1) {
        v1 = _mm_loadu_ps(x);
        x += 4;
        v2 = _mm_loadu_ps(y);
        y += 4;
        diff = _mm_sub_ps(v1, v2);
        sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));

        v1 = _mm_loadu_ps(x);
        x += 4;
        v2 = _mm_loadu_ps(y);
        y += 4;
        diff = _mm_sub_ps(v1, v2);
        sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));

        v1 = _mm_loadu_ps(x);
        x += 4;
        v2 = _mm_loadu_ps(y);
        y += 4;
        diff = _mm_sub_ps(v1, v2);
        sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));

        v1 = _mm_loadu_ps(x);
        x += 4;
        v2 = _mm_loadu_ps(y);
        y += 4;
        diff = _mm_sub_ps(v1, v2);
        sum = _mm_add_ps(sum, _mm_mul_ps(diff, diff));
    }
    _mm_store_ps(TmpRes, sum);
    float res = TmpRes[0] + TmpRes[1] + TmpRes[2] + TmpRes[3];
    return res;
}
#endif

dist_t HierarchicalNSW::fstdistfunc(const coord_t *x, const coord_t *y)
{
#ifndef __x86_64__
    return fstdistfunc_scalar(x, y, dim);
#else
    if (use_avx2)
        return fstdistfunc_avx2(x, y, dim);

    return fstdistfunc_sse(x, y, dim);
#endif
}

bool hnsw_search(HierarchicalNSW* hnsw, const coord_t *point, size_t efSearch, size_t* n_results, label_t** results)
{
    try
    {
        auto result = hnsw->searchKnn(point, efSearch);
        size_t nResults = result.size();
        *results = (label_t*)malloc(nResults*sizeof(label_t));
        for (size_t i = nResults; i-- != 0;)
        {
            (*results)[i] = result.top().second;
            result.pop();
        }
        *n_results = nResults;
        return true;
    }
    catch (std::exception& x)
    {
        return false;
    }
}

bool hnsw_add_point(HierarchicalNSW* hnsw, const coord_t *point, label_t label)
{
    try
    {
        hnsw->addPoint(point, label);
        return true;
    }
    catch (std::exception& x)
    {
        fprintf(stderr, "Catch %s\n", x.what());
        return false;
    }
}

void hnsw_init(HierarchicalNSW* hnsw, size_t dims, size_t maxelements, size_t M, size_t maxM, size_t efConstruction)
{
    new ((void*)hnsw) HierarchicalNSW(dims, maxelements, M, maxM, efConstruction);
}

int hnsw_dimensions(HierarchicalNSW* hnsw)
{
    return (int)hnsw->dim;
}

size_t hnsw_count(HierarchicalNSW* hnsw)
{
    return hnsw->cur_element_count;
}

size_t hnsw_sizeof(void)
{
    return sizeof(HierarchicalNSW);
}

@@ -1,69 +0,0 @@
#pragma once

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#include <unordered_map>
#include <unordered_set>
#include <map>
#include <cmath>
#include <queue>
#include <stdexcept>

extern "C" {
#include "hnsw.h"
}

struct HierarchicalNSW
{
    size_t maxelements;
    size_t cur_element_count;

    idx_t  enterpoint_node;

    size_t dim;
    size_t data_size;
    size_t offset_data;
    size_t offset_label;
    size_t size_data_per_element;
    size_t M;
    size_t maxM;
    size_t size_links_level0;
    size_t efConstruction;

#ifdef __x86_64__
    bool use_avx2;
#endif

    // Per-element layout in data_level0_memory:
    //   [link count + maxM link slots][dim coordinates][64-bit label]
    char data_level0_memory[0]; // varying size

public:
    HierarchicalNSW(size_t dim, size_t maxelements, size_t M, size_t maxM, size_t efConstruction);
    ~HierarchicalNSW();

    inline coord_t *getDataByInternalId(idx_t internal_id) const {
        return (coord_t *)&data_level0_memory[internal_id * size_data_per_element + offset_data];
    }

    inline idx_t *get_linklist0(idx_t internal_id) const {
        return (idx_t*)&data_level0_memory[internal_id * size_data_per_element];
    }

    inline label_t *getExternalLabel(idx_t internal_id) const {
        return (label_t *)&data_level0_memory[internal_id * size_data_per_element + offset_label];
    }

    std::priority_queue<std::pair<dist_t, idx_t>> searchBaseLayer(const coord_t *x, size_t ef);

    void getNeighborsByHeuristic(std::priority_queue<std::pair<dist_t, idx_t>> &topResults, size_t NN);

    void mutuallyConnectNewElement(const coord_t *x, idx_t id, std::priority_queue<std::pair<dist_t, idx_t>> topResults);

    void addPoint(const coord_t *point, label_t label);

    std::priority_queue<std::pair<dist_t, label_t>> searchKnn(const coord_t *query_data, size_t k);

    dist_t fstdistfunc(const coord_t *x, const coord_t *y);
};

@@ -1,28 +0,0 @@
SET enable_seqscan = off;
CREATE TABLE t (val real[]);
INSERT INTO t (val) VALUES ('{0,0,0}'), ('{1,2,3}'), ('{1,1,1}'), (NULL);
CREATE INDEX ON t USING hnsw (val) WITH (maxelements = 10, dims=3, m=3);
INSERT INTO t (val) VALUES (array[1,2,4]);
explain SELECT * FROM t ORDER BY val <-> array[3,3,3];
                             QUERY PLAN
--------------------------------------------------------------------
 Index Scan using t_val_idx on t  (cost=4.02..8.06 rows=3 width=36)
   Order By: (val <-> '{3,3,3}'::real[])
(2 rows)

SELECT * FROM t ORDER BY val <-> array[3,3,3];
   val
---------
 {1,2,3}
 {1,2,4}
 {1,1,1}
 {0,0,0}
(4 rows)

SELECT COUNT(*) FROM t;
 count
-------
     5
(1 row)

DROP TABLE t;

@@ -1,13 +0,0 @@
SET enable_seqscan = off;

CREATE TABLE t (val real[]);
INSERT INTO t (val) VALUES ('{0,0,0}'), ('{1,2,3}'), ('{1,1,1}'), (NULL);
CREATE INDEX ON t USING hnsw (val) WITH (maxelements = 10, dims=3, m=3);

INSERT INTO t (val) VALUES (array[1,2,4]);

explain SELECT * FROM t ORDER BY val <-> array[3,3,3];
SELECT * FROM t ORDER BY val <-> array[3,3,3];
SELECT COUNT(*) FROM t;

DROP TABLE t;

@@ -495,6 +495,7 @@ impl Manager {
    }

    /// Updates the `is_active` flag and returns its value.
    // Timelines marked active are pushed to the broker by the `push_loop` task.
    fn update_is_active(
        &mut self,
        is_wal_backup_required: bool,

@@ -61,7 +61,9 @@ pub(crate) fn is_wal_backup_required(
    state: &StateSnapshot,
) -> bool {
    num_computes > 0 ||
    // Currently only whole segments are offloaded, so compare segment numbers.
    // This task backs up completed segments only; the current partial segment
    // is backed up by a separate task/code module (wal_backup_partial).
    // So, a completed-segment backup is needed exactly when the last backup
    // stopped at an older segment than the commit position has reached.
    (state.commit_lsn.segment_number(wal_seg_size) > state.backup_lsn.segment_number(wal_seg_size))
}
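
A sketch of the segment arithmetic this comparison relies on (illustrative; the real method lives on the Lsn type): a WAL position maps to a segment by integer division, so the backup lags exactly when the commit LSN has crossed into a segment the backup LSN has not reached.

fn segment_number(lsn: u64, wal_seg_size: usize) -> u64 {
    lsn / wal_seg_size as u64
}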

@@ -69,6 +71,11 @@ pub(crate) fn is_wal_backup_required(
/// is me, run (per timeline) task, if not yet. OTOH, if it is not me and task
/// is running, kill it.
pub(crate) async fn update_task(mgr: &mut Manager, need_backup: bool, state: &StateSnapshot) {
    // Based on the peer information received from the broker, each safekeeper
    // figures out whether it, or one of the peers, is the offloader.
    // The algorithm is deterministic, so if all peers have the same information,
    // the system converges. In an unconverged state, multiple peers upload the
    // same segments, which is inefficient but safe.
    let (offloader, election_dbg_str) =
        determine_offloader(&state.peers, state.backup_lsn, mgr.tli.ttid, &mgr.conf);
    let elected_me = Some(mgr.conf.my_id) == offloader;
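
One deterministic rule with this convergence property, as a minimal sketch (types simplified to integers; the real determine_offloader uses the safekeeper's NodeId/Lsn types and richer peer state):

type NodeId = u64;
type Lsn = u64;

/// Among peers that already hold all the WAL the backup needs, the lowest
/// node id wins. Any safekeeper evaluating this on the same snapshot of
/// peer state reaches the same answer.
fn pick_offloader(peers: &[(NodeId, Lsn)], backup_lsn: Lsn) -> Option<NodeId> {
    peers
        .iter()
        .filter(|&&(_, commit_lsn)| commit_lsn >= backup_lsn)
        .map(|&(id, _)| id)
        .min()
}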

@@ -0,0 +1,4 @@
-- This sadly isn't a "true" revert of the migration, as the column is now at the end of the table.
-- But preserving column order is not a trivial operation.
-- https://wiki.postgresql.org/wiki/Alter_column_position
ALTER TABLE safekeepers ADD active BOOLEAN NOT NULL DEFAULT false;

@@ -0,0 +1 @@
ALTER TABLE safekeepers DROP active;

@@ -1258,7 +1258,6 @@ pub(crate) struct SafekeeperPersistence {
    pub(crate) version: i64,
    pub(crate) host: String,
    pub(crate) port: i32,
    pub(crate) active: bool,
    pub(crate) http_port: i32,
    pub(crate) availability_zone_id: String,
    pub(crate) scheduling_policy: String,

@@ -1270,7 +1269,6 @@ impl SafekeeperPersistence {
        SkSchedulingPolicy::from_str(&self.scheduling_policy).map_err(|e| {
            DatabaseError::Logical(format!("can't construct SkSchedulingPolicy: {e:?}"))
        })?;
        // omit the `active` flag on purpose: it is deprecated.
        Ok(SafekeeperDescribeResponse {
            id: NodeId(self.id as u64),
            region_id: self.region_id.clone(),

@@ -1295,7 +1293,8 @@ pub(crate) struct SafekeeperUpsert {
    pub(crate) version: i64,
    pub(crate) host: String,
    pub(crate) port: i32,
    pub(crate) active: bool,
    /// The active flag will not be stored in the database and will be ignored.
    pub(crate) active: Option<bool>,
    pub(crate) http_port: i32,
    pub(crate) availability_zone_id: String,
}

@@ -1311,7 +1310,6 @@ impl SafekeeperUpsert {
            version: self.version,
            host: &self.host,
            port: self.port,
            active: self.active,
            http_port: self.http_port,
            availability_zone_id: &self.availability_zone_id,
            // `None` means this column should not be updated; we expose other means to update it.

@@ -1328,7 +1326,6 @@ struct InsertUpdateSafekeeper<'a> {
    version: i64,
    host: &'a str,
    port: i32,
    active: bool,
    http_port: i32,
    availability_zone_id: &'a str,
    scheduling_policy: Option<&'a str>,

@@ -36,7 +36,6 @@ diesel::table! {
        version -> Int8,
        host -> Text,
        port -> Int4,
        active -> Bool,
        http_port -> Int4,
        availability_zone_id -> Text,
        scheduling_policy -> Varchar,

@@ -131,7 +131,6 @@ PAGESERVER_GLOBAL_METRICS: tuple[str, ...] = (
    "pageserver_getpage_reconstruct_seconds_sum",
    *[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
    *histogram("pageserver_smgr_query_seconds_global"),
    *histogram("pageserver_layers_visited_per_read_global"),
    *histogram("pageserver_getpage_get_reconstruct_data_seconds"),
    *histogram("pageserver_wait_lsn_seconds"),
    *histogram("pageserver_remote_operation_seconds"),

@@ -84,9 +84,6 @@ page_cache_size=10
    log.info("Checking layer access metrics ...")

    layer_access_metric_names = [
        "pageserver_layers_visited_per_read_global_sum",
        "pageserver_layers_visited_per_read_global_count",
        "pageserver_layers_visited_per_read_global_bucket",
        "pageserver_layers_visited_per_vectored_read_global_sum",
        "pageserver_layers_visited_per_vectored_read_global_count",
        "pageserver_layers_visited_per_vectored_read_global_bucket",

@@ -97,12 +94,6 @@ page_cache_size=10
        layer_access_metrics = metrics.query_all(name)
        log.info(f"Got metrics: {layer_access_metrics}")

    non_vectored_sum = metrics.query_one("pageserver_layers_visited_per_read_global_sum")
    non_vectored_count = metrics.query_one("pageserver_layers_visited_per_read_global_count")
    if non_vectored_count.value != 0:
        non_vectored_average = non_vectored_sum.value / non_vectored_count.value
    else:
        non_vectored_average = 0
    vectored_sum = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_sum")
    vectored_count = metrics.query_one("pageserver_layers_visited_per_vectored_read_global_count")
    if vectored_count.value > 0:

@@ -113,11 +104,10 @@ page_cache_size=10
        assert vectored_sum.value == 0
        vectored_average = 0

    log.info(f"{non_vectored_average=} {vectored_average=}")
    log.info(f"{vectored_average=}")

    # The upper bound for the average number of layer visits below (8)
    # was chosen empirically for this workload.
    assert non_vectored_average < 8
    assert vectored_average < 8