Compare commits

..

2 Commits

Author SHA1 Message Date
Vlad Lazar
ffbf39d00e trigger ci 2024-07-26 11:52:36 +01:00
Vlad Lazar
a37a2af69e wip 2024-07-26 11:52:03 +01:00
65 changed files with 404 additions and 1970 deletions

View File

@@ -14,8 +14,11 @@ inputs:
api_host:
description: 'Neon API host'
default: console-stage.neon.build
provisioner:
description: 'k8s-pod or k8s-neonvm'
default: 'k8s-pod'
compute_units:
description: '[Min, Max] compute units'
description: '[Min, Max] compute units; Min and Max are used for k8s-neonvm with autoscaling, for k8s-pod values Min and Max should be equal'
default: '[1, 1]'
outputs:
@@ -34,6 +37,10 @@ runs:
# A shell without `set -x` to not to expose password/dsn in logs
shell: bash -euo pipefail {0}
run: |
if [ "${PROVISIONER}" == "k8s-pod" ] && [ "${MIN_CU}" != "${MAX_CU}" ]; then
echo >&2 "For k8s-pod provisioner MIN_CU should be equal to MAX_CU"
fi
project=$(curl \
"https://${API_HOST}/api/v2/projects" \
--fail \
@@ -45,7 +52,7 @@ runs:
\"name\": \"Created by actions/neon-project-create; GITHUB_RUN_ID=${GITHUB_RUN_ID}\",
\"pg_version\": ${POSTGRES_VERSION},
\"region_id\": \"${REGION_ID}\",
\"provisioner\": \"k8s-neonvm\",
\"provisioner\": \"${PROVISIONER}\",
\"autoscaling_limit_min_cu\": ${MIN_CU},
\"autoscaling_limit_max_cu\": ${MAX_CU},
\"settings\": { }
@@ -68,5 +75,6 @@ runs:
API_KEY: ${{ inputs.api_key }}
REGION_ID: ${{ inputs.region_id }}
POSTGRES_VERSION: ${{ inputs.postgres_version }}
PROVISIONER: ${{ inputs.provisioner }}
MIN_CU: ${{ fromJSON(inputs.compute_units)[0] }}
MAX_CU: ${{ fromJSON(inputs.compute_units)[1] }}

View File

@@ -131,8 +131,8 @@ runs:
exit 1
fi
if [[ "${{ inputs.run_in_parallel }}" == "true" ]]; then
# -n sets the number of parallel processes that pytest-xdist will run
EXTRA_PARAMS="-n12 $EXTRA_PARAMS"
# -n16 uses sixteen processes to run tests via pytest-xdist
EXTRA_PARAMS="-n16 $EXTRA_PARAMS"
# --dist=loadgroup points tests marked with @pytest.mark.xdist_group
# to the same worker to make @pytest.mark.order work with xdist

View File

@@ -278,6 +278,9 @@ jobs:
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty
BUILD_TAG: ${{ inputs.build-tag }}
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_IMPL: vectored
PAGESERVER_GET_IMPL: vectored
PAGESERVER_VALIDATE_VEC_GET: true
# Temporary disable this step until we figure out why it's so flaky
# Ref https://github.com/neondatabase/neon/issues/4540

View File

@@ -63,9 +63,11 @@ jobs:
- DEFAULT_PG_VERSION: 16
PLATFORM: "neon-staging"
region_id: ${{ github.event.inputs.region_id || 'aws-us-east-2' }}
provisioner: 'k8s-pod'
- DEFAULT_PG_VERSION: 16
PLATFORM: "azure-staging"
region_id: 'azure-eastus2'
provisioner: 'k8s-neonvm'
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "300"
TEST_PG_BENCH_SCALES_MATRIX: "10,100"
@@ -98,6 +100,7 @@ jobs:
region_id: ${{ matrix.region_id }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
provisioner: ${{ matrix.provisioner }}
- name: Run benchmark
uses: ./.github/actions/run-python-test-set
@@ -213,11 +216,11 @@ jobs:
# Create matrices for the benchmarking jobs, so we run benchmarks on rds only once a week (on Saturday)
#
# Available platforms:
# - neonvm-captest-new: Freshly created project (1 CU)
# - neonvm-captest-freetier: Use freetier-sized compute (0.25 CU)
# - neon-captest-new: Freshly created project (1 CU)
# - neon-captest-freetier: Use freetier-sized compute (0.25 CU)
# - neonvm-captest-azure-new: Freshly created project (1 CU) in azure region
# - neonvm-captest-azure-freetier: Use freetier-sized compute (0.25 CU) in azure region
# - neonvm-captest-reuse: Reusing existing project
# - neon-captest-reuse: Reusing existing project
# - rds-aurora: Aurora Postgres Serverless v2 with autoscaling from 0.5 to 2 ACUs
# - rds-postgres: RDS Postgres db.m5.large instance (2 vCPU, 8 GiB) with gp3 EBS storage
env:
@@ -242,16 +245,18 @@ jobs:
"'"$region_id_default"'"
],
"platform": [
"neonvm-captest-new",
"neonvm-captest-reuse",
"neon-captest-new",
"neon-captest-reuse",
"neonvm-captest-new"
],
"db_size": [ "10gb" ],
"include": [{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-freetier", "db_size": "3gb" },
"include": [{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neon-captest-freetier", "db_size": "3gb" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neon-captest-new", "db_size": "50gb" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-freetier", "db_size": "3gb" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-new", "db_size": "50gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-freetier", "db_size": "3gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "10gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "50gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-freetier", "db_size": "3gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "10gb" },
{ "pg_version": 16, "region_id": "azure-eastus2", "platform": "neonvm-azure-captest-new", "db_size": "50gb" },
{ "pg_version": 16, "region_id": "'"$region_id_default"'", "platform": "neonvm-captest-sharding-reuse", "db_size": "50gb" }]
}'
@@ -266,7 +271,7 @@ jobs:
run: |
matrix='{
"platform": [
"neonvm-captest-reuse"
"neon-captest-reuse"
]
}'
@@ -282,7 +287,7 @@ jobs:
run: |
matrix='{
"platform": [
"neonvm-captest-reuse"
"neon-captest-reuse"
],
"scale": [
"10"
@@ -333,7 +338,7 @@ jobs:
prefix: latest
- name: Create Neon Project
if: contains(fromJson('["neonvm-captest-new", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
if: contains(fromJson('["neon-captest-new", "neon-captest-freetier", "neonvm-captest-new", "neonvm-captest-freetier", "neonvm-azure-captest-freetier", "neonvm-azure-captest-new"]'), matrix.platform)
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
@@ -341,18 +346,19 @@ jobs:
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
compute_units: ${{ (contains(matrix.platform, 'captest-freetier') && '[0.25, 0.25]') || '[1, 1]' }}
provisioner: ${{ (contains(matrix.platform, 'neonvm-') && 'k8s-neonvm') || 'k8s-pod' }}
- name: Set up Connection String
id: set-up-connstr
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CONNSTR }}
;;
neonvm-captest-sharding-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_SHARDING_CONNSTR }}
;;
neonvm-captest-new | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
neon-captest-new | neon-captest-freetier | neonvm-captest-new | neonvm-captest-freetier | neonvm-azure-captest-new | neonvm-azure-captest-freetier)
CONNSTR=${{ steps.create-neon-project.outputs.dsn }}
;;
rds-aurora)
@@ -436,9 +442,9 @@ jobs:
fail-fast: false
matrix:
include:
- PLATFORM: "neonvm-captest-pgvector"
- PLATFORM: "neon-captest-pgvector"
- PLATFORM: "azure-captest-pgvector"
env:
TEST_PG_BENCH_DURATIONS_MATRIX: "15m"
TEST_PG_BENCH_SCALES_MATRIX: "1"
@@ -480,7 +486,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neonvm-captest-pgvector)
neon-captest-pgvector)
CONNSTR=${{ secrets.BENCHMARK_PGVECTOR_CONNSTR }}
;;
azure-captest-pgvector)
@@ -579,7 +585,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_CAPTEST_CLICKBENCH_10M_CONNSTR }}
;;
rds-aurora)
@@ -589,7 +595,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_RDS_POSTGRES_CLICKBENCH_10M_CONNSTR }}
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neonvm-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -666,7 +672,7 @@ jobs:
- name: Get Connstring Secret Name
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
neon-captest-reuse)
ENV_PLATFORM=CAPTEST_TPCH
;;
rds-aurora)
@@ -676,7 +682,7 @@ jobs:
ENV_PLATFORM=RDS_AURORA_TPCH
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neonvm-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac
@@ -753,7 +759,7 @@ jobs:
id: set-up-connstr
run: |
case "${PLATFORM}" in
neonvm-captest-reuse)
neon-captest-reuse)
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_CAPTEST_CONNSTR }}
;;
rds-aurora)
@@ -763,7 +769,7 @@ jobs:
CONNSTR=${{ secrets.BENCHMARK_USER_EXAMPLE_RDS_POSTGRES_CONNSTR }}
;;
*)
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neonvm-captest-reuse', 'rds-aurora', or 'rds-postgres'"
echo >&2 "Unknown PLATFORM=${PLATFORM}. Allowed only 'neon-captest-reuse', 'rds-aurora', or 'rds-postgres'"
exit 1
;;
esac

View File

@@ -286,6 +286,9 @@ jobs:
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
TEST_RESULT_CONNSTR: "${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}"
PAGESERVER_VIRTUAL_FILE_IO_ENGINE: tokio-epoll-uring
PAGESERVER_GET_VECTORED_IMPL: vectored
PAGESERVER_GET_IMPL: vectored
PAGESERVER_VALIDATE_VEC_GET: false
# XXX: no coverage data handling here, since benchmarks are run on release builds,
# while coverage is currently collected for the debug ones
@@ -833,9 +836,6 @@ jobs:
rm -rf .docker-custom
promote-images:
permissions:
contents: read # This is required for actions/checkout
id-token: write # This is required for Azure Login to work.
needs: [ check-permissions, tag, test-images, vm-compute-node-image ]
runs-on: ubuntu-22.04
@@ -862,28 +862,6 @@ jobs:
neondatabase/vm-compute-node-${version}:${{ needs.tag.outputs.build-tag }}
done
- name: Azure login
if: github.ref_name == 'main'
uses: azure/login@6c251865b4e6290e7b78be643ea2d005bc51f69a # @v2.1.1
with:
client-id: ${{ secrets.AZURE_DEV_CLIENT_ID }}
tenant-id: ${{ secrets.AZURE_TENANT_ID }}
subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
- name: Login to ACR
if: github.ref_name == 'main'
run: |
az acr login --name=neoneastus2
- name: Copy docker images to ACR-dev
if: github.ref_name == 'main'
run: |
for image in neon compute-tools {vm-,}compute-node-{v14,v15,v16}; do
docker buildx imagetools create \
-t neoneastus2.azurecr.io/neondatabase/${image}:${{ needs.tag.outputs.build-tag }} \
neondatabase/${image}:${{ needs.tag.outputs.build-tag }}
done
- name: Add latest tag to images
if: github.ref_name == 'main'
run: |

View File

@@ -13,7 +13,6 @@ on:
paths:
- '.github/workflows/pg-clients.yml'
- 'test_runner/pg_clients/**'
- 'test_runner/logical_repl/**'
- 'poetry.lock'
workflow_dispatch:
@@ -50,77 +49,6 @@ jobs:
image-tag: ${{ needs.check-build-tools-image.outputs.image-tag }}
secrets: inherit
test-logical-replication:
needs: [ build-build-tools-image ]
runs-on: ubuntu-22.04
container:
image: ${{ needs.build-build-tools-image.outputs.image }}
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init --user root
services:
clickhouse:
image: clickhouse/clickhouse-server:24.6.3.64
ports:
- 9000:9000
- 8123:8123
steps:
- uses: actions/checkout@v4
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
- name: Create Neon Project
id: create-neon-project
uses: ./.github/actions/neon-project-create
with:
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
postgres_version: ${{ env.DEFAULT_PG_VERSION }}
- name: Run tests
uses: ./.github/actions/run-python-test-set
with:
build_type: remote
test_selection: logical_repl
run_in_parallel: false
extra_params: -m remote_cluster
pg_version: ${{ env.DEFAULT_PG_VERSION }}
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
- name: Delete Neon Project
if: always()
uses: ./.github/actions/neon-project-delete
with:
project_id: ${{ steps.create-neon-project.outputs.project_id }}
api_key: ${{ secrets.NEON_STAGING_API_KEY }}
- name: Create Allure report
if: ${{ !cancelled() }}
id: create-allure-report
uses: ./.github/actions/allure-report-generate
with:
store-test-results-into-db: true
env:
REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
- name: Post to a Slack channel
if: github.event.schedule && failure()
uses: slackapi/slack-github-action@v1
with:
channel-id: "C06KHQVQ7U3" # on-call-qa-staging-stream
slack-message: |
Testing the logical replication: <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|${{ job.status }}> (<${{ steps.create-allure-report.outputs.report-url }}|test report>)
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
test-postgres-client-libs:
needs: [ build-build-tools-image ]
runs-on: ubuntu-22.04

View File

@@ -1,13 +1,13 @@
/compute_tools/ @neondatabase/control-plane @neondatabase/compute
/storage_controller @neondatabase/storage
/libs/pageserver_api/ @neondatabase/storage
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/storage
/libs/postgres_ffi/ @neondatabase/compute @neondatabase/safekeepers
/libs/remote_storage/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/storage
/libs/safekeeper_api/ @neondatabase/safekeepers
/libs/vm_monitor/ @neondatabase/autoscaling
/pageserver/ @neondatabase/storage
/pgxn/ @neondatabase/compute
/pgxn/neon/ @neondatabase/compute @neondatabase/storage
/pgxn/neon/ @neondatabase/compute @neondatabase/safekeepers
/proxy/ @neondatabase/proxy
/safekeeper/ @neondatabase/storage
/safekeeper/ @neondatabase/safekeepers
/vendor/ @neondatabase/compute

2
Cargo.lock generated
View File

@@ -1672,7 +1672,6 @@ checksum = "62d6dcd069e7b5fe49a302411f759d4cf1cf2c27fe798ef46fb8baefc053dd2b"
dependencies = [
"bitflags 2.4.1",
"byteorder",
"chrono",
"diesel_derives",
"itoa",
"pq-sys",
@@ -5719,7 +5718,6 @@ dependencies = [
"aws-config",
"bytes",
"camino",
"chrono",
"clap",
"control_plane",
"diesel",

View File

@@ -313,5 +313,3 @@ To get more familiar with this aspect, refer to:
- Read [CONTRIBUTING.md](/CONTRIBUTING.md) to learn about project code style and practices.
- To get familiar with a source tree layout, use [sourcetree.md](/docs/sourcetree.md).
- To learn more about PostgreSQL internals, check http://www.interdb.jp/pg/index.html
.

View File

@@ -514,6 +514,7 @@ impl LocalEnv {
#[derive(serde::Serialize, serde::Deserialize)]
// (allow unknown fields, unlike PageServerConf)
struct PageserverConfigTomlSubset {
id: NodeId,
listen_pg_addr: String,
listen_http_addr: String,
pg_auth_type: AuthType,
@@ -525,30 +526,18 @@ impl LocalEnv {
.with_context(|| format!("read {:?}", config_toml_path))?,
)
.context("parse pageserver.toml")?;
let identity_toml_path = dentry.path().join("identity.toml");
#[derive(serde::Serialize, serde::Deserialize)]
struct IdentityTomlSubset {
id: NodeId,
}
let identity_toml: IdentityTomlSubset = toml_edit::de::from_str(
&std::fs::read_to_string(&identity_toml_path)
.with_context(|| format!("read {:?}", identity_toml_path))?,
)
.context("parse identity.toml")?;
let PageserverConfigTomlSubset {
id: config_toml_id,
listen_pg_addr,
listen_http_addr,
pg_auth_type,
http_auth_type,
} = config_toml;
let IdentityTomlSubset {
id: identity_toml_id,
} = identity_toml;
let conf = PageServerConf {
id: {
anyhow::ensure!(
identity_toml_id == id,
"id mismatch: identity.toml:id={identity_toml_id} pageserver_(.*) id={id}",
config_toml_id == id,
"id mismatch: config_toml.id={config_toml_id} id={id}",
);
id
},

View File

@@ -127,13 +127,10 @@ impl PageServerNode {
}
// Apply the user-provided overrides
overrides.push({
let mut doc =
toml_edit::ser::to_document(&conf).expect("we deserialized this from toml earlier");
// `id` is written out to `identity.toml` instead of `pageserver.toml`
doc.remove("id").expect("it's part of the struct");
doc.to_string()
});
overrides.push(
toml_edit::ser::to_string_pretty(&conf)
.expect("we deserialized this from toml earlier"),
);
// Turn `overrides` into a toml document.
// TODO: above code is legacy code, it should be refactored to use toml_edit directly.

View File

@@ -1,5 +1,5 @@
use std::str::FromStr;
use std::time::{Duration, Instant};
use std::time::Instant;
/// Request/response types for the storage controller
/// API (`/control/v1` prefix). Implemented by the server
@@ -294,42 +294,6 @@ pub enum PlacementPolicy {
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantShardMigrateResponse {}
/// Metadata health record posted from scrubber.
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthRecord {
pub tenant_shard_id: TenantShardId,
pub healthy: bool,
pub last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateRequest {
pub healthy_tenant_shards: Vec<TenantShardId>,
pub unhealthy_tenant_shards: Vec<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthUpdateResponse {}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListUnhealthyResponse {
pub unhealthy_tenant_shards: Vec<TenantShardId>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedRequest {
#[serde(with = "humantime_serde")]
pub not_scrubbed_for: Duration,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct MetadataHealthListOutdatedResponse {
pub health_records: Vec<MetadataHealthRecord>,
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -355,8 +355,7 @@ impl RemoteStorage for AzureBlobStorage {
.blobs()
.map(|k| ListingObject{
key: self.name_to_relative_path(&k.name),
last_modified: k.properties.last_modified.into(),
size: k.properties.content_length,
last_modified: k.properties.last_modified.into()
}
);

View File

@@ -153,7 +153,6 @@ pub enum ListingMode {
pub struct ListingObject {
pub key: RemotePath,
pub last_modified: SystemTime,
pub size: u64,
}
#[derive(Default)]
@@ -195,7 +194,7 @@ pub trait RemoteStorage: Send + Sync + 'static {
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> + Send;
) -> impl Stream<Item = Result<Listing, DownloadError>>;
async fn list(
&self,
@@ -352,10 +351,10 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &'a CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a + Send {
) -> impl Stream<Item = Result<Listing, DownloadError>> + 'a {
match self {
Self::LocalFs(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel))
as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>> + Send>>,
as Pin<Box<dyn Stream<Item = Result<Listing, DownloadError>>>>,
Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),

View File

@@ -368,7 +368,6 @@ impl RemoteStorage for LocalFs {
key: k.clone(),
// LocalFs is just for testing, so just specify a dummy time
last_modified: SystemTime::now(),
size: 0,
})
}
})
@@ -412,7 +411,6 @@ impl RemoteStorage for LocalFs {
key: RemotePath::from_string(&relative_key).unwrap(),
// LocalFs is just for testing
last_modified: SystemTime::now(),
size: 0,
});
}
}

View File

@@ -565,12 +565,9 @@ impl RemoteStorage for S3Bucket {
}
};
let size = object.size.unwrap_or(0) as u64;
result.keys.push(ListingObject{
key,
last_modified,
size,
last_modified
});
if let Some(mut mk) = max_keys {
assert!(mk > 0);

View File

@@ -114,7 +114,7 @@ impl RemoteStorage for UnreliableWrapper {
mode: ListingMode,
max_keys: Option<NonZeroU32>,
cancel: &CancellationToken,
) -> impl Stream<Item = Result<Listing, DownloadError>> + Send {
) -> impl Stream<Item = Result<Listing, DownloadError>> {
async_stream::stream! {
self.attempt(RemoteOp::ListPrefixes(prefix.cloned()))
.map_err(DownloadError::Other)?;

View File

@@ -18,20 +18,20 @@ const STORAGE_TOKEN_ALGORITHM: Algorithm = Algorithm::EdDSA;
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum Scope {
/// Provides access to all data for a specific tenant (specified in `struct Claims` below)
// Provides access to all data for a specific tenant (specified in `struct Claims` below)
// TODO: join these two?
Tenant,
/// Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs.
/// Should only be used e.g. for status check/tenant creation/list.
// Provides blanket access to all tenants on the pageserver plus pageserver-wide APIs.
// Should only be used e.g. for status check/tenant creation/list.
PageServerApi,
/// Provides blanket access to all data on the safekeeper plus safekeeper-wide APIs.
/// Should only be used e.g. for status check.
/// Currently also used for connection from any pageserver to any safekeeper.
// Provides blanket access to all data on the safekeeper plus safekeeper-wide APIs.
// Should only be used e.g. for status check.
// Currently also used for connection from any pageserver to any safekeeper.
SafekeeperData,
/// The scope used by pageservers in upcalls to storage controller and cloud control plane
// The scope used by pageservers in upcalls to storage controller and cloud control plane
#[serde(rename = "generations_api")]
GenerationsApi,
/// Allows access to control plane managment API and some storage controller endpoints.
// Allows access to control plane managment API and some storage controller endpoints.
Admin,
/// Allows access to storage controller APIs used by the scrubber, to interrogate the state

View File

@@ -52,7 +52,7 @@ pub mod defaults {
use pageserver_api::models::ImageCompressionAlgorithm;
pub use storage_broker::DEFAULT_ENDPOINT as BROKER_DEFAULT_ENDPOINT;
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "300 s";
pub const DEFAULT_WAIT_LSN_TIMEOUT: &str = "60 s";
pub const DEFAULT_WAL_REDO_TIMEOUT: &str = "60 s";
pub const DEFAULT_SUPERUSER: &str = "cloud_admin";
@@ -83,16 +83,16 @@ pub mod defaults {
#[cfg(not(target_os = "linux"))]
pub const DEFAULT_VIRTUAL_FILE_IO_ENGINE: &str = "std-fs";
pub const DEFAULT_GET_VECTORED_IMPL: &str = "vectored";
pub const DEFAULT_GET_VECTORED_IMPL: &str = "sequential";
pub const DEFAULT_GET_IMPL: &str = "vectored";
pub const DEFAULT_GET_IMPL: &str = "legacy";
pub const DEFAULT_MAX_VECTORED_READ_BYTES: usize = 128 * 1024; // 128 KiB
pub const DEFAULT_IMAGE_COMPRESSION: ImageCompressionAlgorithm =
ImageCompressionAlgorithm::Disabled;
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = false;
pub const DEFAULT_VALIDATE_VECTORED_GET: bool = true;
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
@@ -356,6 +356,8 @@ struct PageServerConfigBuilder {
auth_validation_public_key_path: BuilderValue<Option<Utf8PathBuf>>,
remote_storage_config: BuilderValue<Option<RemoteStorageConfig>>,
id: BuilderValue<NodeId>,
broker_endpoint: BuilderValue<Uri>,
broker_keepalive_interval: BuilderValue<Duration>,
@@ -404,8 +406,11 @@ struct PageServerConfigBuilder {
}
impl PageServerConfigBuilder {
fn new() -> Self {
Self::default()
fn new(node_id: NodeId) -> Self {
let mut this = Self::default();
this.id(node_id);
this
}
#[inline(always)]
@@ -433,6 +438,7 @@ impl PageServerConfigBuilder {
pg_auth_type: Set(AuthType::Trust),
auth_validation_public_key_path: Set(None),
remote_storage_config: Set(None),
id: NotSet,
broker_endpoint: Set(storage_broker::DEFAULT_ENDPOINT
.parse()
.expect("failed to parse default broker endpoint")),
@@ -562,6 +568,10 @@ impl PageServerConfigBuilder {
self.broker_keepalive_interval = BuilderValue::Set(broker_keepalive_interval)
}
pub fn id(&mut self, node_id: NodeId) {
self.id = BuilderValue::Set(node_id)
}
pub fn log_format(&mut self, log_format: LogFormat) {
self.log_format = BuilderValue::Set(log_format)
}
@@ -673,7 +683,7 @@ impl PageServerConfigBuilder {
self.l0_flush = BuilderValue::Set(value);
}
pub fn build(self, id: NodeId) -> anyhow::Result<PageServerConf> {
pub fn build(self) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();
macro_rules! conf {
@@ -706,6 +716,7 @@ impl PageServerConfigBuilder {
pg_auth_type,
auth_validation_public_key_path,
remote_storage_config,
id,
broker_endpoint,
broker_keepalive_interval,
log_format,
@@ -733,7 +744,6 @@ impl PageServerConfigBuilder {
}
CUSTOM LOGIC
{
id: id,
// TenantConf is handled separately
default_tenant_conf: TenantConf::default(),
concurrent_tenant_warmup: ConfigurableSemaphore::new({
@@ -883,7 +893,7 @@ impl PageServerConf {
toml: &Document,
workdir: &Utf8Path,
) -> anyhow::Result<Self> {
let mut builder = PageServerConfigBuilder::new();
let mut builder = PageServerConfigBuilder::new(node_id);
builder.workdir(workdir.to_owned());
let mut t_conf = TenantConfOpt::default();
@@ -914,6 +924,8 @@ impl PageServerConf {
"tenant_config" => {
t_conf = TenantConfOpt::try_from(item.to_owned()).context(format!("failed to parse: '{key}'"))?;
}
"id" => {}, // Ignoring `id` field in pageserver.toml - using identity.toml as the source of truth
// Logging is not set up yet, so we can't do it.
"broker_endpoint" => builder.broker_endpoint(parse_toml_string(key, item)?.parse().context("failed to parse broker endpoint")?),
"broker_keepalive_interval" => builder.broker_keepalive_interval(parse_toml_duration(key, item)?),
"log_format" => builder.log_format(
@@ -1006,7 +1018,7 @@ impl PageServerConf {
}
}
let mut conf = builder.build(node_id).context("invalid config")?;
let mut conf = builder.build().context("invalid config")?;
if conf.http_auth_type == AuthType::NeonJWT || conf.pg_auth_type == AuthType::NeonJWT {
let auth_validation_public_key_path = conf
@@ -1243,6 +1255,7 @@ max_file_descriptors = 333
# initial superuser role name to use when creating a new tenant
initial_superuser_name = 'zzzz'
id = 10
metric_collection_interval = '222 s'
metric_collection_endpoint = 'http://localhost:80/metrics'
@@ -1259,8 +1272,9 @@ background_task_maximum_delay = '334 s'
let (workdir, pg_distrib_dir) = prepare_fs(&tempdir)?;
let broker_endpoint = storage_broker::DEFAULT_ENDPOINT;
// we have to create dummy values to overcome the validation errors
let config_string =
format!("pg_distrib_dir='{pg_distrib_dir}'\nbroker_endpoint = '{broker_endpoint}'",);
let config_string = format!(
"pg_distrib_dir='{pg_distrib_dir}'\nid=10\nbroker_endpoint = '{broker_endpoint}'",
);
let toml = config_string.parse()?;
let parsed_config = PageServerConf::parse_and_validate(NodeId(10), &toml, &workdir)
@@ -1565,6 +1579,7 @@ broker_endpoint = '{broker_endpoint}'
r#"pg_distrib_dir = "{pg_distrib_dir}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222
[disk_usage_based_eviction]
max_usage_pct = 80
@@ -1634,6 +1649,7 @@ threshold = "20m"
r#"pg_distrib_dir = "{pg_distrib_dir}"
metric_collection_endpoint = "http://sample.url"
metric_collection_interval = "10min"
id = 222
[tenant_config]
evictions_low_residence_duration_metric_threshold = "20m"

View File

@@ -2129,24 +2129,14 @@ async fn secondary_download_handler(
let timeout = wait.unwrap_or(Duration::MAX);
let result = tokio::time::timeout(
let status = match tokio::time::timeout(
timeout,
state.secondary_controller.download_tenant(tenant_shard_id),
)
.await;
let progress = secondary_tenant.progress.lock().unwrap().clone();
let status = match result {
Ok(Ok(())) => {
if progress.layers_downloaded >= progress.layers_total {
// Download job ran to completion
StatusCode::OK
} else {
// Download dropped out without errors because it ran out of time budget
StatusCode::ACCEPTED
}
}
.await
{
// Download job ran to completion.
Ok(Ok(())) => StatusCode::OK,
// Edge case: downloads aren't usually fallible: things like a missing heatmap are considered
// okay. We could get an error here in the unlikely edge case that the tenant
// was detached between our check above and executing the download job.
@@ -2156,6 +2146,8 @@ async fn secondary_download_handler(
Err(_) => StatusCode::ACCEPTED,
};
let progress = secondary_tenant.progress.lock().unwrap().clone();
json_response(status, progress)
}

View File

@@ -2,23 +2,13 @@ use std::{num::NonZeroUsize, sync::Arc};
use crate::tenant::ephemeral_file;
#[derive(Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[derive(Default, Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
#[default]
PageCached,
#[serde(rename_all = "snake_case")]
Direct {
max_concurrency: NonZeroUsize,
},
}
impl Default for L0FlushConfig {
fn default() -> Self {
Self::Direct {
// TODO: using num_cpus results in different peak memory usage on different instance types.
max_concurrency: NonZeroUsize::new(usize::max(1, num_cpus::get())).unwrap(),
}
}
Direct { max_concurrency: NonZeroUsize },
}
#[derive(Clone)]

View File

@@ -613,23 +613,7 @@ pub(crate) static CIRCUIT_BREAKERS_UNBROKEN: Lazy<IntCounter> = Lazy::new(|| {
pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_compression_image_in_bytes_total",
"Size of data written into image layers before compression"
)
.expect("failed to define a metric")
});
pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_compression_image_in_bytes_considered",
"Size of potentially compressible data written into image layers before compression"
)
.expect("failed to define a metric")
});
pub(crate) static COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"pageserver_compression_image_in_bytes_chosen",
"Size of data whose compressed form was written into image layers"
"Size of uncompressed data written into image layers"
)
.expect("failed to define a metric")
});

View File

@@ -102,7 +102,8 @@ use std::fmt::Debug;
use std::fmt::Display;
use std::fs;
use std::fs::File;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::{Duration, Instant};
@@ -1226,29 +1227,11 @@ impl Tenant {
Ok(timeline_preloads)
}
pub(crate) async fn apply_timeline_archival_config(
pub async fn apply_timeline_archival_config(
&self,
timeline_id: TimelineId,
state: TimelineArchivalState,
_timeline_id: TimelineId,
_config: TimelineArchivalState,
) -> anyhow::Result<()> {
let timeline = self
.get_timeline(timeline_id, false)
.context("Cannot apply timeline archival config to inexistent timeline")?;
let upload_needed = timeline
.remote_client
.schedule_index_upload_for_timeline_archival_state(state)?;
if upload_needed {
const MAX_WAIT: Duration = Duration::from_secs(10);
let Ok(v) =
tokio::time::timeout(MAX_WAIT, timeline.remote_client.wait_completion()).await
else {
tracing::warn!("reached timeout for waiting on upload queue");
bail!("reached timeout for upload queue flush");
};
v?;
}
Ok(())
}
@@ -1633,23 +1616,21 @@ impl Tenant {
/// This function is periodically called by compactor task.
/// Also it can be explicitly requested per timeline through page server
/// api's 'compact' command.
///
/// Returns whether we have pending compaction task.
async fn compaction_iteration(
&self,
cancel: &CancellationToken,
ctx: &RequestContext,
) -> Result<bool, timeline::CompactionError> {
) -> Result<(), timeline::CompactionError> {
// Don't start doing work during shutdown, or when broken, we do not need those in the logs
if !self.is_active() {
return Ok(false);
return Ok(());
}
{
let conf = self.tenant_conf.load();
if !conf.location.may_delete_layers_hint() || !conf.location.may_upload_layers_hint() {
info!("Skipping compaction in location state {:?}", conf.location);
return Ok(false);
return Ok(());
}
}
@@ -1676,13 +1657,11 @@ impl Tenant {
// Before doing any I/O work, check our circuit breaker
if self.compaction_circuit_breaker.lock().unwrap().is_broken() {
info!("Skipping compaction due to previous failures");
return Ok(false);
return Ok(());
}
let mut has_pending_task = false;
for (timeline_id, timeline) in &timelines_to_compact {
has_pending_task |= timeline
timeline
.compact(cancel, EnumSet::empty(), ctx)
.instrument(info_span!("compact_timeline", %timeline_id))
.await
@@ -1702,7 +1681,7 @@ impl Tenant {
.unwrap()
.success(&CIRCUIT_BREAKERS_UNBROKEN);
Ok(has_pending_task)
Ok(())
}
// Call through to all timelines to freeze ephemeral layers if needed. Usually

View File

@@ -28,12 +28,6 @@ use crate::virtual_file::VirtualFile;
use std::cmp::min;
use std::io::{Error, ErrorKind};
#[derive(Copy, Clone, Debug)]
pub struct CompressionInfo {
pub written_compressed: bool,
pub compressed_size: Option<usize>,
}
impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
pub async fn read_blob(
@@ -279,10 +273,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
srcbuf: B,
ctx: &RequestContext,
) -> (B::Buf, Result<u64, Error>) {
let (buf, res) = self
.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
.await;
(buf, res.map(|(off, _compression_info)| off))
self.write_blob_maybe_compressed(srcbuf, ctx, ImageCompressionAlgorithm::Disabled)
.await
}
/// Write a blob of data. Returns the offset that it was written to,
@@ -292,12 +284,8 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
srcbuf: B,
ctx: &RequestContext,
algorithm: ImageCompressionAlgorithm,
) -> (B::Buf, Result<(u64, CompressionInfo), Error>) {
) -> (B::Buf, Result<u64, Error>) {
let offset = self.offset;
let mut compression_info = CompressionInfo {
written_compressed: false,
compressed_size: None,
};
let len = srcbuf.bytes_init();
@@ -340,9 +328,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
encoder.write_all(&slice[..]).await.unwrap();
encoder.shutdown().await.unwrap();
let compressed = encoder.into_inner();
compression_info.compressed_size = Some(compressed.len());
if compressed.len() < len {
compression_info.written_compressed = true;
let compressed_len = compressed.len();
compressed_buf = Some(compressed);
(BYTE_ZSTD, compressed_len, slice.into_inner())
@@ -373,7 +359,7 @@ impl<const BUFFERED: bool> BlobWriter<BUFFERED> {
} else {
self.write_all(srcbuf, ctx).await
};
(srcbuf, res.map(|_| (offset, compression_info)))
(srcbuf, res.map(|_| offset))
}
}
@@ -430,14 +416,12 @@ pub(crate) mod tests {
let mut wtr = BlobWriter::<BUFFERED>::new(file, 0);
for blob in blobs.iter() {
let (_, res) = if compression {
let res = wtr
.write_blob_maybe_compressed(
blob.clone(),
ctx,
ImageCompressionAlgorithm::Zstd { level: Some(1) },
)
.await;
(res.0, res.1.map(|(off, _)| off))
wtr.write_blob_maybe_compressed(
blob.clone(),
ctx,
ImageCompressionAlgorithm::Zstd { level: Some(1) },
)
.await
} else {
wtr.write_blob(blob.clone(), ctx).await
};

View File

@@ -1384,32 +1384,34 @@ impl TenantManager {
tenant_shard_id: TenantShardId,
) -> Result<(), DeleteTenantError> {
let remote_path = remote_tenant_path(&tenant_shard_id);
let mut keys_stream = self.resources.remote_storage.list_streaming(
Some(&remote_path),
remote_storage::ListingMode::NoDelimiter,
None,
&self.cancel,
);
while let Some(chunk) = keys_stream.next().await {
let keys = match chunk {
Ok(listing) => listing.keys,
Err(remote_storage::DownloadError::Cancelled) => {
return Err(DeleteTenantError::Cancelled)
}
Err(remote_storage::DownloadError::NotFound) => return Ok(()),
Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
};
if keys.is_empty() {
tracing::info!("Remote storage already deleted");
} else {
tracing::info!("Deleting {} keys from remote storage", keys.len());
let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
self.resources
.remote_storage
.delete_objects(&keys, &self.cancel)
.await?;
let keys = match self
.resources
.remote_storage
.list(
Some(&remote_path),
remote_storage::ListingMode::NoDelimiter,
None,
&self.cancel,
)
.await
{
Ok(listing) => listing.keys,
Err(remote_storage::DownloadError::Cancelled) => {
return Err(DeleteTenantError::Cancelled)
}
Err(remote_storage::DownloadError::NotFound) => return Ok(()),
Err(other) => return Err(DeleteTenantError::Other(anyhow::anyhow!(other))),
};
if keys.is_empty() {
tracing::info!("Remote storage already deleted");
} else {
tracing::info!("Deleting {} keys from remote storage", keys.len());
let keys = keys.into_iter().map(|o| o.key).collect::<Vec<_>>();
self.resources
.remote_storage
.delete_objects(&keys, &self.cancel)
.await?;
}
Ok(())

View File

@@ -187,7 +187,7 @@ use camino::Utf8Path;
use chrono::{NaiveDateTime, Utc};
pub(crate) use download::download_initdb_tar_zst;
use pageserver_api::models::{AuxFilePolicy, TimelineArchivalState};
use pageserver_api::models::AuxFilePolicy;
use pageserver_api::shard::{ShardIndex, TenantShardId};
use scopeguard::ScopeGuard;
use tokio_util::sync::CancellationToken;
@@ -457,17 +457,6 @@ impl RemoteTimelineClient {
.unwrap_or(false)
}
/// Returns whether the timeline is archived.
/// Return None if the remote index_part hasn't been downloaded yet.
pub(crate) fn is_archived(&self) -> Option<bool> {
self.upload_queue
.lock()
.unwrap()
.initialized_mut()
.map(|q| q.clean.0.archived_at.is_some())
.ok()
}
fn update_remote_physical_size_gauge(&self, current_remote_index_part: Option<&IndexPart>) {
let size: u64 = if let Some(current_remote_index_part) = current_remote_index_part {
current_remote_index_part
@@ -628,7 +617,7 @@ impl RemoteTimelineClient {
Ok(())
}
/// Launch an index-file upload operation in the background, with only the `aux_file_policy` flag updated.
/// Launch an index-file upload operation in the background, with only aux_file_policy flag updated.
pub(crate) fn schedule_index_upload_for_aux_file_policy_update(
self: &Arc<Self>,
last_aux_file_policy: Option<AuxFilePolicy>,
@@ -639,48 +628,6 @@ impl RemoteTimelineClient {
self.schedule_index_upload(upload_queue)?;
Ok(())
}
/// Launch an index-file upload operation in the background, with only the `archived_at` field updated.
///
/// Returns whether it is required to wait for the queue to be empty to ensure that the change is uploaded,
/// so either if the change is already sitting in the queue, but not commited yet, or the change has not
/// been in the queue yet.
pub(crate) fn schedule_index_upload_for_timeline_archival_state(
self: &Arc<Self>,
state: TimelineArchivalState,
) -> anyhow::Result<bool> {
let mut guard = self.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut()?;
/// Returns Some(_) if a change is needed, and Some(true) if it's a
/// change needed to set archived_at.
fn need_change(
archived_at: &Option<NaiveDateTime>,
state: TimelineArchivalState,
) -> Option<bool> {
match (archived_at, state) {
(Some(_), TimelineArchivalState::Archived)
| (None, TimelineArchivalState::Unarchived) => {
// Nothing to do
tracing::info!("intended state matches present state");
None
}
(None, TimelineArchivalState::Archived) => Some(true),
(Some(_), TimelineArchivalState::Unarchived) => Some(false),
}
}
let need_upload_scheduled = need_change(&upload_queue.dirty.archived_at, state);
if let Some(archived_at_set) = need_upload_scheduled {
let intended_archived_at = archived_at_set.then(|| Utc::now().naive_utc());
upload_queue.dirty.archived_at = intended_archived_at;
self.schedule_index_upload(upload_queue)?;
}
let need_wait = need_change(&upload_queue.clean.0.archived_at, state).is_some();
Ok(need_wait)
}
///
/// Launch an index-file upload operation in the background, if necessary.
///

View File

@@ -32,10 +32,6 @@ pub struct IndexPart {
#[serde(skip_serializing_if = "Option::is_none")]
pub deleted_at: Option<NaiveDateTime>,
#[serde(default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub archived_at: Option<NaiveDateTime>,
/// Per layer file name metadata, which can be present for a present or missing layer file.
///
/// Older versions of `IndexPart` will not have this property or have only a part of metadata
@@ -84,11 +80,10 @@ impl IndexPart {
/// - 5: lineage was added
/// - 6: last_aux_file_policy is added.
/// - 7: metadata_bytes is no longer written, but still read
/// - 8: added `archived_at`
const LATEST_VERSION: usize = 8;
const LATEST_VERSION: usize = 7;
// Versions we may see when reading from a bucket.
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8];
pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7];
pub const FILE_NAME: &'static str = "index_part.json";
@@ -99,7 +94,6 @@ impl IndexPart {
disk_consistent_lsn: metadata.disk_consistent_lsn(),
metadata,
deleted_at: None,
archived_at: None,
lineage: Default::default(),
last_aux_file_policy: None,
}
@@ -290,7 +284,6 @@ mod tests {
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: None,
archived_at: None,
lineage: Lineage::default(),
last_aux_file_policy: None,
};
@@ -333,7 +326,6 @@ mod tests {
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: None,
archived_at: None,
lineage: Lineage::default(),
last_aux_file_policy: None,
};
@@ -377,7 +369,6 @@ mod tests {
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: None,
lineage: Lineage::default(),
last_aux_file_policy: None,
};
@@ -424,7 +415,6 @@ mod tests {
])
.unwrap(),
deleted_at: None,
archived_at: None,
lineage: Lineage::default(),
last_aux_file_policy: None,
};
@@ -466,7 +456,6 @@ mod tests {
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: None,
lineage: Lineage::default(),
last_aux_file_policy: None,
};
@@ -507,7 +496,6 @@ mod tests {
disk_consistent_lsn: Lsn::from_str("0/15A7618").unwrap(),
metadata: TimelineMetadata::from_bytes(&[226,88,25,241,0,46,0,4,0,0,0,0,1,90,118,24,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,78,244,32,0,0,0,0,1,78,244,32,0,0,0,16,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: None,
archived_at: None,
lineage: Lineage {
reparenting_history_truncated: false,
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
@@ -557,7 +545,6 @@ mod tests {
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::from_bytes(&[113,11,159,210,0,54,0,4,0,0,0,0,1,105,96,232,1,0,0,0,0,1,105,96,112,0,0,0,0,0,0,0,0,0,0,0,0,0,1,105,96,112,0,0,0,0,1,105,96,112,0,0,0,14,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]).unwrap(),
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: None,
lineage: Lineage {
reparenting_history_truncated: false,
reparenting_history: vec![TimelineId::from_str("e1bfd8c633d713d279e6fcd2bcc15b6d").unwrap()],
@@ -616,63 +603,6 @@ mod tests {
14,
).with_recalculated_checksum().unwrap(),
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: None,
lineage: Default::default(),
last_aux_file_policy: Default::default(),
};
let part = IndexPart::from_s3_bytes(example.as_bytes()).unwrap();
assert_eq!(part, expected);
}
#[test]
fn v8_indexpart_is_parsed() {
let example = r#"{
"version": 8,
"layer_metadata":{
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 },
"000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 }
},
"disk_consistent_lsn":"0/16960E8",
"metadata": {
"disk_consistent_lsn": "0/16960E8",
"prev_record_lsn": "0/1696070",
"ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e",
"ancestor_lsn": "0/0",
"latest_gc_cutoff_lsn": "0/1696070",
"initdb_lsn": "0/1696070",
"pg_version": 14
},
"deleted_at": "2023-07-31T09:00:00.123",
"archived_at": "2023-04-29T09:00:00.123"
}"#;
let expected = IndexPart {
version: 8,
layer_metadata: HashMap::from([
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata {
file_size: 25600000,
generation: Generation::none(),
shard: ShardIndex::unsharded()
}),
("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata {
file_size: 9007199254741001,
generation: Generation::none(),
shard: ShardIndex::unsharded()
})
]),
disk_consistent_lsn: "0/16960E8".parse::<Lsn>().unwrap(),
metadata: TimelineMetadata::new(
Lsn::from_str("0/16960E8").unwrap(),
Some(Lsn::from_str("0/1696070").unwrap()),
Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()),
Lsn::INVALID,
Lsn::from_str("0/1696070").unwrap(),
Lsn::from_str("0/1696070").unwrap(),
14,
).with_recalculated_checksum().unwrap(),
deleted_at: Some(parse_naive_datetime("2023-07-31T09:00:00.123000000")),
archived_at: Some(parse_naive_datetime("2023-04-29T09:00:00.123000000")),
lineage: Default::default(),
last_aux_file_policy: Default::default(),
};

View File

@@ -307,10 +307,12 @@ impl DeltaLayer {
.with_context(|| format!("Failed to load delta layer {}", self.path()))
}
async fn load_inner(&self, ctx: &RequestContext) -> anyhow::Result<Arc<DeltaLayerInner>> {
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let loaded = DeltaLayerInner::load(&path, None, None, ctx).await?;
let loaded = DeltaLayerInner::load(&path, None, None, ctx)
.await
.and_then(|res| res)?;
// not production code
let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
@@ -467,7 +469,7 @@ impl DeltaLayerWriterInner {
.write_blob_maybe_compressed(val, ctx, compression)
.await;
let off = match res {
Ok((off, _)) => off,
Ok(off) => off,
Err(e) => return (val, Err(anyhow::anyhow!(e))),
};
@@ -758,24 +760,27 @@ impl DeltaLayerInner {
&self.layer_lsn_range
}
/// Returns nested result following Result<Result<_, OpErr>, Critical>:
/// - inner has the success or transient failure
/// - outer has the permanent failure
pub(super) async fn load(
path: &Utf8Path,
summary: Option<Summary>,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
.await
.context("open layer file")?;
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader
.read_blk(0, ctx)
.await
.context("read first block")?;
let summary_blk = match block_reader.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
// TODO: this should be an assertion instead; see ImageLayerInner::load
let actual_summary =
@@ -797,7 +802,7 @@ impl DeltaLayerInner {
}
}
Ok(DeltaLayerInner {
Ok(Ok(DeltaLayerInner {
file,
file_id,
index_start_blk: actual_summary.index_start_blk,
@@ -805,7 +810,7 @@ impl DeltaLayerInner {
max_vectored_read_bytes,
layer_key_range: actual_summary.key_range,
layer_lsn_range: actual_summary.lsn_range,
})
}))
}
pub(super) async fn get_value_reconstruct_data(

View File

@@ -265,8 +265,9 @@ impl ImageLayer {
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
let path = self.path();
let loaded =
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx).await?;
let loaded = ImageLayerInner::load(&path, self.desc.image_layer_lsn(), None, None, ctx)
.await
.and_then(|res| res)?;
// not production code
let actual_layer_name = LayerName::from_str(path.file_name().unwrap()).unwrap();
@@ -384,16 +385,17 @@ impl ImageLayerInner {
summary: Option<Summary>,
max_vectored_read_bytes: Option<MaxVectoredReadBytes>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path, ctx)
.await
.context("open layer file")?;
) -> Result<Result<Self, anyhow::Error>, anyhow::Error> {
let file = match VirtualFile::open(path, ctx).await {
Ok(file) => file,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("open layer file"))),
};
let file_id = page_cache::next_file_id();
let block_reader = FileBlockReader::new(&file, file_id);
let summary_blk = block_reader
.read_blk(0, ctx)
.await
.context("read first block")?;
let summary_blk = match block_reader.read_blk(0, ctx).await {
Ok(blk) => blk,
Err(e) => return Ok(Err(anyhow::Error::new(e).context("read first block"))),
};
// length is the only way how this could fail, so it's not actually likely at all unless
// read_blk returns wrong sized block.
@@ -418,7 +420,7 @@ impl ImageLayerInner {
}
}
Ok(ImageLayerInner {
Ok(Ok(ImageLayerInner {
index_start_blk: actual_summary.index_start_blk,
index_root_blk: actual_summary.index_root_blk,
lsn,
@@ -426,7 +428,7 @@ impl ImageLayerInner {
file_id,
max_vectored_read_bytes,
key_range: actual_summary.key_range,
})
}))
}
pub(super) async fn get_value_reconstruct_data(
@@ -734,14 +736,6 @@ struct ImageLayerWriterInner {
// Total uncompressed bytes passed into put_image
uncompressed_bytes: u64,
// Like `uncompressed_bytes`,
// but only of images we might consider for compression
uncompressed_bytes_eligible: u64,
// Like `uncompressed_bytes`, but only of images
// where we have chosen their compressed form
uncompressed_bytes_chosen: u64,
blob_writer: BlobWriter<false>,
tree: DiskBtreeBuilder<BlockBuf, KEY_SIZE>,
}
@@ -798,8 +792,6 @@ impl ImageLayerWriterInner {
tree: tree_builder,
blob_writer,
uncompressed_bytes: 0,
uncompressed_bytes_eligible: 0,
uncompressed_bytes_chosen: 0,
};
Ok(writer)
@@ -818,22 +810,13 @@ impl ImageLayerWriterInner {
) -> anyhow::Result<()> {
ensure!(self.key_range.contains(&key));
let compression = self.conf.image_compression;
let uncompressed_len = img.len() as u64;
self.uncompressed_bytes += uncompressed_len;
self.uncompressed_bytes += img.len() as u64;
let (_img, res) = self
.blob_writer
.write_blob_maybe_compressed(img, ctx, compression)
.await;
// TODO: re-use the buffer for `img` further upstack
let (off, compression_info) = res?;
if compression_info.compressed_size.is_some() {
// The image has been considered for compression at least
self.uncompressed_bytes_eligible += uncompressed_len;
}
if compression_info.written_compressed {
// The image has been compressed
self.uncompressed_bytes_chosen += uncompressed_len;
}
let off = res?;
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
@@ -856,9 +839,6 @@ impl ImageLayerWriterInner {
// Calculate compression ratio
let compressed_size = self.blob_writer.size() - PAGE_SZ as u64; // Subtract PAGE_SZ for header
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES.inc_by(self.uncompressed_bytes);
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CONSIDERED
.inc_by(self.uncompressed_bytes_eligible);
crate::metrics::COMPRESSION_IMAGE_INPUT_BYTES_CHOSEN.inc_by(self.uncompressed_bytes_chosen);
crate::metrics::COMPRESSION_IMAGE_OUTPUT_BYTES.inc_by(compressed_size);
let mut file = self.blob_writer.into_inner();

View File

@@ -1651,9 +1651,8 @@ impl Drop for DownloadedLayer {
}
impl DownloadedLayer {
/// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`].
/// Failure to load the layer is sticky, i.e., future `get()` calls will return
/// the initial load failure immediately.
/// Initializes the `DeltaLayerInner` or `ImageLayerInner` within [`LayerKind`], or fails to
/// initialize it permanently.
///
/// `owner` parameter is a strong reference at the same `LayerInner` as the
/// `DownloadedLayer::owner` would be when upgraded. Given how this method ends up called,
@@ -1684,7 +1683,7 @@ impl DownloadedLayer {
ctx,
)
.await
.map(LayerKind::Delta)
.map(|res| res.map(LayerKind::Delta))
} else {
let lsn = owner.desc.image_layer_lsn();
let summary = Some(image_layer::Summary::expected(
@@ -1701,29 +1700,32 @@ impl DownloadedLayer {
ctx,
)
.await
.map(LayerKind::Image)
.map(|res| res.map(LayerKind::Image))
};
match res {
Ok(layer) => Ok(layer),
Err(err) => {
Ok(Ok(layer)) => Ok(Ok(layer)),
Ok(Err(transient)) => Err(transient),
Err(permanent) => {
LAYER_IMPL_METRICS.inc_permanent_loading_failures();
// We log this message once over the lifetime of `Self`
// => Ok and good to log backtrace and path here.
tracing::error!(
"layer load failed, assuming permanent failure: {}: {err:?}",
owner.path
);
Err(err)
// TODO(#5815): we are not logging all errors, so temporarily log them **once**
// here as well
let permanent = permanent.context("load layer");
tracing::error!("layer loading failed permanently: {permanent:#}");
Ok(Err(permanent))
}
}
};
self.kind
.get_or_init(init)
.await
.get_or_try_init(init)
// return transient errors using `?`
.await?
.as_ref()
// We already logged the full backtrace above, once. Don't repeat that here.
.map_err(|e| anyhow::anyhow!("layer load failed earlier: {e}"))
.map_err(|e| {
// errors are not clonabled, cannot but stringify
// test_broken_timeline matches this string
anyhow::anyhow!("layer loading failed: {e:#}")
})
}
async fn get_value_reconstruct_data(
@@ -1758,11 +1760,7 @@ impl DownloadedLayer {
) -> Result<(), GetVectoredError> {
use LayerKind::*;
match self
.get(owner, ctx)
.await
.map_err(GetVectoredError::Other)?
{
match self.get(owner, ctx).await.map_err(GetVectoredError::from)? {
Delta(d) => {
d.get_values_reconstruct_data(keyspace, lsn_range, reconstruct_data, ctx)
.await

View File

@@ -210,28 +210,24 @@ async fn compaction_loop(tenant: Arc<Tenant>, cancel: CancellationToken) {
Duration::from_secs(10)
} else {
// Run compaction
match tenant.compaction_iteration(&cancel, &ctx).await {
Err(e) => {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
log_compaction_error(
&e,
error_run_count,
&wait_duration,
cancel.is_cancelled(),
);
wait_duration
}
Ok(has_pending_task) => {
error_run_count = 0;
// schedule the next compaction immediately in case there is a pending compaction task
if has_pending_task { Duration::from_secs(0) } else { period }
}
if let Err(e) = tenant.compaction_iteration(&cancel, &ctx).await {
let wait_duration = backoff::exponential_backoff_duration_seconds(
error_run_count + 1,
1.0,
MAX_BACKOFF_SECS,
);
error_run_count += 1;
let wait_duration = Duration::from_secs_f64(wait_duration);
log_compaction_error(
&e,
error_run_count,
&wait_duration,
cancel.is_cancelled(),
);
wait_duration
} else {
error_run_count = 0;
period
}
};

View File

@@ -1769,14 +1769,13 @@ impl Timeline {
}
}
/// Outermost timeline compaction operation; downloads needed layers. Returns whether we have pending
/// compaction tasks.
/// Outermost timeline compaction operation; downloads needed layers.
pub(crate) async fn compact(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
) -> Result<(), CompactionError> {
// most likely the cancellation token is from background task, but in tests it could be the
// request task as well.
@@ -1796,8 +1795,8 @@ impl Timeline {
// compaction task goes over it's period (20s) which is quite often in production.
let (_guard, _permit) = tokio::select! {
tuple = prepare => { tuple },
_ = self.cancel.cancelled() => return Ok(false),
_ = cancel.cancelled() => return Ok(false),
_ = self.cancel.cancelled() => return Ok(()),
_ = cancel.cancelled() => return Ok(()),
};
let last_record_lsn = self.get_last_record_lsn();
@@ -1805,14 +1804,11 @@ impl Timeline {
// Last record Lsn could be zero in case the timeline was just created
if !last_record_lsn.is_valid() {
warn!("Skipping compaction for potentially just initialized timeline, it has invalid last record lsn: {last_record_lsn}");
return Ok(false);
return Ok(());
}
match self.get_compaction_algorithm_settings().kind {
CompactionAlgorithm::Tiered => {
self.compact_tiered(cancel, ctx).await?;
Ok(false)
}
CompactionAlgorithm::Tiered => self.compact_tiered(cancel, ctx).await,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
}
}
@@ -2001,11 +1997,6 @@ impl Timeline {
self.current_state() == TimelineState::Active
}
#[allow(unused)]
pub(crate) fn is_archived(&self) -> Option<bool> {
self.remote_client.is_archived()
}
pub(crate) fn is_stopping(&self) -> bool {
self.current_state() == TimelineState::Stopping
}

View File

@@ -102,19 +102,17 @@ impl KeyHistoryRetention {
impl Timeline {
/// TODO: cancellation
///
/// Returns whether the compaction has pending tasks.
pub(crate) async fn compact_legacy(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
) -> Result<(), CompactionError> {
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
self.compact_with_gc(cancel, ctx)
return self
.compact_with_gc(cancel, ctx)
.await
.map_err(CompactionError::Other)?;
return Ok(false);
.map_err(CompactionError::Other);
}
// High level strategy for compaction / image creation:
@@ -162,7 +160,7 @@ impl Timeline {
// Define partitioning schema if needed
// FIXME: the match should only cover repartitioning, not the next steps
let (partition_count, has_pending_tasks) = match self
let partition_count = match self
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
@@ -179,35 +177,30 @@ impl Timeline {
// 2. Compact
let timer = self.metrics.compact_time_histo.start_timer();
let fully_compacted = self.compact_level0(target_file_size, ctx).await?;
self.compact_level0(target_file_size, ctx).await?;
timer.stop_and_record();
// 3. Create new image layers for partitions that have been modified
// "enough".
let mut partitioning = dense_partitioning;
partitioning
.parts
.extend(sparse_partitioning.into_dense().parts);
let image_layers = self
.create_image_layers(
&partitioning,
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
)
.await?;
// 3. Create new image layers for partitions that have been modified
// "enough". Skip image layer creation if L0 compaction cannot keep up.
if fully_compacted {
let image_layers = self
.create_image_layers(
&partitioning,
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
},
&image_ctx,
)
.await?;
self.upload_new_image_layers(image_layers)?;
} else {
info!("skipping image layer generation due to L0 compaction did not include all layers.");
}
(partitioning.parts.len(), !fully_compacted)
self.upload_new_image_layers(image_layers)?;
partitioning.parts.len()
}
Err(err) => {
// no partitioning? This is normal, if the timeline was just created
@@ -219,7 +212,7 @@ impl Timeline {
if !self.cancel.is_cancelled() {
tracing::error!("could not compact, repartitioning keyspace failed: {err:?}");
}
(1, false)
1
}
};
@@ -232,7 +225,7 @@ impl Timeline {
self.compact_shard_ancestors(rewrite_max, ctx).await?;
}
Ok(has_pending_tasks)
Ok(())
}
/// Check for layers that are elegible to be rewritten:
@@ -439,16 +432,15 @@ impl Timeline {
}
/// Collect a bunch of Level 0 layer files, and compact and reshuffle them as
/// as Level 1 files. Returns whether the L0 layers are fully compacted.
/// as Level 1 files.
async fn compact_level0(
self: &Arc<Self>,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
) -> Result<(), CompactionError> {
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
fully_compacted,
} = {
let phase1_span = info_span!("compact_level0_phase1");
let ctx = ctx.attached_child();
@@ -471,12 +463,12 @@ impl Timeline {
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do
return Ok(true);
return Ok(());
}
self.finish_compact_batch(&new_layers, &Vec::new(), &deltas_to_compact)
.await?;
Ok(fully_compacted)
Ok(())
}
/// Level0 files first phase of compaction, explained in the [`Self::compact_legacy`] comment.
@@ -543,8 +535,6 @@ impl Timeline {
) as u64
* std::cmp::max(self.get_checkpoint_distance(), DEFAULT_CHECKPOINT_DISTANCE);
let mut fully_compacted = true;
deltas_to_compact.push(
first_level0_delta
.download_and_keep_resident()
@@ -572,7 +562,6 @@ impl Timeline {
"L0 compaction picker hit max delta layer size limit: {}",
delta_size_limit
);
fully_compacted = false;
// Proceed with compaction, but only a subset of L0s
break;
@@ -934,7 +923,6 @@ impl Timeline {
.into_iter()
.map(|x| x.drop_eviction_guard())
.collect::<Vec<_>>(),
fully_compacted,
})
}
}
@@ -943,9 +931,6 @@ impl Timeline {
struct CompactLevel0Phase1Result {
new_layers: Vec<ResidentLayer>,
deltas_to_compact: Vec<Layer>,
// Whether we have included all L0 layers, or selected only part of them due to the
// L0 compaction size limit.
fully_compacted: bool,
}
#[derive(Default)]

153
poetry.lock generated
View File

@@ -870,96 +870,6 @@ files = [
[package.dependencies]
colorama = {version = "*", markers = "platform_system == \"Windows\""}
[[package]]
name = "clickhouse-connect"
version = "0.7.17"
description = "ClickHouse Database Core Driver for Python, Pandas, and Superset"
optional = false
python-versions = "~=3.8"
files = [
{file = "clickhouse-connect-0.7.17.tar.gz", hash = "sha256:854f1f9f3e024e7f89ae5d57cd3289d7a4c3dc91a9f24c4d233014f0ea19cb2d"},
{file = "clickhouse_connect-0.7.17-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:aca36f5f28be1ada2981fce87724bbf451f267c918015baec59e527de3c9c882"},
{file = "clickhouse_connect-0.7.17-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:66209e4634f457604c263bea176336079d26c284e251e68a8435b0b80c1a25ff"},
{file = "clickhouse_connect-0.7.17-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e4d86c5a561a2a99321c8b4af22257461b8e67142f34cfea6e70f39b45b1f406"},
{file = "clickhouse_connect-0.7.17-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d200c9afa2725a96f9f3718221f641276b80c11bf504d8a2fbaafb5a05b2f0d3"},
{file = "clickhouse_connect-0.7.17-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:004d867b1005445a46e6742db1054bf2a717a451372663b46e09b5e9e90a31e3"},
{file = "clickhouse_connect-0.7.17-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:4ef94a4a8e008882259151833c3c47cfbb9c8f08de0f100aaf3b95c366dcfb24"},
{file = "clickhouse_connect-0.7.17-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:ee732c3df50c8b07d16b5836ff85e6b84569922455c03837c3add5cf1388fe1f"},
{file = "clickhouse_connect-0.7.17-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:d9dbe1235465bb946e24b90b0ca5b8800b5d645acb2d7d6ee819448c3e2fd959"},
{file = "clickhouse_connect-0.7.17-cp310-cp310-win32.whl", hash = "sha256:e5db0d68dfb63db0297d44dc91406bcfd7d333708d7cd55086c8550fbf870b78"},
{file = "clickhouse_connect-0.7.17-cp310-cp310-win_amd64.whl", hash = "sha256:800750f568c097ea312887785025006d6098bffd8ed2dd6a57048fb3ced6d778"},
{file = "clickhouse_connect-0.7.17-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:4eb390623b3d15dc9cda78f5c68f83ef9ad11743797e70af8fabc384b015a73c"},
{file = "clickhouse_connect-0.7.17-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:35f172ca950f218f63072024c81d5b4ff6e5399620c255506c321ccc7b17c9a5"},
{file = "clickhouse_connect-0.7.17-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ae7918f060f7576fc931c692e0122b1b07576fabd81444af22e1f8582300d200"},
{file = "clickhouse_connect-0.7.17-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ff2881b93c7a1afb9c99fb59ad5fd666850421325d0931e2b77f3f4ba872303d"},
{file = "clickhouse_connect-0.7.17-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:8a4d9b4f97271addf66aadbaf7f154f19a0ad6c22026d575a995c55ebd8576db"},
{file = "clickhouse_connect-0.7.17-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:e431469b1ff2d5c3e4c406d55c6afdf7102f5d2524c2ceb5481b94ac24412aa3"},
{file = "clickhouse_connect-0.7.17-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:2b6f80115176559f181a6b3ecad11aa3d70ef6014c3d2905b90fcef3f27d25c2"},
{file = "clickhouse_connect-0.7.17-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d8ac694f40dfafc8a3cc877116b4bc73e8877ebf66d4d96ee092484ee4c0b481"},
{file = "clickhouse_connect-0.7.17-cp311-cp311-win32.whl", hash = "sha256:78b7a3f6b0fad4eaf8afb5f9a2e855bde53e82ea5804960e9cf779538f4606a1"},
{file = "clickhouse_connect-0.7.17-cp311-cp311-win_amd64.whl", hash = "sha256:efd390cc045334ecc3f2a9c18cc07c041d0288b145967805fdcab65abeefa75f"},
{file = "clickhouse_connect-0.7.17-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:9228334a17dc0a7842222f54ba5b89fc563532424aad4f66be799df70ab37e9f"},
{file = "clickhouse_connect-0.7.17-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:e432a42bb788bda77e88eda2774392a60fbbb5ee2a79cb2881d182d26c45fe49"},
{file = "clickhouse_connect-0.7.17-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c85152ed2879965ee1fa2bd5e31fb27d281fd5f50d6e86a401efd95cd85b29ef"},
{file = "clickhouse_connect-0.7.17-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:29a126104aa5e11df570cbd89fca4988784084602ba77d17b2396b334c54fd75"},
{file = "clickhouse_connect-0.7.17-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:882d8f9570549258e6eb6a97915fbf64ed29fe395d5e360866ea8d42c8283a35"},
{file = "clickhouse_connect-0.7.17-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:06ebf99111171442f462fb8b357364c3e276da3e8f8557b2e8fee9eb55ab37d1"},
{file = "clickhouse_connect-0.7.17-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:e0cf6f99b2777b0d164bf8b65ec39104cdc0789a56bcb52d98289bbd6f5cc70e"},
{file = "clickhouse_connect-0.7.17-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:ee46c508fddfff3b7ac52326788e0c6dd8dfb416b6d7e02e5d30e8110749dac2"},
{file = "clickhouse_connect-0.7.17-cp312-cp312-win32.whl", hash = "sha256:eb708b590a37d56b069a6088254ffa55d73b8cb65527339df81ef03fe67ffdec"},
{file = "clickhouse_connect-0.7.17-cp312-cp312-win_amd64.whl", hash = "sha256:17f00dccddaeaf43733faa1fa21f7d24641454a73669fda862545ba7c88627f5"},
{file = "clickhouse_connect-0.7.17-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:ab5d4b37a6dcc39e94c63beac0f22d9dda914f5eb865d166c64cf04dfadb7d16"},
{file = "clickhouse_connect-0.7.17-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:32aa90387f45f34cbc5a984789ed4c12760a3c0056c190ab0123ceafc36b1002"},
{file = "clickhouse_connect-0.7.17-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:21277b6bdd6c8ff14170bfcd52125c5c39f442ec4bafbb643ad7d0ca915f0029"},
{file = "clickhouse_connect-0.7.17-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ca68d8b7dee3fb4e7229e06152f5b0faaccafb4c87d9c2d48fa5bd117a3cc1c0"},
{file = "clickhouse_connect-0.7.17-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:841c56282102b2fba1e0b332bb1c7a0c50992fbc321746af8d3e0e6ca2450e8b"},
{file = "clickhouse_connect-0.7.17-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:8d7ffde5a4b95d8fe9ed38e08e504e497310e3d7a17691bd40bf65734648fdfc"},
{file = "clickhouse_connect-0.7.17-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:055960086b6b92b6e44f5ba04c81c40c10b038588e4b3908b033c99f66125332"},
{file = "clickhouse_connect-0.7.17-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:36491fec63ceb8503b6344c23477647030139f346b749dc5ee672c505939dbbe"},
{file = "clickhouse_connect-0.7.17-cp38-cp38-win32.whl", hash = "sha256:8779a907e026db32e6bc0bc0c8d5de0e2e3afd166afc2d4adcc0603399af5539"},
{file = "clickhouse_connect-0.7.17-cp38-cp38-win_amd64.whl", hash = "sha256:309854fa197885c6278438ddd032ab52e6fec56f162074e343c3635ca7266078"},
{file = "clickhouse_connect-0.7.17-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:e8009f94550178dc971aeb4f8787ba7a5b473c22647490428b7229f540a51d2b"},
{file = "clickhouse_connect-0.7.17-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:70f8422f407b13a404b3670fd097855abd5adaf890c710d6678d2b46ab61ac48"},
{file = "clickhouse_connect-0.7.17-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:082783eb1e8baf7b3465dd045132dc5cb5a91432c899dc4e19891c5f782d8d23"},
{file = "clickhouse_connect-0.7.17-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c1c30aad2a9c7584c4ee19e646a087b3bbd2d4daab3d88a2afeeae1a7f6febf9"},
{file = "clickhouse_connect-0.7.17-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fc8e245a9f4f0dce39f155e626405f60f1d3cf4d1e52dd2c793ea6b603ca111b"},
{file = "clickhouse_connect-0.7.17-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:802372cb8a69c9ffdf4260e9f01616c8601ba531825ed6f08834827e0b880cd1"},
{file = "clickhouse_connect-0.7.17-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:193a60271a3b105cdbde96fb20b40eab8a50fca3bb1f397546f7a18b53d9aa9c"},
{file = "clickhouse_connect-0.7.17-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:59d58932916792fdbd09cb961a245a0c2d87b07b8296f9138915b998f4522941"},
{file = "clickhouse_connect-0.7.17-cp39-cp39-win32.whl", hash = "sha256:3cfd0edabb589f640636a97ffc38d1b3d760faef208d44e50829cc1ad3f0d3e5"},
{file = "clickhouse_connect-0.7.17-cp39-cp39-win_amd64.whl", hash = "sha256:5661b4629aac228481219abf2e149119af1a71d897f191665e182d9d192d7033"},
{file = "clickhouse_connect-0.7.17-pp310-pypy310_pp73-macosx_10_9_x86_64.whl", hash = "sha256:7429d309109e7e4a70fd867d69fcfea9ddcb1a1e910caa6b0e2c3776b71f4613"},
{file = "clickhouse_connect-0.7.17-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e5ae619151006da84a0b1585a9bcc81be32459d8061aeb2e116bad5bbaa7d108"},
{file = "clickhouse_connect-0.7.17-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ec0c84a0880621cb2389656a89886ef3133f0b3f8dc016eee6f25bbb49ff6f70"},
{file = "clickhouse_connect-0.7.17-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:705464c23f821666b76f8f619cf2870225156276562756b3933aaa24708e0ff8"},
{file = "clickhouse_connect-0.7.17-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:1822016f4b769e89264fe26cefe0bc5e50e4c3ca0747d89bb52d57dc4f1e5ffb"},
{file = "clickhouse_connect-0.7.17-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:6c92b0c342c1fbfa666010e8175e05026dc570a7ef91d8fa81ce503180f318aa"},
{file = "clickhouse_connect-0.7.17-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d2e106536540e906c3c866f8615fcf870a9a77c1bfab9ef4b042febfd2fdb953"},
{file = "clickhouse_connect-0.7.17-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bac9a32e62384b4341ba51a451084eb3b00c6e59aaac1499145dd8b897cb585c"},
{file = "clickhouse_connect-0.7.17-pp38-pypy38_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0feed93b9912b7862a8c41be1febcd44b68a824a5c1059b19d5c567afdaa6273"},
{file = "clickhouse_connect-0.7.17-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:2e2dd6db52e799f065fd565143fde5a872cfe903de1bee7775bc3a349856a790"},
{file = "clickhouse_connect-0.7.17-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:ed13add5d579a5960155f3000420544368501c9703d2fb94f103b4a6126081f6"},
{file = "clickhouse_connect-0.7.17-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c257a23ed3bf1858593fb03927d9d073fbbdfa24dc2afee537c3314bd66b4e24"},
{file = "clickhouse_connect-0.7.17-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d47866f64cbdc2d5cc4f8a7a8c49e3ee90c9e487091b9eda7c3a3576418e1cbe"},
{file = "clickhouse_connect-0.7.17-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9b850e2f17e0a0b5a37d996d3fb728050227489d64d271d678d166abea94f26e"},
{file = "clickhouse_connect-0.7.17-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:349682288987dc84ac7695f7cd6b510be8d0ec0eee7c1b72dbf2146b4e9efdb8"},
]
[package.dependencies]
certifi = "*"
lz4 = "*"
pytz = "*"
urllib3 = ">=1.26"
zstandard = "*"
[package.extras]
arrow = ["pyarrow"]
numpy = ["numpy"]
orjson = ["orjson"]
pandas = ["pandas"]
sqlalchemy = ["sqlalchemy (>1.3.21,<2.0)"]
tzlocal = ["tzlocal (>=4.0)"]
[[package]]
name = "colorama"
version = "0.4.5"
@@ -1560,56 +1470,6 @@ files = [
{file = "lazy_object_proxy-1.10.0-pp310.pp311.pp312.pp38.pp39-none-any.whl", hash = "sha256:80fa48bd89c8f2f456fc0765c11c23bf5af827febacd2f523ca5bc1893fcc09d"},
]
[[package]]
name = "lz4"
version = "4.3.3"
description = "LZ4 Bindings for Python"
optional = false
python-versions = ">=3.8"
files = [
{file = "lz4-4.3.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b891880c187e96339474af2a3b2bfb11a8e4732ff5034be919aa9029484cd201"},
{file = "lz4-4.3.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:222a7e35137d7539c9c33bb53fcbb26510c5748779364014235afc62b0ec797f"},
{file = "lz4-4.3.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f76176492ff082657ada0d0f10c794b6da5800249ef1692b35cf49b1e93e8ef7"},
{file = "lz4-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1d18718f9d78182c6b60f568c9a9cec8a7204d7cb6fad4e511a2ef279e4cb05"},
{file = "lz4-4.3.3-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6cdc60e21ec70266947a48839b437d46025076eb4b12c76bd47f8e5eb8a75dcc"},
{file = "lz4-4.3.3-cp310-cp310-win32.whl", hash = "sha256:c81703b12475da73a5d66618856d04b1307e43428a7e59d98cfe5a5d608a74c6"},
{file = "lz4-4.3.3-cp310-cp310-win_amd64.whl", hash = "sha256:43cf03059c0f941b772c8aeb42a0813d68d7081c009542301637e5782f8a33e2"},
{file = "lz4-4.3.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:30e8c20b8857adef7be045c65f47ab1e2c4fabba86a9fa9a997d7674a31ea6b6"},
{file = "lz4-4.3.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:2f7b1839f795315e480fb87d9bc60b186a98e3e5d17203c6e757611ef7dcef61"},
{file = "lz4-4.3.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:edfd858985c23523f4e5a7526ca6ee65ff930207a7ec8a8f57a01eae506aaee7"},
{file = "lz4-4.3.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0e9c410b11a31dbdc94c05ac3c480cb4b222460faf9231f12538d0074e56c563"},
{file = "lz4-4.3.3-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d2507ee9c99dbddd191c86f0e0c8b724c76d26b0602db9ea23232304382e1f21"},
{file = "lz4-4.3.3-cp311-cp311-win32.whl", hash = "sha256:f180904f33bdd1e92967923a43c22899e303906d19b2cf8bb547db6653ea6e7d"},
{file = "lz4-4.3.3-cp311-cp311-win_amd64.whl", hash = "sha256:b14d948e6dce389f9a7afc666d60dd1e35fa2138a8ec5306d30cd2e30d36b40c"},
{file = "lz4-4.3.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:e36cd7b9d4d920d3bfc2369840da506fa68258f7bb176b8743189793c055e43d"},
{file = "lz4-4.3.3-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:31ea4be9d0059c00b2572d700bf2c1bc82f241f2c3282034a759c9a4d6ca4dc2"},
{file = "lz4-4.3.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33c9a6fd20767ccaf70649982f8f3eeb0884035c150c0b818ea660152cf3c809"},
{file = "lz4-4.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bca8fccc15e3add173da91be8f34121578dc777711ffd98d399be35487c934bf"},
{file = "lz4-4.3.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e7d84b479ddf39fe3ea05387f10b779155fc0990125f4fb35d636114e1c63a2e"},
{file = "lz4-4.3.3-cp312-cp312-win32.whl", hash = "sha256:337cb94488a1b060ef1685187d6ad4ba8bc61d26d631d7ba909ee984ea736be1"},
{file = "lz4-4.3.3-cp312-cp312-win_amd64.whl", hash = "sha256:5d35533bf2cee56f38ced91f766cd0038b6abf46f438a80d50c52750088be93f"},
{file = "lz4-4.3.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:363ab65bf31338eb364062a15f302fc0fab0a49426051429866d71c793c23394"},
{file = "lz4-4.3.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:0a136e44a16fc98b1abc404fbabf7f1fada2bdab6a7e970974fb81cf55b636d0"},
{file = "lz4-4.3.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:abc197e4aca8b63f5ae200af03eb95fb4b5055a8f990079b5bdf042f568469dd"},
{file = "lz4-4.3.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:56f4fe9c6327adb97406f27a66420b22ce02d71a5c365c48d6b656b4aaeb7775"},
{file = "lz4-4.3.3-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f0e822cd7644995d9ba248cb4b67859701748a93e2ab7fc9bc18c599a52e4604"},
{file = "lz4-4.3.3-cp38-cp38-win32.whl", hash = "sha256:24b3206de56b7a537eda3a8123c644a2b7bf111f0af53bc14bed90ce5562d1aa"},
{file = "lz4-4.3.3-cp38-cp38-win_amd64.whl", hash = "sha256:b47839b53956e2737229d70714f1d75f33e8ac26e52c267f0197b3189ca6de24"},
{file = "lz4-4.3.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:6756212507405f270b66b3ff7f564618de0606395c0fe10a7ae2ffcbbe0b1fba"},
{file = "lz4-4.3.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:ee9ff50557a942d187ec85462bb0960207e7ec5b19b3b48949263993771c6205"},
{file = "lz4-4.3.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2b901c7784caac9a1ded4555258207d9e9697e746cc8532129f150ffe1f6ba0d"},
{file = "lz4-4.3.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b6d9ec061b9eca86e4dcc003d93334b95d53909afd5a32c6e4f222157b50c071"},
{file = "lz4-4.3.3-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f4c7bf687303ca47d69f9f0133274958fd672efaa33fb5bcde467862d6c621f0"},
{file = "lz4-4.3.3-cp39-cp39-win32.whl", hash = "sha256:054b4631a355606e99a42396f5db4d22046a3397ffc3269a348ec41eaebd69d2"},
{file = "lz4-4.3.3-cp39-cp39-win_amd64.whl", hash = "sha256:eac9af361e0d98335a02ff12fb56caeb7ea1196cf1a49dbf6f17828a131da807"},
{file = "lz4-4.3.3.tar.gz", hash = "sha256:01fe674ef2889dbb9899d8a67361e0c4a2c833af5aeb37dd505727cf5d2a131e"},
]
[package.extras]
docs = ["sphinx (>=1.6.0)", "sphinx-bootstrap-theme"]
flake8 = ["flake8"]
tests = ["psutil", "pytest (!=3.3.0)", "pytest-cov"]
[[package]]
name = "markupsafe"
version = "2.1.1"
@@ -2501,17 +2361,6 @@ files = [
[package.dependencies]
six = ">=1.5"
[[package]]
name = "pytz"
version = "2024.1"
description = "World timezone definitions, modern and historical"
optional = false
python-versions = "*"
files = [
{file = "pytz-2024.1-py2.py3-none-any.whl", hash = "sha256:328171f4e3623139da4983451950b28e95ac706e13f3f2630a879749e7a8b319"},
{file = "pytz-2024.1.tar.gz", hash = "sha256:2a29735ea9c18baf14b448846bde5a48030ed267578472d8955cd0e7443a9812"},
]
[[package]]
name = "pywin32"
version = "301"
@@ -3357,4 +3206,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.9"
content-hash = "7cee6a8c30bc7f4bfb0a87c6bad3952dfb4da127fad853d2710a93ac3eab8a00"
content-hash = "16ebd6a46768be7f67dbdb4ee5903b167d94edc9965f29252f038c67e9e907b0"

View File

@@ -41,7 +41,6 @@ zstandard = "^0.21.0"
httpx = {extras = ["http2"], version = "^0.26.0"}
pytest-repeat = "^0.9.3"
websockets = "^12.0"
clickhouse-connect = "^0.7.16"
[tool.poetry.group.dev.dependencies]
mypy = "==1.3.0"

View File

@@ -143,12 +143,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
self.tenant_id.unwrap_or(TenantId::from([0u8; 16])),
self.timeline_id.unwrap_or(TimelineId::from([0u8; 16])),
);
tracing::Span::current()
.record("ttid", tracing::field::display(ttid))
.record(
"application_name",
tracing::field::debug(self.appname.clone()),
);
tracing::Span::current().record("ttid", tracing::field::display(ttid));
Ok(())
} else {

View File

@@ -43,7 +43,7 @@ pub async fn task_main(
error!("connection handler exited: {}", err);
}
}
.instrument(info_span!("", cid = %conn_id, ttid = field::Empty, application_name = field::Empty)),
.instrument(info_span!("", cid = %conn_id, ttid = field::Empty)),
);
}
}

View File

@@ -18,7 +18,6 @@ anyhow.workspace = true
aws-config.workspace = true
bytes.workspace = true
camino.workspace = true
chrono.workspace = true
clap.workspace = true
fail.workspace = true
futures.workspace = true
@@ -45,12 +44,7 @@ scopeguard.workspace = true
strum.workspace = true
strum_macros.workspace = true
diesel = { version = "2.1.4", features = [
"serde_json",
"postgres",
"r2d2",
"chrono",
] }
diesel = { version = "2.1.4", features = ["serde_json", "postgres", "r2d2"] }
diesel_migrations = { version = "2.1.0" }
r2d2 = { version = "0.8.10" }
@@ -58,3 +52,4 @@ utils = { path = "../libs/utils/" }
metrics = { path = "../libs/metrics/" }
control_plane = { path = "../control_plane" }
workspace_hack = { version = "0.1", path = "../workspace_hack" }

View File

@@ -1 +0,0 @@
DROP TABLE metadata_health;

View File

@@ -1,14 +0,0 @@
CREATE TABLE metadata_health (
tenant_id VARCHAR NOT NULL,
shard_number INTEGER NOT NULL,
shard_count INTEGER NOT NULL,
PRIMARY KEY(tenant_id, shard_number, shard_count),
-- Rely on cascade behavior for delete
FOREIGN KEY(tenant_id, shard_number, shard_count) REFERENCES tenant_shards ON DELETE CASCADE,
healthy BOOLEAN NOT NULL DEFAULT TRUE,
last_scrubbed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
INSERT INTO metadata_health(tenant_id, shard_number, shard_count)
SELECT tenant_id, shard_number, shard_count FROM tenant_shards;

View File

@@ -3,18 +3,14 @@ use crate::metrics::{
METRICS_REGISTRY,
};
use crate::reconciler::ReconcileError;
use crate::service::{LeadershipStatus, Service, STARTUP_RECONCILE_TIMEOUT};
use crate::service::{Service, STARTUP_RECONCILE_TIMEOUT};
use anyhow::Context;
use futures::Future;
use hyper::header::CONTENT_TYPE;
use hyper::{Body, Request, Response};
use hyper::{StatusCode, Uri};
use metrics::{BuildInfo, NeonMetrics};
use pageserver_api::controller_api::{
MetadataHealthListOutdatedRequest, MetadataHealthListOutdatedResponse,
MetadataHealthListUnhealthyResponse, MetadataHealthUpdateRequest, MetadataHealthUpdateResponse,
TenantCreateRequest,
};
use pageserver_api::controller_api::TenantCreateRequest;
use pageserver_api::models::{
TenantConfigRequest, TenantLocationConfigRequest, TenantShardSplitRequest,
TenantTimeTravelRequest, TimelineCreateRequest,
@@ -564,51 +560,6 @@ async fn handle_cancel_node_fill(req: Request<Body>) -> Result<Response<Body>, A
json_response(StatusCode::ACCEPTED, ())
}
async fn handle_metadata_health_update(mut req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Scrubber)?;
let update_req = json_request::<MetadataHealthUpdateRequest>(&mut req).await?;
let state = get_state(&req);
state.service.metadata_health_update(update_req).await?;
json_response(StatusCode::OK, MetadataHealthUpdateResponse {})
}
async fn handle_metadata_health_list_unhealthy(
req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
let unhealthy_tenant_shards = state.service.metadata_health_list_unhealthy().await?;
json_response(
StatusCode::OK,
MetadataHealthListUnhealthyResponse {
unhealthy_tenant_shards,
},
)
}
async fn handle_metadata_health_list_outdated(
mut req: Request<Body>,
) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let list_outdated_req = json_request::<MetadataHealthListOutdatedRequest>(&mut req).await?;
let state = get_state(&req);
let health_records = state
.service
.metadata_health_list_outdated(list_outdated_req.not_scrubbed_for)
.await?;
json_response(
StatusCode::OK,
MetadataHealthListOutdatedResponse { health_records },
)
}
async fn handle_tenant_shard_split(
service: Arc<Service>,
mut req: Request<Body>,
@@ -656,13 +607,6 @@ async fn handle_tenant_update_policy(mut req: Request<Body>) -> Result<Response<
)
}
async fn handle_step_down(req: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permissions(&req, Scope::Admin)?;
let state = get_state(&req);
json_response(StatusCode::OK, state.service.step_down().await)
}
async fn handle_tenant_drop(req: Request<Body>) -> Result<Response<Body>, ApiError> {
let tenant_id: TenantId = parse_request_param(&req, "tenant_id")?;
check_permissions(&req, Scope::PageServerApi)?;
@@ -790,47 +734,6 @@ struct RequestMeta {
at: Instant,
}
pub fn prologue_leadership_status_check_middleware<
B: hyper::body::HttpBody + Send + Sync + 'static,
>() -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
let state = get_state(&req);
let leadership_status = state.service.get_leadership_status();
enum AllowedRoutes<'a> {
All,
Some(Vec<&'a str>),
}
let allowed_routes = match leadership_status {
LeadershipStatus::Leader => AllowedRoutes::All,
LeadershipStatus::SteppedDown => {
// TODO: does it make sense to allow /status here?
AllowedRoutes::Some(["/control/v1/step_down", "/status", "/metrics"].to_vec())
}
LeadershipStatus::Candidate => {
AllowedRoutes::Some(["/ready", "/status", "/metrics"].to_vec())
}
};
let uri = req.uri().to_string();
match allowed_routes {
AllowedRoutes::All => Ok(req),
AllowedRoutes::Some(allowed) if allowed.contains(&uri.as_str()) => Ok(req),
_ => {
tracing::info!(
"Request {} not allowed due to current leadership state",
req.uri()
);
Err(ApiError::ResourceUnavailable(
format!("Current leadership status is {leadership_status}").into(),
))
}
}
})
}
fn prologue_metrics_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
@@ -917,7 +820,6 @@ pub fn make_router(
build_info: BuildInfo,
) -> RouterBuilder<hyper::Body, ApiError> {
let mut router = endpoint::make_router()
.middleware(prologue_leadership_status_check_middleware())
.middleware(prologue_metrics_middleware())
.middleware(epilogue_metrics_middleware());
if auth.is_some() {
@@ -1036,28 +938,6 @@ pub fn make_router(
RequestName("control_v1_cancel_node_fill"),
)
})
// Metadata health operations
.post("/control/v1/metadata_health/update", |r| {
named_request_span(
r,
handle_metadata_health_update,
RequestName("control_v1_metadata_health_update"),
)
})
.get("/control/v1/metadata_health/unhealthy", |r| {
named_request_span(
r,
handle_metadata_health_list_unhealthy,
RequestName("control_v1_metadata_health_list_unhealthy"),
)
})
.post("/control/v1/metadata_health/outdated", |r| {
named_request_span(
r,
handle_metadata_health_list_outdated,
RequestName("control_v1_metadata_health_list_outdated"),
)
})
// TODO(vlad): endpoint for cancelling drain and fill
// Tenant Shard operations
.put("/control/v1/tenant/:tenant_shard_id/migrate", |r| {
@@ -1091,9 +971,6 @@ pub fn make_router(
RequestName("control_v1_tenant_policy"),
)
})
.put("/control/v1/step_down", |r| {
named_request_span(r, handle_step_down, RequestName("control_v1_step_down"))
})
// Tenant operations
// The ^/v1/ endpoints act as a "Virtual Pageserver", enabling shard-naive clients to call into
// this service to manage tenants that actually consist of many tenant shards, as if they are a single entity.

View File

@@ -13,10 +13,7 @@ use metrics::NeonMetrics;
use once_cell::sync::Lazy;
use std::sync::Mutex;
use crate::{
persistence::{DatabaseError, DatabaseOperation},
service::LeadershipStatus,
};
use crate::persistence::{DatabaseError, DatabaseOperation};
pub(crate) static METRICS_REGISTRY: Lazy<StorageControllerMetrics> =
Lazy::new(StorageControllerMetrics::default);
@@ -84,8 +81,6 @@ pub(crate) struct StorageControllerMetricGroup {
#[metric(metadata = histogram::Thresholds::exponential_buckets(0.1, 2.0))]
pub(crate) storage_controller_database_query_latency:
measured::HistogramVec<DatabaseQueryLatencyLabelGroupSet, 5>,
pub(crate) storage_controller_leadership_status: measured::GaugeVec<LeadershipStatusGroupSet>,
}
impl StorageControllerMetrics {
@@ -161,12 +156,6 @@ pub(crate) struct DatabaseQueryLatencyLabelGroup {
pub(crate) operation: DatabaseOperation,
}
#[derive(measured::LabelGroup)]
#[label(set = LeadershipStatusGroupSet)]
pub(crate) struct LeadershipStatusGroup {
pub(crate) status: LeadershipStatus,
}
#[derive(FixedCardinalityLabel, Clone, Copy)]
pub(crate) enum ReconcileOutcome {
#[label(rename = "ok")]

View File

@@ -8,7 +8,6 @@ use self::split_state::SplitState;
use diesel::pg::PgConnection;
use diesel::prelude::*;
use diesel::Connection;
use pageserver_api::controller_api::MetadataHealthRecord;
use pageserver_api::controller_api::ShardSchedulingPolicy;
use pageserver_api::controller_api::{NodeSchedulingPolicy, PlacementPolicy};
use pageserver_api::models::TenantConfig;
@@ -91,10 +90,6 @@ pub(crate) enum DatabaseOperation {
UpdateTenantShard,
DeleteTenant,
UpdateTenantConfig,
UpdateMetadataHealth,
ListMetadataHealth,
ListMetadataHealthUnhealthy,
ListMetadataHealthOutdated,
}
#[must_use]
@@ -312,32 +307,15 @@ impl Persistence {
&self,
shards: Vec<TenantShardPersistence>,
) -> DatabaseResult<()> {
use crate::schema::metadata_health;
use crate::schema::tenant_shards;
let now = chrono::Utc::now();
let metadata_health_records = shards
.iter()
.map(|t| MetadataHealthPersistence {
tenant_id: t.tenant_id.clone(),
shard_number: t.shard_number,
shard_count: t.shard_count,
healthy: true,
last_scrubbed_at: now,
})
.collect::<Vec<_>>();
use crate::schema::tenant_shards::dsl::*;
self.with_measured_conn(
DatabaseOperation::InsertTenantShards,
move |conn| -> DatabaseResult<()> {
diesel::insert_into(tenant_shards::table)
.values(&shards)
.execute(conn)?;
diesel::insert_into(metadata_health::table)
.values(&metadata_health_records)
.execute(conn)?;
for tenant in &shards {
diesel::insert_into(tenant_shards)
.values(tenant)
.execute(conn)?;
}
Ok(())
},
)
@@ -351,10 +329,10 @@ impl Persistence {
self.with_measured_conn(
DatabaseOperation::DeleteTenant,
move |conn| -> DatabaseResult<()> {
// `metadata_health` status (if exists) is also deleted based on the cascade behavior.
diesel::delete(tenant_shards)
.filter(tenant_id.eq(del_tenant_id.to_string()))
.execute(conn)?;
Ok(())
},
)
@@ -697,94 +675,6 @@ impl Persistence {
)
.await
}
/// Stores all the latest metadata health updates durably. Updates existing entry on conflict.
///
/// **Correctness:** `metadata_health_updates` should all belong the tenant shards managed by the storage controller.
#[allow(dead_code)]
pub(crate) async fn update_metadata_health_records(
&self,
healthy_records: Vec<MetadataHealthPersistence>,
unhealthy_records: Vec<MetadataHealthPersistence>,
now: chrono::DateTime<chrono::Utc>,
) -> DatabaseResult<()> {
use crate::schema::metadata_health::dsl::*;
self.with_measured_conn(
DatabaseOperation::UpdateMetadataHealth,
move |conn| -> DatabaseResult<_> {
diesel::insert_into(metadata_health)
.values(&healthy_records)
.on_conflict((tenant_id, shard_number, shard_count))
.do_update()
.set((healthy.eq(true), last_scrubbed_at.eq(now)))
.execute(conn)?;
diesel::insert_into(metadata_health)
.values(&unhealthy_records)
.on_conflict((tenant_id, shard_number, shard_count))
.do_update()
.set((healthy.eq(false), last_scrubbed_at.eq(now)))
.execute(conn)?;
Ok(())
},
)
.await
}
/// Lists all the metadata health records.
#[allow(dead_code)]
pub(crate) async fn list_metadata_health_records(
&self,
) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
self.with_measured_conn(
DatabaseOperation::ListMetadataHealth,
move |conn| -> DatabaseResult<_> {
Ok(
crate::schema::metadata_health::table
.load::<MetadataHealthPersistence>(conn)?,
)
},
)
.await
}
/// Lists all the metadata health records that is unhealthy.
#[allow(dead_code)]
pub(crate) async fn list_unhealthy_metadata_health_records(
&self,
) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
use crate::schema::metadata_health::dsl::*;
self.with_measured_conn(
DatabaseOperation::ListMetadataHealthUnhealthy,
move |conn| -> DatabaseResult<_> {
Ok(crate::schema::metadata_health::table
.filter(healthy.eq(false))
.load::<MetadataHealthPersistence>(conn)?)
},
)
.await
}
/// Lists all the metadata health records that have not been updated since an `earlier` time.
#[allow(dead_code)]
pub(crate) async fn list_outdated_metadata_health_records(
&self,
earlier: chrono::DateTime<chrono::Utc>,
) -> DatabaseResult<Vec<MetadataHealthPersistence>> {
use crate::schema::metadata_health::dsl::*;
self.with_measured_conn(
DatabaseOperation::ListMetadataHealthOutdated,
move |conn| -> DatabaseResult<_> {
let query = metadata_health.filter(last_scrubbed_at.lt(earlier));
let res = query.load::<MetadataHealthPersistence>(conn)?;
Ok(res)
},
)
.await
}
}
/// Parts of [`crate::tenant_shard::TenantShard`] that are stored durably
@@ -854,59 +744,3 @@ pub(crate) struct NodePersistence {
pub(crate) listen_pg_addr: String,
pub(crate) listen_pg_port: i32,
}
/// Tenant metadata health status that are stored durably.
#[derive(Queryable, Selectable, Insertable, Serialize, Deserialize, Clone, Eq, PartialEq)]
#[diesel(table_name = crate::schema::metadata_health)]
pub(crate) struct MetadataHealthPersistence {
#[serde(default)]
pub(crate) tenant_id: String,
#[serde(default)]
pub(crate) shard_number: i32,
#[serde(default)]
pub(crate) shard_count: i32,
pub(crate) healthy: bool,
pub(crate) last_scrubbed_at: chrono::DateTime<chrono::Utc>,
}
impl MetadataHealthPersistence {
pub fn new(
tenant_shard_id: TenantShardId,
healthy: bool,
last_scrubbed_at: chrono::DateTime<chrono::Utc>,
) -> Self {
let tenant_id = tenant_shard_id.tenant_id.to_string();
let shard_number = tenant_shard_id.shard_number.0 as i32;
let shard_count = tenant_shard_id.shard_count.literal() as i32;
MetadataHealthPersistence {
tenant_id,
shard_number,
shard_count,
healthy,
last_scrubbed_at,
}
}
#[allow(dead_code)]
pub(crate) fn get_tenant_shard_id(&self) -> Result<TenantShardId, hex::FromHexError> {
Ok(TenantShardId {
tenant_id: TenantId::from_str(self.tenant_id.as_str())?,
shard_number: ShardNumber(self.shard_number as u8),
shard_count: ShardCount::new(self.shard_count as u8),
})
}
}
impl From<MetadataHealthPersistence> for MetadataHealthRecord {
fn from(value: MetadataHealthPersistence) -> Self {
MetadataHealthRecord {
tenant_shard_id: value
.get_tenant_shard_id()
.expect("stored tenant id should be valid"),
healthy: value.healthy,
last_scrubbed_at: value.last_scrubbed_at,
}
}
}

View File

@@ -12,7 +12,6 @@ use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::failpoint_support;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
@@ -750,8 +749,6 @@ impl Reconciler {
self.location_config(&node, conf, None, false).await?;
}
failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
Ok(())
}

View File

@@ -1,15 +1,5 @@
// @generated automatically by Diesel CLI.
diesel::table! {
metadata_health (tenant_id, shard_number, shard_count) {
tenant_id -> Varchar,
shard_number -> Int4,
shard_count -> Int4,
healthy -> Bool,
last_scrubbed_at -> Timestamptz,
}
}
diesel::table! {
nodes (node_id) {
node_id -> Int8,
@@ -36,4 +26,4 @@ diesel::table! {
}
}
diesel::allow_tables_to_appear_in_same_query!(metadata_health, nodes, tenant_shards,);
diesel::allow_tables_to_appear_in_same_query!(nodes, tenant_shards,);

View File

@@ -15,8 +15,7 @@ use crate::{
},
compute_hook::NotifyError,
id_lock_map::{trace_exclusive_lock, trace_shared_lock, IdLockMap, TracingExclusiveGuard},
metrics::LeadershipStatusGroup,
persistence::{AbortShardSplitStatus, MetadataHealthPersistence, TenantFilter},
persistence::{AbortShardSplitStatus, TenantFilter},
reconciler::{ReconcileError, ReconcileUnits},
scheduler::{MaySchedule, ScheduleContext, ScheduleMode},
tenant_shard::{
@@ -33,11 +32,11 @@ use futures::{stream::FuturesUnordered, StreamExt};
use itertools::Itertools;
use pageserver_api::{
controller_api::{
MetadataHealthRecord, MetadataHealthUpdateRequest, NodeAvailability, NodeRegisterRequest,
NodeSchedulingPolicy, PlacementPolicy, ShardSchedulingPolicy, TenantCreateRequest,
TenantCreateResponse, TenantCreateResponseShard, TenantDescribeResponse,
TenantDescribeResponseShard, TenantLocateResponse, TenantPolicyRequest,
TenantShardMigrateRequest, TenantShardMigrateResponse, UtilizationScore,
NodeAvailability, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
ShardSchedulingPolicy, TenantCreateRequest, TenantCreateResponse,
TenantCreateResponseShard, TenantDescribeResponse, TenantDescribeResponseShard,
TenantLocateResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse, UtilizationScore,
},
models::{SecondaryProgress, TenantConfigRequest, TopTenantShardsRequest},
};
@@ -82,7 +81,6 @@ use crate::{
ReconcilerWaiter, TenantShard,
},
};
use serde::{Deserialize, Serialize};
// For operations that should be quick, like attaching a new tenant
const SHORT_RECONCILE_TIMEOUT: Duration = Duration::from_secs(5);
@@ -133,24 +131,6 @@ enum NodeOperations {
Delete,
}
/// The leadership status for the storage controller process.
/// Allowed transitions are:
/// 1. Leader -> SteppedDown
/// 2. Candidate -> Leader
#[derive(Copy, Clone, strum_macros::Display, measured::FixedCardinalityLabel)]
#[strum(serialize_all = "snake_case")]
pub(crate) enum LeadershipStatus {
/// This is the steady state where the storage controller can produce
/// side effects in the cluster.
Leader,
/// We've been notified to step down by another candidate. No reconciliations
/// take place in this state.
SteppedDown,
/// Initial state for a new storage controller instance. Will attempt to assume leadership.
#[allow(unused)]
Candidate,
}
pub const RECONCILER_CONCURRENCY_DEFAULT: usize = 128;
// Depth of the channel used to enqueue shards for reconciliation when they can't do it immediately.
@@ -160,8 +140,6 @@ const MAX_DELAYED_RECONCILES: usize = 10000;
// Top level state available to all HTTP handlers
struct ServiceState {
leadership_status: LeadershipStatus,
tenants: BTreeMap<TenantShardId, TenantShard>,
nodes: Arc<HashMap<NodeId, Node>>,
@@ -224,21 +202,7 @@ impl ServiceState {
scheduler: Scheduler,
delayed_reconcile_rx: tokio::sync::mpsc::Receiver<TenantShardId>,
) -> Self {
let status = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_leadership_status;
status.set(
LeadershipStatusGroup {
status: LeadershipStatus::Leader,
},
1,
);
Self {
// TODO: Starting up as Leader is a transient state. Once we enable rolling
// upgrades on the k8s side, we should start up as Candidate.
leadership_status: LeadershipStatus::Leader,
tenants,
nodes: Arc::new(nodes),
scheduler,
@@ -256,37 +220,6 @@ impl ServiceState {
) {
(&mut self.nodes, &mut self.tenants, &mut self.scheduler)
}
fn get_leadership_status(&self) -> LeadershipStatus {
self.leadership_status
}
fn step_down(&mut self) {
self.leadership_status = LeadershipStatus::SteppedDown;
let status = &crate::metrics::METRICS_REGISTRY
.metrics_group
.storage_controller_leadership_status;
status.set(
LeadershipStatusGroup {
status: LeadershipStatus::SteppedDown,
},
1,
);
status.set(
LeadershipStatusGroup {
status: LeadershipStatus::Leader,
},
0,
);
status.set(
LeadershipStatusGroup {
status: LeadershipStatus::Candidate,
},
0,
);
}
}
#[derive(Clone)]
@@ -470,30 +403,11 @@ struct ShardUpdate {
generation: Option<Generation>,
}
enum StopReconciliationsReason {
ShuttingDown,
SteppingDown,
}
impl std::fmt::Display for StopReconciliationsReason {
fn fmt(&self, writer: &mut std::fmt::Formatter) -> std::fmt::Result {
let s = match self {
Self::ShuttingDown => "Shutting down",
Self::SteppingDown => "Stepping down",
};
write!(writer, "{}", s)
}
}
pub(crate) enum ReconcileResultRequest {
ReconcileResult(ReconcileResult),
Stop,
}
// TODO: move this into the storcon peer client when that gets added
#[derive(Serialize, Deserialize, Debug, Default)]
pub(crate) struct GlobalObservedState(HashMap<TenantShardId, ObservedState>);
impl Service {
pub fn get_config(&self) -> &Config {
&self.config
@@ -5689,22 +5603,17 @@ impl Service {
Ok(std::cmp::max(waiter_count, reconciles_spawned))
}
async fn stop_reconciliations(&self, reason: StopReconciliationsReason) {
pub async fn shutdown(&self) {
// Cancel all on-going reconciles and wait for them to exit the gate.
tracing::info!("{reason}: cancelling and waiting for in-flight reconciles");
tracing::info!("Shutting down: cancelling and waiting for in-flight reconciles");
self.reconcilers_cancel.cancel();
self.reconcilers_gate.close().await;
// Signal the background loop in [`Service::process_results`] to exit once
// it has proccessed the results from all the reconciles we cancelled earlier.
tracing::info!("{reason}: processing results from previously in-flight reconciles");
tracing::info!("Shutting down: processing results from previously in-flight reconciles");
self.result_tx.send(ReconcileResultRequest::Stop).ok();
self.result_tx.closed().await;
}
pub async fn shutdown(&self) {
self.stop_reconciliations(StopReconciliationsReason::ShuttingDown)
.await;
// Background tasks hold gate guards: this notifies them of the cancellation and
// waits for them all to complete.
@@ -6094,89 +6003,4 @@ impl Service {
Ok(())
}
/// Updates scrubber metadata health check results.
pub(crate) async fn metadata_health_update(
&self,
update_req: MetadataHealthUpdateRequest,
) -> Result<(), ApiError> {
let now = chrono::offset::Utc::now();
let (healthy_records, unhealthy_records) = {
let locked = self.inner.read().unwrap();
let healthy_records = update_req
.healthy_tenant_shards
.into_iter()
// Retain only health records associated with tenant shards managed by storage controller.
.filter(|tenant_shard_id| locked.tenants.contains_key(tenant_shard_id))
.map(|tenant_shard_id| MetadataHealthPersistence::new(tenant_shard_id, true, now))
.collect();
let unhealthy_records = update_req
.unhealthy_tenant_shards
.into_iter()
.filter(|tenant_shard_id| locked.tenants.contains_key(tenant_shard_id))
.map(|tenant_shard_id| MetadataHealthPersistence::new(tenant_shard_id, false, now))
.collect();
(healthy_records, unhealthy_records)
};
self.persistence
.update_metadata_health_records(healthy_records, unhealthy_records, now)
.await?;
Ok(())
}
/// Lists the tenant shards that has unhealthy metadata status.
pub(crate) async fn metadata_health_list_unhealthy(
&self,
) -> Result<Vec<TenantShardId>, ApiError> {
let result = self
.persistence
.list_unhealthy_metadata_health_records()
.await?
.iter()
.map(|p| p.get_tenant_shard_id().unwrap())
.collect();
Ok(result)
}
/// Lists the tenant shards that have not been scrubbed for some duration.
pub(crate) async fn metadata_health_list_outdated(
&self,
not_scrubbed_for: Duration,
) -> Result<Vec<MetadataHealthRecord>, ApiError> {
let earlier = chrono::offset::Utc::now() - not_scrubbed_for;
let result = self
.persistence
.list_outdated_metadata_health_records(earlier)
.await?
.into_iter()
.map(|record| record.into())
.collect();
Ok(result)
}
pub(crate) fn get_leadership_status(&self) -> LeadershipStatus {
self.inner.read().unwrap().get_leadership_status()
}
pub(crate) async fn step_down(&self) -> GlobalObservedState {
tracing::info!("Received step down request from peer");
self.inner.write().unwrap().step_down();
// TODO: would it make sense to have a time-out for this?
self.stop_reconciliations(StopReconciliationsReason::SteppingDown)
.await;
let mut global_observed = GlobalObservedState::default();
let locked = self.inner.read().unwrap();
for (tid, tenant_shard) in locked.tenants.iter() {
global_observed
.0
.insert(*tid, tenant_shard.observed.clone());
}
global_observed
}
}

View File

@@ -18,7 +18,7 @@ use pageserver_api::{
models::{LocationConfig, LocationConfigMode, TenantConfig},
shard::{ShardIdentity, TenantShardId},
};
use serde::{Deserialize, Serialize};
use serde::Serialize;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{instrument, Instrument};
@@ -284,7 +284,7 @@ impl Drop for IntentState {
}
}
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
#[derive(Default, Clone, Serialize)]
pub(crate) struct ObservedState {
pub(crate) locations: HashMap<NodeId, ObservedStateLocation>,
}
@@ -298,7 +298,7 @@ pub(crate) struct ObservedState {
/// what it is (e.g. we failed partway through configuring it)
/// * Instance exists with conf==Some: this tells us what we last successfully configured on this node,
/// and that configuration will still be present unless something external interfered.
#[derive(Clone, Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize)]
pub(crate) struct ObservedStateLocation {
/// If None, it means we do not know the status of this shard's location on this node, but
/// we know that we might have some state on this node.

View File

@@ -87,8 +87,7 @@ pub(crate) async fn branch_cleanup_and_check_errors(
.push(format!("index_part.json version: {}", index_part.version()))
}
let mut newest_versions = IndexPart::KNOWN_VERSIONS.iter().rev().take(2);
if !newest_versions.any(|ip| ip == &index_part.version()) {
if &index_part.version() != IndexPart::KNOWN_VERSIONS.last().unwrap() {
info!(
"index_part.json version is not latest: {}",
index_part.version()

View File

@@ -1,13 +1,10 @@
use std::pin::pin;
use futures::{StreamExt, TryStreamExt};
use pageserver::tenant::storage_layer::LayerName;
use remote_storage::ListingMode;
use serde::{Deserialize, Serialize};
use crate::{
checks::parse_layer_object_name, init_remote_generic, metadata_stream::stream_tenants_generic,
stream_objects_with_retries, BucketConfig, NodeKind,
checks::parse_layer_object_name, init_remote, list_objects_with_retries,
metadata_stream::stream_tenants, BucketConfig, NodeKind,
};
#[derive(Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
@@ -50,38 +47,45 @@ pub async fn find_large_objects(
ignore_deltas: bool,
concurrency: usize,
) -> anyhow::Result<LargeObjectListing> {
let (remote_client, target) =
init_remote_generic(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = pin!(stream_tenants_generic(&remote_client, &target));
let (s3_client, target) = init_remote(bucket_config.clone(), NodeKind::Pageserver).await?;
let tenants = std::pin::pin!(stream_tenants(&s3_client, &target));
let objects_stream = tenants.map_ok(|tenant_shard_id| {
let mut tenant_root = target.tenant_root(&tenant_shard_id);
let remote_client = remote_client.clone();
let s3_client = s3_client.clone();
async move {
let mut objects = Vec::new();
let mut total_objects_ctr = 0u64;
// We want the objects and not just common prefixes
tenant_root.delimiter.clear();
let mut objects_stream = pin!(stream_objects_with_retries(
&remote_client,
ListingMode::NoDelimiter,
&tenant_root
));
while let Some(listing) = objects_stream.next().await {
let listing = listing?;
for obj in listing.keys.iter().filter(|obj| min_size <= obj.size) {
let key = obj.key.to_string();
let mut continuation_token = None;
loop {
let fetch_response =
list_objects_with_retries(&s3_client, &tenant_root, continuation_token.clone())
.await?;
for obj in fetch_response.contents().iter().filter(|o| {
if let Some(obj_size) = o.size {
min_size as i64 <= obj_size
} else {
false
}
}) {
let key = obj.key().expect("couldn't get key").to_owned();
let kind = LargeObjectKind::from_key(&key);
if ignore_deltas && kind == LargeObjectKind::DeltaLayer {
continue;
}
objects.push(LargeObject {
key,
size: obj.size,
size: obj.size.unwrap() as u64,
kind,
})
}
total_objects_ctr += listing.keys.len() as u64;
total_objects_ctr += fetch_response.contents().len() as u64;
match fetch_response.next_continuation_token {
Some(new_token) => continuation_token = Some(new_token),
None => break,
}
}
Ok((tenant_shard_id, objects, total_objects_ctr))

View File

@@ -5,7 +5,6 @@
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};
use anyhow::Context;
@@ -19,7 +18,7 @@ use utils::id::TenantId;
use crate::{
cloud_admin_api::{CloudAdminApiClient, MaybeDeleted, ProjectData},
init_remote, init_remote_generic, list_objects_with_retries,
init_remote, init_remote_generic,
metadata_stream::{stream_tenant_timelines, stream_tenants},
BucketConfig, ConsoleConfig, NodeKind, TenantShardTimelineId, TraversingDepth,
};
@@ -28,11 +27,6 @@ use crate::{
enum GarbageReason {
DeletedInConsole,
MissingInConsole,
// The remaining data relates to a known deletion issue, and we're sure that purging this
// will not delete any real data, for example https://github.com/neondatabase/neon/pull/7928 where
// there is nothing in a tenant path apart from a heatmap file.
KnownBug,
}
#[derive(Serialize, Deserialize, Debug)]
@@ -78,15 +72,6 @@ impl GarbageList {
}
}
/// If an entity has been identified as requiring purge due to a known bug, e.g.
/// a particular type of object left behind after an incomplete deletion.
fn append_buggy(&mut self, entity: GarbageEntity) {
self.items.push(GarbageItem {
entity,
reason: GarbageReason::KnownBug,
});
}
/// Return true if appended, false if not. False means the result was not garbage.
fn maybe_append<T>(&mut self, entity: GarbageEntity, result: Option<T>) -> bool
where
@@ -234,71 +219,6 @@ async fn find_garbage_inner(
assert!(project.tenant == tenant_shard_id.tenant_id);
}
// Special case: If it's missing in console, check for known bugs that would enable us to conclusively
// identify it as purge-able anyway
if console_result.is_none() {
let timelines = stream_tenant_timelines(&s3_client, &target, tenant_shard_id)
.await?
.collect::<Vec<_>>()
.await;
if timelines.is_empty() {
// No timelines, but a heatmap: the deletion bug where we deleted everything but heatmaps
let tenant_objects = list_objects_with_retries(
&s3_client,
&target.tenant_root(&tenant_shard_id),
None,
)
.await?;
let object = tenant_objects.contents.as_ref().unwrap().first().unwrap();
if object.key.as_ref().unwrap().ends_with("heatmap-v1.json") {
tracing::info!("Tenant {tenant_shard_id}: is missing in console and is only a heatmap (known historic deletion bug)");
garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
continue;
} else {
tracing::info!("Tenant {tenant_shard_id} is missing in console and contains one object: {}", object.key.as_ref().unwrap());
}
} else {
// A console-unknown tenant with timelines: check if these timelines only contain initdb.tar.zst, from the initial
// rollout of WAL DR in which we never deleted these.
let mut any_non_initdb = false;
for timeline_r in timelines {
let timeline = timeline_r?;
let timeline_objects = list_objects_with_retries(
&s3_client,
&target.timeline_root(&timeline),
None,
)
.await?;
if timeline_objects
.common_prefixes
.as_ref()
.map(|v| v.len())
.unwrap_or(0)
> 0
{
// Sub-paths? Unexpected
any_non_initdb = true;
} else {
let object = timeline_objects.contents.as_ref().unwrap().first().unwrap();
if object.key.as_ref().unwrap().ends_with("initdb.tar.zst") {
tracing::info!("Timeline {timeline} contains only initdb.tar.zst");
} else {
any_non_initdb = true;
}
}
}
if any_non_initdb {
tracing::info!("Tenant {tenant_shard_id}: is missing in console and contains timelines, one or more of which are more than just initdb");
} else {
tracing::info!("Tenant {tenant_shard_id}: is missing in console and contains only timelines that only contain initdb");
garbage.append_buggy(GarbageEntity::Tenant(tenant_shard_id));
continue;
}
}
}
if garbage.maybe_append(GarbageEntity::Tenant(tenant_shard_id), console_result) {
tracing::debug!("Tenant {tenant_shard_id} is garbage");
} else {
@@ -429,6 +349,9 @@ pub async fn get_timeline_objects(
tracing::debug!("Listing objects in timeline {ttid}");
let timeline_root = super::remote_timeline_path_id(&ttid);
// TODO: apply extra validation based on object modification time. Don't purge
// timelines whose index_part.json has been touched recently.
let list = s3_client
.list(
Some(&timeline_root),
@@ -499,7 +422,6 @@ impl DeletionProgressTracker {
pub async fn purge_garbage(
input_path: String,
mode: PurgeMode,
min_age: Duration,
dry_run: bool,
) -> anyhow::Result<()> {
let list_bytes = tokio::fs::read(&input_path).await?;
@@ -510,7 +432,7 @@ pub async fn purge_garbage(
input_path
);
let (remote_client, _target) =
let remote_client =
init_remote_generic(garbage_list.bucket_config.clone(), garbage_list.node_kind).await?;
assert_eq!(
@@ -537,7 +459,6 @@ pub async fn purge_garbage(
.filter(|i| match (&mode, &i.reason) {
(PurgeMode::DeletedAndMissing, _) => true,
(PurgeMode::DeletedOnly, GarbageReason::DeletedInConsole) => true,
(PurgeMode::DeletedOnly, GarbageReason::KnownBug) => true,
(PurgeMode::DeletedOnly, GarbageReason::MissingInConsole) => false,
});
@@ -566,37 +487,6 @@ pub async fn purge_garbage(
let mut progress_tracker = DeletionProgressTracker::default();
while let Some(result) = get_objects_results.next().await {
let mut object_list = result?;
// Extra safety check: even if a collection of objects is garbage, check max() of modification
// times before purging, so that if we incorrectly marked a live tenant as garbage then we would
// notice that its index has been written recently and would omit deleting it.
if object_list.is_empty() {
// Simplify subsequent code by ensuring list always has at least one item
// Usually, this only occurs if there is parallel deletions racing us, as there is no empty prefixes
continue;
}
let max_mtime = object_list.iter().map(|o| o.last_modified).max().unwrap();
let age = max_mtime.elapsed();
match age {
Err(_) => {
tracing::warn!("Bad last_modified time");
continue;
}
Ok(a) if a < min_age => {
// Failed age check. This doesn't mean we did something wrong: a tenant might really be garbage and recently
// written, but out of an abundance of caution we still don't purge it.
tracing::info!(
"Skipping tenant with young objects {}..{}",
object_list.first().as_ref().unwrap().key,
object_list.last().as_ref().unwrap().key
);
continue;
}
Ok(_) => {
// Passed age check
}
}
objects_to_delete.append(&mut object_list);
if objects_to_delete.len() >= MAX_KEYS_PER_DELETE {
do_delete(

View File

@@ -22,18 +22,16 @@ use aws_sdk_s3::Client;
use camino::{Utf8Path, Utf8PathBuf};
use clap::ValueEnum;
use futures::{Stream, StreamExt};
use pageserver::tenant::remote_timeline_client::{remote_tenant_path, remote_timeline_path};
use pageserver::tenant::TENANTS_SEGMENT_NAME;
use pageserver_api::shard::TenantShardId;
use remote_storage::{
GenericRemoteStorage, Listing, ListingMode, RemotePath, RemoteStorageConfig, RemoteStorageKind,
S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, S3Config,
DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT,
};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tokio_util::sync::CancellationToken;
use tracing::error;
use tracing_appender::non_blocking::WorkerGuard;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@@ -321,35 +319,27 @@ fn default_prefix_in_bucket(node_kind: NodeKind) -> &'static str {
}
}
fn make_root_target(
bucket_name: String,
prefix_in_bucket: String,
node_kind: NodeKind,
) -> RootTarget {
let s3_target = S3Target {
bucket_name,
prefix_in_bucket,
delimiter: "/".to_string(),
};
match node_kind {
NodeKind::Pageserver => RootTarget::Pageserver(s3_target),
NodeKind::Safekeeper => RootTarget::Safekeeper(s3_target),
}
}
async fn init_remote(
bucket_config: BucketConfig,
node_kind: NodeKind,
) -> anyhow::Result<(Arc<Client>, RootTarget)> {
let bucket_region = Region::new(bucket_config.region);
let delimiter = "/".to_string();
let s3_client = Arc::new(init_s3_client(bucket_region).await);
let default_prefix = default_prefix_in_bucket(node_kind).to_string();
let s3_root = make_root_target(
bucket_config.bucket,
bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
node_kind,
);
let s3_root = match node_kind {
NodeKind::Pageserver => RootTarget::Pageserver(S3Target {
bucket_name: bucket_config.bucket,
prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
delimiter,
}),
NodeKind::Safekeeper => RootTarget::Safekeeper(S3Target {
bucket_name: bucket_config.bucket,
prefix_in_bucket: bucket_config.prefix_in_bucket.unwrap_or(default_prefix),
delimiter,
}),
};
Ok((s3_client, s3_root))
}
@@ -357,12 +347,12 @@ async fn init_remote(
async fn init_remote_generic(
bucket_config: BucketConfig,
node_kind: NodeKind,
) -> anyhow::Result<(GenericRemoteStorage, RootTarget)> {
) -> anyhow::Result<GenericRemoteStorage> {
let endpoint = env::var("AWS_ENDPOINT_URL").ok();
let default_prefix = default_prefix_in_bucket(node_kind).to_string();
let prefix_in_bucket = Some(bucket_config.prefix_in_bucket.unwrap_or(default_prefix));
let storage = S3Config {
bucket_name: bucket_config.bucket.clone(),
bucket_name: bucket_config.bucket,
bucket_region: bucket_config.region,
prefix_in_bucket,
endpoint,
@@ -376,13 +366,7 @@ async fn init_remote_generic(
storage: RemoteStorageKind::AwsS3(storage),
timeout: RemoteStorageConfig::DEFAULT_TIMEOUT,
};
// We already pass the prefix to the remote client above
let prefix_in_root_target = String::new();
let s3_root = make_root_target(bucket_config.bucket, prefix_in_root_target, node_kind);
let client = GenericRemoteStorage::from_config(&storage_config).await?;
Ok((client, s3_root))
GenericRemoteStorage::from_config(&storage_config).await
}
async fn list_objects_with_retries(
@@ -420,44 +404,6 @@ async fn list_objects_with_retries(
Err(anyhow!("unreachable unless MAX_RETRIES==0"))
}
fn stream_objects_with_retries<'a>(
storage_client: &'a GenericRemoteStorage,
listing_mode: ListingMode,
s3_target: &'a S3Target,
) -> impl Stream<Item = Result<Listing, anyhow::Error>> + 'a {
async_stream::stream! {
let mut trial = 0;
let cancel = CancellationToken::new();
let prefix_str = &s3_target
.prefix_in_bucket
.strip_prefix("/")
.unwrap_or(&s3_target.prefix_in_bucket);
let prefix = RemotePath::from_string(prefix_str)?;
let mut list_stream =
storage_client.list_streaming(Some(&prefix), listing_mode, None, &cancel);
while let Some(res) = list_stream.next().await {
if let Err(err) = res {
let yield_err = if err.is_permanent() {
true
} else {
let backoff_time = 1 << trial.max(5);
tokio::time::sleep(Duration::from_secs(backoff_time)).await;
trial += 1;
trial == MAX_RETRIES - 1
};
if yield_err {
yield Err(err)
.with_context(|| format!("Failed to list objects {MAX_RETRIES} times"));
break;
}
} else {
trial = 0;
yield res.map_err(anyhow::Error::from);
}
}
}
}
async fn download_object_with_retries(
s3_client: &Client,
bucket_name: &str,

View File

@@ -50,8 +50,6 @@ enum Command {
input_path: String,
#[arg(short, long, default_value_t = PurgeMode::DeletedOnly)]
mode: PurgeMode,
#[arg(long = "min-age")]
min_age: humantime::Duration,
},
#[command(verbatim_doc_comment)]
ScanMetadata {
@@ -198,11 +196,9 @@ async fn main() -> anyhow::Result<()> {
let console_config = ConsoleConfig::from_env()?;
find_garbage(bucket_config, console_config, depth, node_kind, output_path).await
}
Command::PurgeGarbage {
input_path,
mode,
min_age,
} => purge_garbage(input_path, mode, min_age.into(), !cli.delete).await,
Command::PurgeGarbage { input_path, mode } => {
purge_garbage(input_path, mode, !cli.delete).await
}
Command::TenantSnapshot {
tenant_id,
output_path,

View File

@@ -1,41 +1,12 @@
use std::str::FromStr;
use anyhow::{anyhow, Context};
use anyhow::Context;
use async_stream::{stream, try_stream};
use aws_sdk_s3::{types::ObjectIdentifier, Client};
use futures::StreamExt;
use remote_storage::{GenericRemoteStorage, ListingMode};
use tokio_stream::Stream;
use crate::{
list_objects_with_retries, stream_objects_with_retries, RootTarget, S3Target,
TenantShardTimelineId,
};
use crate::{list_objects_with_retries, RootTarget, S3Target, TenantShardTimelineId};
use pageserver_api::shard::TenantShardId;
use utils::id::{TenantId, TimelineId};
/// Given a remote storage and a target, output a stream of TenantIds discovered via listing prefixes
pub fn stream_tenants_generic<'a>(
remote_client: &'a GenericRemoteStorage,
target: &'a RootTarget,
) -> impl Stream<Item = anyhow::Result<TenantShardId>> + 'a {
try_stream! {
let tenants_target = target.tenants_root();
let mut tenants_stream =
std::pin::pin!(stream_objects_with_retries(remote_client, ListingMode::WithDelimiter, &tenants_target));
while let Some(chunk) = tenants_stream.next().await {
let chunk = chunk?;
let entry_ids = chunk.prefixes.iter()
.map(|prefix| prefix.get_path().file_name().ok_or_else(|| anyhow!("no final component in path '{prefix}'")));
for dir_name_res in entry_ids {
let dir_name = dir_name_res?;
let id = TenantShardId::from_str(dir_name)?;
yield id;
}
}
}
}
/// Given an S3 bucket, output a stream of TenantIds discovered via ListObjectsv2
pub fn stream_tenants<'a>(
s3_client: &'a Client,

View File

@@ -81,7 +81,7 @@ should go.
Useful parameters and commands:
`--preserve-database-files` to preserve pageserver (layer) and safekeer (segment) timeline files on disk
after running a test suite. Such files might be large, so removed by default; but might be useful for debugging or creation of svg images with layer file contents. If `NeonEnvBuilder#preserve_database_files` set to `True` for a particular test, the whole `repo` directory will be attached to Allure report (thus uploaded to S3) as `everything.tar.zst` for this test.
after running a test suite. Such files might be large, so removed by default; but might be useful for debugging or creation of svg images with layer file contents.
Let stdout, stderr and `INFO` log messages go to the terminal instead of capturing them:
`./scripts/pytest -s --log-cli-level=INFO ...`

View File

@@ -222,8 +222,6 @@ class NeonBenchmarker:
function by the zenbenchmark fixture
"""
PROPERTY_PREFIX = "neon_benchmarker_"
def __init__(self, property_recorder: Callable[[str, object], None]):
# property recorder here is a pytest fixture provided by junitxml module
# https://docs.pytest.org/en/6.2.x/reference.html#pytest.junitxml.record_property
@@ -240,7 +238,7 @@ class NeonBenchmarker:
Record a benchmark result.
"""
# just to namespace the value
name = f"{self.PROPERTY_PREFIX}_{metric_name}"
name = f"neon_benchmarker_{metric_name}"
self.property_recorder(
name,
{
@@ -251,18 +249,6 @@ class NeonBenchmarker:
},
)
@classmethod
def records(
cls, user_properties: list[tuple[str, object]]
) -> Iterator[tuple[str, dict[str, object]]]:
"""
Yield all records related to benchmarks
"""
for property_name, recorded_property in user_properties:
if property_name.startswith(cls.PROPERTY_PREFIX):
assert isinstance(recorded_property, dict)
yield recorded_property["name"], recorded_property
@contextmanager
def record_duration(self, metric_name: str) -> Iterator[None]:
"""
@@ -439,11 +425,10 @@ def zenbenchmark(
yield benchmarker
results = {}
for _, recorded_property in NeonBenchmarker.records(request.node.user_properties):
for _, recorded_property in request.node.user_properties:
name = recorded_property["name"]
value = str(recorded_property["value"])
unit = str(recorded_property["unit"]).strip()
if unit != "":
if (unit := recorded_property["unit"].strip()) != "":
value += f" {unit}"
results[name] = value
@@ -492,7 +477,7 @@ def pytest_terminal_summary(
for test_report in terminalreporter.stats.get("passed", []):
result_entry = []
for _, recorded_property in NeonBenchmarker.records(test_report.user_properties):
for _, recorded_property in test_report.user_properties:
if not is_header_printed:
terminalreporter.section("Benchmark results", "-")
is_header_printed = True

View File

@@ -449,7 +449,6 @@ class TokenScope(str, Enum):
GENERATIONS_API = "generations_api"
SAFEKEEPER_DATA = "safekeeperdata"
TENANT = "tenant"
SCRUBBER = "scrubber"
class NeonEnvBuilder:
@@ -543,6 +542,21 @@ class NeonEnvBuilder:
f"Overriding pageserver default compaction algorithm to {self.pageserver_default_tenant_config_compaction_algorithm}"
)
self.pageserver_get_vectored_impl: Optional[str] = None
if os.getenv("PAGESERVER_GET_VECTORED_IMPL", "") == "vectored":
self.pageserver_get_vectored_impl = "vectored"
log.debug('Overriding pageserver get_vectored_impl config to "vectored"')
self.pageserver_get_impl: Optional[str] = None
if os.getenv("PAGESERVER_GET_IMPL", "") == "vectored":
self.pageserver_get_impl = "vectored"
log.debug('Overriding pageserver get_impl config to "vectored"')
self.pageserver_validate_vectored_get: Optional[bool] = None
if (validate := os.getenv("PAGESERVER_VALIDATE_VEC_GET")) is not None:
self.pageserver_validate_vectored_get = bool(validate)
log.debug(f'Overriding pageserver validate_vectored_get config to "{validate}"')
self.pageserver_aux_file_policy = pageserver_aux_file_policy
self.safekeeper_extra_opts = safekeeper_extra_opts
@@ -1143,6 +1157,12 @@ class NeonEnv:
}
if self.pageserver_virtual_file_io_engine is not None:
ps_cfg["virtual_file_io_engine"] = self.pageserver_virtual_file_io_engine
if config.pageserver_get_vectored_impl is not None:
ps_cfg["get_vectored_impl"] = config.pageserver_get_vectored_impl
if config.pageserver_get_impl is not None:
ps_cfg["get_impl"] = config.pageserver_get_impl
if config.pageserver_validate_vectored_get is not None:
ps_cfg["validate_vectored_get"] = config.pageserver_validate_vectored_get
if config.pageserver_default_tenant_config_compaction_algorithm is not None:
tenant_config = ps_cfg.setdefault("tenant_config", {})
tenant_config[
@@ -1395,7 +1415,7 @@ def _shared_simple_env(
pg_distrib_dir=pg_distrib_dir,
pg_version=pg_version,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
test_name=request.node.name,
test_output_dir=test_output_dir,
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
@@ -1442,7 +1462,6 @@ def neon_env_builder(
pageserver_virtual_file_io_engine: str,
pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]],
pageserver_aux_file_policy: Optional[AuxFileStore],
record_property: Callable[[str, object], None],
) -> Iterator[NeonEnvBuilder]:
"""
Fixture to create a Neon environment for test.
@@ -1471,7 +1490,7 @@ def neon_env_builder(
pg_version=pg_version,
broker=default_broker,
run_id=run_id,
preserve_database_files=cast(bool, pytestconfig.getoption("--preserve-database-files")),
preserve_database_files=pytestconfig.getoption("--preserve-database-files"),
pageserver_virtual_file_io_engine=pageserver_virtual_file_io_engine,
test_name=request.node.name,
test_output_dir=test_output_dir,
@@ -1480,9 +1499,6 @@ def neon_env_builder(
pageserver_default_tenant_config_compaction_algorithm=pageserver_default_tenant_config_compaction_algorithm,
) as builder:
yield builder
# Propogate `preserve_database_files` to make it possible to use in other fixtures,
# like `test_output_dir` fixture for attaching all database files to Allure report.
record_property("preserve_database_files", builder.preserve_database_files)
@dataclass
@@ -2587,62 +2603,6 @@ class NeonStorageController(MetricsGetter, LogUtils):
time.sleep(backoff)
def metadata_health_update(self, healthy: List[TenantShardId], unhealthy: List[TenantShardId]):
body: Dict[str, Any] = {
"healthy_tenant_shards": [str(t) for t in healthy],
"unhealthy_tenant_shards": [str(t) for t in unhealthy],
}
self.request(
"POST",
f"{self.env.storage_controller_api}/control/v1/metadata_health/update",
json=body,
headers=self.headers(TokenScope.SCRUBBER),
)
def metadata_health_list_unhealthy(self):
response = self.request(
"GET",
f"{self.env.storage_controller_api}/control/v1/metadata_health/unhealthy",
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def metadata_health_list_outdated(self, duration: str):
body: Dict[str, Any] = {"not_scrubbed_for": duration}
response = self.request(
"POST",
f"{self.env.storage_controller_api}/control/v1/metadata_health/outdated",
json=body,
headers=self.headers(TokenScope.ADMIN),
)
return response.json()
def metadata_health_is_healthy(self, outdated_duration: str = "1h") -> bool:
"""Metadata is healthy if there is no unhealthy or outdated health records."""
unhealthy = self.metadata_health_list_unhealthy()
outdated = self.metadata_health_list_outdated(outdated_duration)
healthy = (
len(unhealthy["unhealthy_tenant_shards"]) == 0 and len(outdated["health_records"]) == 0
)
if not healthy:
log.info(f"{unhealthy=}, {outdated=}")
return healthy
def step_down(self):
log.info("Asking storage controller to step down")
response = self.request(
"PUT",
f"{self.env.storage_controller_api}/control/v1/step_down",
headers=self.headers(TokenScope.ADMIN),
)
response.raise_for_status()
return response.json()
def configure_failpoints(self, config_strings: Tuple[str, str] | List[Tuple[str, str]]):
if isinstance(config_strings, tuple):
pairs = [config_strings]
@@ -4528,16 +4488,7 @@ def test_output_dir(
yield test_dir
preserve_database_files = False
for k, v in request.node.user_properties:
# NB: the neon_env_builder fixture uses this fixture (test_output_dir).
# So, neon_env_builder's cleanup runs before here.
# The cleanup propagates NeonEnvBuilder.preserve_database_files into this user property.
if k == "preserve_database_files":
assert isinstance(v, bool)
preserve_database_files = v
allure_attach_from_dir(test_dir, preserve_database_files)
allure_attach_from_dir(test_dir)
class FileAndThreadLock:

View File

@@ -240,18 +240,9 @@ ATTACHMENT_NAME_REGEX: re.Pattern = re.compile( # type: ignore[type-arg]
)
def allure_attach_from_dir(dir: Path, preserve_database_files: bool = False):
def allure_attach_from_dir(dir: Path):
"""Attach all non-empty files from `dir` that matches `ATTACHMENT_NAME_REGEX` to Allure report"""
if preserve_database_files:
zst_file = dir.with_suffix(".tar.zst")
with zst_file.open("wb") as zst:
cctx = zstandard.ZstdCompressor()
with cctx.stream_writer(zst) as compressor:
with tarfile.open(fileobj=compressor, mode="w") as tar:
tar.add(dir, arcname="")
allure.attach.file(zst_file, "everything.tar.zst", "application/zstd", "tar.zst")
for attachment in Path(dir).glob("**/*"):
if ATTACHMENT_NAME_REGEX.fullmatch(attachment.name) and attachment.stat().st_size > 0:
name = str(attachment.relative_to(dir))

View File

@@ -1,88 +0,0 @@
"""
Test the logical replication in Neon with the different consumers
"""
import hashlib
import time
import clickhouse_connect
import psycopg2
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import RemotePostgres
from fixtures.utils import wait_until
def query_clickhouse(
client,
query: str,
digest: str,
) -> None:
"""
Run the query on the client
return answer if successful, raise an exception otherwise
"""
log.debug("Query: %s", query)
res = client.query(query)
log.debug(res.result_rows)
m = hashlib.sha1()
m.update(repr(tuple(res.result_rows)).encode())
hash_res = m.hexdigest()
log.debug("Hash: %s", hash_res)
if hash_res == digest:
return
raise ValueError("Hash mismatch")
@pytest.mark.remote_cluster
def test_clickhouse(remote_pg: RemotePostgres):
"""
Test the logical replication having ClickHouse as a client
"""
conn_options = remote_pg.conn_options()
for _ in range(5):
try:
conn = psycopg2.connect(remote_pg.connstr())
except psycopg2.OperationalError as perr:
log.debug(perr)
time.sleep(1)
else:
break
raise TimeoutError
cur = conn.cursor()
cur.execute("DROP TABLE IF EXISTS table1")
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
conn.commit()
client = clickhouse_connect.get_client(host="clickhouse")
client.command("SET allow_experimental_database_materialized_postgresql=1")
client.command(
"CREATE DATABASE db1_postgres ENGINE = "
f"MaterializedPostgreSQL('{conn_options['host']}', "
f"'{conn_options['dbname']}', "
f"'{conn_options['user']}', '{conn_options['password']}') "
"SETTINGS materialized_postgresql_tables_list = 'table1';"
)
wait_until(
120,
0.5,
lambda: query_clickhouse(
client,
"select * from db1_postgres.table1 order by 1",
"ee600d8f7cd05bd0b169fa81f44300a9dd10085a",
),
)
cur.execute("INSERT INTO table1 (id, column1) VALUES (3, 'ghi'), (4, 'jkl');")
conn.commit()
wait_until(
120,
0.5,
lambda: query_clickhouse(
client,
"select * from db1_postgres.table1 order by 1",
"9eba2daaf7e4d7d27ac849525f68b562ab53947d",
),
)
log.debug("Sleeping before final checking if Neon is still alive")
time.sleep(3)
cur.execute("SELECT 1")

View File

@@ -17,7 +17,6 @@ from fixtures.pg_version import PgVersion
# 3. Disk space used
# 4. Peak memory usage
#
@pytest.mark.skip("See https://github.com/neondatabase/neon/issues/7124")
def test_bulk_insert(neon_with_baseline: PgCompare):
env = neon_with_baseline

View File

@@ -389,11 +389,6 @@ def test_duplicate_creation(neon_env_builder: NeonEnvBuilder):
repeat_result = ps_http.timeline_create(
env.pg_version, env.initial_tenant, success_timeline, timeout=60
)
# remote_consistent_lsn_visible will be published only after we've
# confirmed the generation, which is not part of what we await during
# timeline creation (uploads). mask it out here to avoid flakyness.
del success_result["remote_consistent_lsn_visible"]
del repeat_result["remote_consistent_lsn_visible"]
assert repeat_result == success_result
finally:
env.pageserver.stop(immediate=True)

View File

@@ -17,17 +17,22 @@ from fixtures.pg_version import PgVersion
# Test restarting page server, while safekeeper and compute node keep
# running.
def test_local_corruption(neon_env_builder: NeonEnvBuilder):
if neon_env_builder.pageserver_get_impl == "vectored":
reconstruct_function_name = "get_values_reconstruct_data"
else:
reconstruct_function_name = "get_value_reconstruct_data"
env = neon_env_builder.init_start()
env.pageserver.allowed_errors.extend(
[
".*get_values_reconstruct_data for layer .*",
f".*{reconstruct_function_name} for layer .*",
".*could not find data for key.*",
".*is not active. Current state: Broken.*",
".*will not become active. Current state: Broken.*",
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*: layer load failed, assuming permanent failure:.*",
".*layer loading failed permanently: load layer: .*",
]
)
@@ -74,7 +79,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
# (We don't check layer file contents on startup, when loading the timeline)
#
# This will change when we implement checksums for layers
with pytest.raises(Exception, match="get_values_reconstruct_data for layer ") as err:
with pytest.raises(Exception, match=f"{reconstruct_function_name} for layer ") as err:
pg1.start()
log.info(
f"As expected, compute startup failed for timeline {tenant1}/{timeline1} with corrupt layers: {err}"

View File

@@ -227,6 +227,12 @@ def test_forward_compatibility(
)
try:
# Previous version neon_local and pageserver are not aware
# of the new config.
# TODO: remove these once the previous version of neon local supports them
neon_env_builder.pageserver_get_impl = None
neon_env_builder.pageserver_validate_vectored_get = None
neon_env_builder.num_safekeepers = 3
# Use previous version's production binaries (pageserver, safekeeper, pg_distrib_dir, etc.).

View File

@@ -48,6 +48,8 @@ def test_sharding_smoke(
# that the scrubber doesn't barf when it sees a sharded tenant.
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.preserve_database_files = True
env = neon_env_builder.init_start(
initial_tenant_shard_count=shard_count, initial_tenant_shard_stripe_size=stripe_size
)
@@ -370,6 +372,8 @@ def test_sharding_split_smoke(
# that the scrubber doesn't barf when it sees a sharded tenant.
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.preserve_database_files = True
non_default_tenant_config = {"gc_horizon": 77 * 1024 * 1024}
env = neon_env_builder.init_configs(True)

View File

@@ -3,7 +3,7 @@ import threading
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Dict, List, Union
import pytest
from fixtures.common_types import TenantId, TenantShardId, TimelineId
@@ -1783,198 +1783,3 @@ def test_storage_controller_node_deletion(
assert victim.id not in [n["id"] for n in env.storage_controller.node_list()]
env.storage_controller.reconcile_all() # FIXME: workaround for optimizations happening on startup, see FIXME above.
env.storage_controller.consistency_check()
@pytest.mark.parametrize("shard_count", [None, 2])
def test_storage_controller_metadata_health(
neon_env_builder: NeonEnvBuilder,
shard_count: Optional[int],
):
"""
Create three tenants A, B, C.
Phase 1:
- A: Post healthy status.
- B: Post unhealthy status.
- C: No updates.
Phase 2:
- B: Post healthy status.
- C: Post healthy status.
Phase 3:
- A: Post unhealthy status.
Phase 4:
- Delete tenant A, metadata health status should be deleted as well.
"""
def update_and_query_metadata_health(
env: NeonEnv,
healthy: List[TenantShardId],
unhealthy: List[TenantShardId],
outdated_duration: str = "1h",
) -> Tuple[Set[str], Set[str]]:
"""
Update metadata health. Then list tenant shards with unhealthy and
outdated metadata health status.
"""
if healthy or unhealthy:
env.storage_controller.metadata_health_update(healthy, unhealthy)
result = env.storage_controller.metadata_health_list_unhealthy()
unhealthy_res = set(result["unhealthy_tenant_shards"])
result = env.storage_controller.metadata_health_list_outdated(outdated_duration)
outdated_res = set(record["tenant_shard_id"] for record in result["health_records"])
return unhealthy_res, outdated_res
neon_env_builder.enable_pageserver_remote_storage(s3_storage())
neon_env_builder.num_pageservers = 2
env = neon_env_builder.init_start()
# Mock tenant (`initial_tenant``) with healthy scrubber scan result
tenant_a_shard_ids = (
env.storage_controller.tenant_shard_split(env.initial_tenant, shard_count=shard_count)
if shard_count is not None
else [TenantShardId(env.initial_tenant, 0, 0)]
)
# Mock tenant with unhealthy scrubber scan result
tenant_b, _ = env.neon_cli.create_tenant(shard_count=shard_count)
tenant_b_shard_ids = (
env.storage_controller.tenant_shard_split(tenant_b, shard_count=shard_count)
if shard_count is not None
else [TenantShardId(tenant_b, 0, 0)]
)
# Mock tenant that never gets a health update from scrubber
tenant_c, _ = env.neon_cli.create_tenant(shard_count=shard_count)
tenant_c_shard_ids = (
env.storage_controller.tenant_shard_split(tenant_c, shard_count=shard_count)
if shard_count is not None
else [TenantShardId(tenant_c, 0, 0)]
)
# Metadata health table also updated as tenant shards are created.
assert env.storage_controller.metadata_health_is_healthy()
# post "fake" updates to storage controller db
unhealthy, outdated = update_and_query_metadata_health(
env, healthy=tenant_a_shard_ids, unhealthy=tenant_b_shard_ids
)
log.info(f"After Phase 1: {unhealthy=}, {outdated=}")
assert len(unhealthy) == len(tenant_b_shard_ids)
for t in tenant_b_shard_ids:
assert str(t) in unhealthy
assert len(outdated) == 0
unhealthy, outdated = update_and_query_metadata_health(
env, healthy=tenant_b_shard_ids + tenant_c_shard_ids, unhealthy=[]
)
log.info(f"After Phase 2: {unhealthy=}, {outdated=}")
assert len(unhealthy) == 0
assert len(outdated) == 0
unhealthy, outdated = update_and_query_metadata_health(
env, healthy=[], unhealthy=tenant_a_shard_ids
)
log.info(f"After Phase 3: {unhealthy=}, {outdated=}")
assert len(unhealthy) == len(tenant_a_shard_ids)
for t in tenant_a_shard_ids:
assert str(t) in unhealthy
assert len(outdated) == 0
# Phase 4: Delete A
env.storage_controller.pageserver_api().tenant_delete(env.initial_tenant)
# A's unhealthy metadata health status should be deleted as well.
assert env.storage_controller.metadata_health_is_healthy()
# All shards from B and C are not fresh if set outdated duration to 0 seconds.
unhealthy, outdated = update_and_query_metadata_health(
env, healthy=[], unhealthy=tenant_a_shard_ids, outdated_duration="0s"
)
assert len(unhealthy) == 0
for t in tenant_b_shard_ids + tenant_c_shard_ids:
assert str(t) in outdated
def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
"""
Test the `/control/v1/step_down` storage controller API. Upon receiving such
a request, the storage controller cancels any on-going reconciles and replies
with 503 to all requests apart from `/control/v1/step_down`, `/status` and `/metrics`.
"""
env = neon_env_builder.init_configs()
env.start()
tid = TenantId.generate()
tsid = str(TenantShardId(tid, shard_number=0, shard_count=0))
env.storage_controller.tenant_create(tid)
env.storage_controller.reconcile_until_idle()
env.storage_controller.configure_failpoints(("sleep-on-reconcile-epilogue", "return(10000)"))
# Make a change to the tenant config to trigger a slow reconcile
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
virtual_ps_http.patch_tenant_config_client_side(tid, {"compaction_threshold": 5}, None)
env.storage_controller.allowed_errors.append(
".*Accepted configuration update but reconciliation failed.*"
)
observed_state = env.storage_controller.step_down()
log.info(f"Storage controller stepped down with {observed_state=}")
# Validate that we waited for the slow reconcile to complete
# and updated the observed state in the storcon before stepping down.
node_id = str(env.pageserver.id)
assert tsid in observed_state
assert node_id in observed_state[tsid]["locations"]
assert "conf" in observed_state[tsid]["locations"][node_id]
assert "tenant_conf" in observed_state[tsid]["locations"][node_id]["conf"]
tenant_conf = observed_state[tsid]["locations"][node_id]["conf"]["tenant_conf"]
assert "compaction_threshold" in tenant_conf
assert tenant_conf["compaction_threshold"] == 5
# Validate that we propagated the change to the pageserver
ps_tenant_conf = env.pageserver.http_client().tenant_config(tid)
assert "compaction_threshold" in ps_tenant_conf.effective_config
assert ps_tenant_conf.effective_config["compaction_threshold"] == 5
# Validate that the storcon is not replying to the usual requests
# once it has stepped down.
with pytest.raises(StorageControllerApiException, match="stepped_down"):
env.storage_controller.tenant_list()
# Validate that we can step down multiple times and the observed state
# doesn't change.
observed_state_again = env.storage_controller.step_down()
assert observed_state == observed_state_again
assert (
env.storage_controller.get_metric_value(
"storage_controller_leadership_status", filter={"status": "leader"}
)
== 0
)
assert (
env.storage_controller.get_metric_value(
"storage_controller_leadership_status", filter={"status": "stepped_down"}
)
== 1
)
assert (
env.storage_controller.get_metric_value(
"storage_controller_leadership_status", filter={"status": "candidate"}
)
== 0
)

View File

@@ -404,7 +404,7 @@ files:
x::text as duration_seconds,
neon.approximate_working_set_size_seconds(x) as size
from
(select generate_series * 60 as x from generate_series(1, 60)) as t (x);
(select generate_series * 60 as x from generate_series(1, 60));
build: |
# Build cgroup-tools
#