Compare commits

..

2 Commits

Author SHA1 Message Date
Erik Grinaker
166a995a05 Merge branch 'release' into rc/release/2025-02-27 2025-02-27 10:45:31 +01:00
github-actions[bot]
5135e3b338 Storage release 2025-02-27 2025-02-27 00:17:57 +00:00
144 changed files with 1333 additions and 1871 deletions

View File

@@ -19,7 +19,7 @@ on:
description: "Tag of the last compute release"
value: ${{ jobs.tags.outputs.compute }}
run-kind:
description: "The kind of run we're currently in. Will be one of `pr`, `push-main`, `storage-rc`, `storage-release`, `proxy-rc`, `proxy-release`, `compute-rc`, `compute-release` or `merge_queue`"
description: "The kind of run we're currently in. Will be one of `pr-main`, `push-main`, `storage-rc`, `storage-release`, `proxy-rc`, `proxy-release`, `compute-rc`, `compute-release` or `merge_queue`"
value: ${{ jobs.tags.outputs.run-kind }}
permissions: {}
@@ -51,10 +51,10 @@ jobs:
|| (inputs.github-event-name == 'push' && github.ref_name == 'release') && 'storage-release'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release-compute') && 'compute-release'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release-proxy') && 'proxy-release'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'main') && 'pr-main'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release') && 'storage-rc-pr'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release-compute') && 'compute-rc-pr'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release-proxy') && 'proxy-rc-pr'
|| (inputs.github-event-name == 'pull_request') && 'pr'
|| 'unknown'
}}
run: |
@@ -81,7 +81,7 @@ jobs:
compute-release)
echo "tag=release-compute-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
;;
pr|storage-rc-pr|compute-rc-pr|proxy-rc-pr)
pr-main|storage-rc-pr|compute-rc-pr|proxy-rc-pr)
BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')
echo "tag=$BUILD_AND_TEST_RUN_ID" | tee -a $GITHUB_OUTPUT
;;

View File

@@ -140,7 +140,6 @@ jobs:
--ignore test_runner/performance/test_logical_replication.py
--ignore test_runner/performance/test_physical_replication.py
--ignore test_runner/performance/test_perf_ingest_using_pgcopydb.py
--ignore test_runner/performance/test_cumulative_statistics_persistence.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -172,61 +171,6 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
cumstats-test:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Verify that cumulative statistics are preserved
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_cumulative_statistics_persistence.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 3600
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
replication-tests:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:

View File

@@ -487,7 +487,7 @@ jobs:
neon-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
strategy:
matrix:
arch: [ x64, arm64 ]
@@ -537,7 +537,7 @@ jobs:
neon-image:
needs: [ neon-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ubuntu-22.04
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -559,7 +559,7 @@ jobs:
compute-node-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -651,7 +651,7 @@ jobs:
compute-node-image:
needs: [ compute-node-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -694,7 +694,7 @@ jobs:
vm-compute-node-image:
needs: [ check-permissions, meta, compute-node-image ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: [ self-hosted, large ]
strategy:
fail-fast: false
@@ -775,7 +775,7 @@ jobs:
# Ensure that we don't have bad versions.
- name: Verify image versions
shell: bash # ensure no set -e for better error messages
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
run: |
pageserver_version=$(docker run --rm neondatabase/neon:${{ needs.meta.outputs.build-tag }} "/bin/sh" "-c" "/usr/local/bin/pageserver --version")
@@ -823,12 +823,12 @@ jobs:
- name: Test extension upgrade
timeout-minutes: 20
if: ${{ contains(fromJSON('["pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["pr-main", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
env:
TAG: >-
${{
false
|| needs.meta.outputs.run-kind == 'pr' && needs.meta.outputs.build-tag
|| needs.meta.outputs.run-kind == 'pr-main' && needs.meta.outputs.build-tag
|| needs.meta.outputs.run-kind == 'compute-rc-pr' && needs.meta.outputs.previous-storage-release
}}
TEST_EXTENSIONS_TAG: latest
@@ -870,7 +870,7 @@ jobs:
push-neon-image-dev:
needs: [ meta, generate-image-maps, neon-image ]
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -888,7 +888,7 @@ jobs:
push-compute-image-dev:
needs: [ meta, generate-image-maps, vm-compute-node-image ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -906,8 +906,7 @@ jobs:
push-neon-image-prod:
needs: [ meta, generate-image-maps, neon-image, test-images ]
# Depends on jobs that can get skipped
if: ${{ !failure() && !cancelled() && contains(fromJSON('["storage-release", "proxy-release"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["storage-release", "proxy-release"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -925,8 +924,7 @@ jobs:
push-compute-image-prod:
needs: [ meta, generate-image-maps, vm-compute-node-image, test-images ]
# Depends on jobs that can get skipped
if: ${{ !failure() && !cancelled() && needs.meta.outputs.run-kind == 'compute-release' }}
if: ${{ needs.meta.outputs.run-kind == 'compute-release' }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -957,7 +955,7 @@ jobs:
trigger-custom-extensions-build-and-wait:
needs: [ check-permissions, meta ]
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ubuntu-22.04
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -1349,7 +1347,7 @@ jobs:
|| needs.check-codestyle-python.result == 'skipped'
|| needs.check-codestyle-rust.result == 'skipped'
|| needs.files-changed.result == 'skipped'
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|| needs.test-images.result == 'skipped'
|| (needs.trigger-custom-extensions-build-and-wait.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.trigger-custom-extensions-build-and-wait.result == 'skipped' && contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))

16
Cargo.lock generated
View File

@@ -984,9 +984,9 @@ dependencies = [
[[package]]
name = "bindgen"
version = "0.71.1"
version = "0.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
dependencies = [
"bitflags 2.8.0",
"cexpr",
@@ -997,7 +997,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"rustc-hash 2.1.1",
"rustc-hash",
"shlex",
"syn 2.0.90",
]
@@ -3537,7 +3537,7 @@ dependencies = [
"measured-derive",
"memchr",
"parking_lot 0.12.1",
"rustc-hash 1.1.0",
"rustc-hash",
"ryu",
]
@@ -5012,7 +5012,7 @@ dependencies = [
"reqwest-tracing",
"rsa",
"rstest",
"rustc-hash 1.1.0",
"rustc-hash",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
@@ -5630,12 +5630,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
version = "0.4.0"

View File

@@ -43,7 +43,7 @@ members = [
]
[workspace.package]
edition = "2024"
edition = "2021"
license = "Apache-2.0"
## All dependency versions, used in the project
@@ -70,7 +70,7 @@ aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.71"
bindgen = "0.70"
bit_field = "0.10.2"
bstr = "1.0"
byteorder = "1.4"

View File

@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::iter::once;
use std::os::unix::fs::{PermissionsExt, symlink};
use std::path::Path;
use std::process::{Command, Stdio};
@@ -12,7 +13,9 @@ use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use compute_api::privilege::Privilege;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeFeature, ComputeMode, ComputeSpec, ExtVersion, PgIdent};
use compute_api::spec::{
ComputeFeature, ComputeMode, ComputeSpec, Database, ExtVersion, PgIdent, Role,
};
use futures::StreamExt;
use futures::future::join_all;
use futures::stream::FuturesUnordered;
@@ -31,6 +34,16 @@ use utils::measured_stream::MeasuredReader;
use crate::installed_extensions::get_installed_extensions;
use crate::pg_helpers::*;
use crate::spec::*;
use crate::spec_apply::ApplySpecPhase::{
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
};
use crate::spec_apply::{DB, MutableApplyContext, PerDatabasePhase, apply_operations};
use crate::sync_sk::{check_if_synced, ping_safekeeper};
use crate::{config, extension_server, local_proxy};
@@ -915,6 +928,388 @@ impl ComputeNode {
Ok(client)
}
/// Apply the spec to the running PostgreSQL instance.
/// The caller can decide to run with multiple clients in parallel, or
/// single mode. Either way, the commands executed will be the same, and
/// only commands run in different databases are parallelized.
#[instrument(skip_all)]
pub fn apply_spec_sql(
&self,
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
concurrency: usize,
) -> Result<()> {
info!("Applying config with max {} concurrency", concurrency);
debug!("Config: {:?}", spec);
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
// Proceed with post-startup configuration. Note, that order of operations is important.
let client = Self::get_maintenance_client(&conf).await?;
let spec = spec.clone();
let databases = get_existing_dbs_async(&client).await?;
let roles = get_existing_roles_async(&client)
.await?
.into_iter()
.map(|role| (role.name.clone(), role))
.collect::<HashMap<String, Role>>();
// Check if we need to drop subscriptions before starting the endpoint.
//
// It is important to do this operation exactly once when endpoint starts on a new branch.
// Otherwise, we may drop not inherited, but newly created subscriptions.
//
// We cannot rely only on spec.drop_subscriptions_before_start flag,
// because if for some reason compute restarts inside VM,
// it will start again with the same spec and flag value.
//
// To handle this, we save the fact of the operation in the database
// in the neon.drop_subscriptions_done table.
// If the table does not exist, we assume that the operation was never performed, so we must do it.
// If table exists, we check if the operation was performed on the current timelilne.
//
let mut drop_subscriptions_done = false;
if spec.drop_subscriptions_before_start {
let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id);
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
drop_subscriptions_done = match
client.simple_query(&query).await {
Ok(result) => {
matches!(&result[0], postgres::SimpleQueryMessage::Row(_))
},
Err(e) =>
{
match e.code() {
Some(&SqlState::UNDEFINED_TABLE) => false,
_ => {
// We don't expect any other error here, except for the schema/table not existing
error!("Error checking if drop subscription operation was already performed: {}", e);
return Err(e.into());
}
}
}
}
};
let jwks_roles = Arc::new(
spec.as_ref()
.local_proxy_config
.iter()
.flat_map(|it| &it.jwks)
.flatten()
.flat_map(|setting| &setting.role_names)
.cloned()
.collect::<HashSet<_>>(),
);
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
roles,
dbs: databases,
}));
// Apply special pre drop database phase.
// NOTE: we use the code of RunInEachDatabase phase for parallelism
// and connection management, but we don't really run it in *each* database,
// only in databases, we're about to drop.
info!("Applying PerDatabase (pre-dropdb) phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
// Run the phase for each database that we're about to drop.
let db_processes = spec
.delta_operations
.iter()
.flatten()
.filter_map(move |op| {
if op.action.as_str() == "delete_db" {
Some(op.name.clone())
} else {
None
}
})
.map(|dbname| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
// We only need dbname field for this phase, so set other fields to dummy values
let db = DB::UserDB(Database {
name: dbname.clone(),
owner: "cloud_admin".to_string(),
options: None,
restrict_conn: false,
invalid: false,
});
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
[DropLogicalSubscriptions].to_vec(),
);
Ok(spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
if let Err(e) = handle.await? {
// Handle the error case where the database does not exist
// We do not check whether the DB exists or not in the deletion phase,
// so we shouldn't be strict about it in pre-deletion cleanup as well.
if e.to_string().contains("does not exist") {
warn!("Error dropping subscription: {}", e);
} else {
return Err(e);
}
};
}
for phase in [
CreateSuperUser,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
RenameAndDeleteDatabases,
CreateAndAlterDatabases,
CreateSchemaNeon,
] {
info!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
info!("Applying RunInEachDatabase2 phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
let db_processes = spec
.cluster
.databases
.iter()
.map(|db| DB::new(db.clone()))
// include
.chain(once(DB::SystemDB))
.map(|db| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
let db = db.clone();
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let mut phases = vec![
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
];
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(DropLogicalSubscriptions);
}
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
phases,
);
Ok(spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
handle.await??;
}
let mut phases = vec![
HandleOtherExtensions,
HandleNeonExtension, // This step depends on CreateSchemaNeon
CreateAvailabilityCheck,
DropRoles,
];
// This step depends on CreateSchemaNeon
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(FinalizeDropLogicalSubscriptions);
}
for phase in phases {
debug!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
Ok::<(), anyhow::Error>(())
})?;
Ok(())
}
/// Apply SQL migrations of the RunInEachDatabase phase.
///
/// May opt to not connect to databases that don't have any scheduled
/// operations. The function is concurrency-controlled with the provided
/// semaphore. The caller has to make sure the semaphore isn't exhausted.
async fn apply_spec_sql_db(
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
jwks_roles: Arc<HashSet<String>>,
concurrency_token: Arc<tokio::sync::Semaphore>,
db: DB,
subphases: Vec<PerDatabasePhase>,
) -> Result<()> {
let _permit = concurrency_token.acquire().await?;
let mut client_conn = None;
for subphase in subphases {
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
RunInEachDatabase {
db: db.clone(),
subphase,
},
// Only connect if apply_operation actually wants a connection.
// It's quite possible this database doesn't need any queries,
// so by not connecting we save time and effort connecting to
// that database.
|| async {
if client_conn.is_none() {
let db_client = Self::get_maintenance_client(&conf).await?;
client_conn.replace(db_client);
}
let client = client_conn.as_ref().unwrap();
Ok(client)
},
)
.await?;
}
drop(client_conn);
Ok::<(), anyhow::Error>(())
}
/// Choose how many concurrent connections to use for applying the spec changes.
pub fn max_service_connections(
&self,
compute_state: &ComputeState,
spec: &ComputeSpec,
) -> usize {
// If the cluster is in Init state we don't have to deal with user connections,
// and can thus use all `max_connections` connection slots. However, that's generally not
// very efficient, so we generally still limit it to a smaller number.
if compute_state.status == ComputeStatus::Init {
// If the settings contain 'max_connections', use that as template
if let Some(config) = spec.cluster.settings.find("max_connections") {
config.parse::<usize>().ok()
} else {
// Otherwise, try to find the setting in the postgresql_conf string
spec.cluster
.postgresql_conf
.iter()
.flat_map(|conf| conf.split("\n"))
.filter_map(|line| {
if !line.contains("max_connections") {
return None;
}
let (key, value) = line.split_once("=")?;
let key = key
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
let value = value
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
if key != "max_connections" {
return None;
}
value.parse::<usize>().ok()
})
.next()
}
// If max_connections is present, use at most 1/3rd of that.
// When max_connections is lower than 30, try to use at least 10 connections, but
// never more than max_connections.
.map(|limit| match limit {
0..10 => limit,
10..30 => 10,
30.. => limit / 3,
})
// If we didn't find max_connections, default to 10 concurrent connections.
.unwrap_or(10)
} else {
// state == Running
// Because the cluster is already in the Running state, we should assume users are
// already connected to the cluster, and high concurrency could negatively
// impact user connectivity. Therefore, we can limit concurrency to the number of
// reserved superuser connections, which users wouldn't be able to use anyway.
spec.cluster
.settings
.find("superuser_reserved_connections")
.iter()
.filter_map(|val| val.parse::<usize>().ok())
.map(|val| if val > 1 { val - 1 } else { 1 })
.last()
.unwrap_or(3)
}
}
/// Do initial configuration of the already started Postgres.
#[instrument(skip_all)]
pub fn apply_config(&self, compute_state: &ComputeState) -> Result<()> {

View File

@@ -4,413 +4,15 @@ use std::future::Future;
use std::iter::{empty, once};
use std::sync::Arc;
use anyhow::{Context, Result};
use compute_api::responses::ComputeStatus;
use anyhow::Result;
use compute_api::spec::{ComputeFeature, ComputeSpec, Database, PgIdent, Role};
use futures::future::join_all;
use tokio::sync::RwLock;
use tokio_postgres::Client;
use tokio_postgres::error::SqlState;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};
use tracing::{Instrument, debug, info_span, warn};
use crate::compute::{ComputeNode, ComputeState, construct_superuser_query};
use crate::pg_helpers::{
DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal, get_existing_dbs_async,
get_existing_roles_async,
};
use crate::spec_apply::ApplySpecPhase::{
CreateAndAlterDatabases, CreateAndAlterRoles, CreateAvailabilityCheck, CreateSchemaNeon,
CreateSuperUser, DropInvalidDatabases, DropRoles, FinalizeDropLogicalSubscriptions,
HandleNeonExtension, HandleOtherExtensions, RenameAndDeleteDatabases, RenameRoles,
RunInEachDatabase,
};
use crate::spec_apply::PerDatabasePhase::{
ChangeSchemaPerms, DeleteDBRoleReferences, DropLogicalSubscriptions, HandleAnonExtension,
};
impl ComputeNode {
/// Apply the spec to the running PostgreSQL instance.
/// The caller can decide to run with multiple clients in parallel, or
/// single mode. Either way, the commands executed will be the same, and
/// only commands run in different databases are parallelized.
#[instrument(skip_all)]
pub fn apply_spec_sql(
&self,
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
concurrency: usize,
) -> Result<()> {
info!("Applying config with max {} concurrency", concurrency);
debug!("Config: {:?}", spec);
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
// Proceed with post-startup configuration. Note, that order of operations is important.
let client = Self::get_maintenance_client(&conf).await?;
let spec = spec.clone();
let databases = get_existing_dbs_async(&client).await?;
let roles = get_existing_roles_async(&client)
.await?
.into_iter()
.map(|role| (role.name.clone(), role))
.collect::<HashMap<String, Role>>();
// Check if we need to drop subscriptions before starting the endpoint.
//
// It is important to do this operation exactly once when endpoint starts on a new branch.
// Otherwise, we may drop not inherited, but newly created subscriptions.
//
// We cannot rely only on spec.drop_subscriptions_before_start flag,
// because if for some reason compute restarts inside VM,
// it will start again with the same spec and flag value.
//
// To handle this, we save the fact of the operation in the database
// in the neon.drop_subscriptions_done table.
// If the table does not exist, we assume that the operation was never performed, so we must do it.
// If table exists, we check if the operation was performed on the current timelilne.
//
let mut drop_subscriptions_done = false;
if spec.drop_subscriptions_before_start {
let timeline_id = self.get_timeline_id().context("timeline_id must be set")?;
let query = format!("select 1 from neon.drop_subscriptions_done where timeline_id = '{}'", timeline_id);
info!("Checking if drop subscription operation was already performed for timeline_id: {}", timeline_id);
drop_subscriptions_done = match
client.simple_query(&query).await {
Ok(result) => {
matches!(&result[0], postgres::SimpleQueryMessage::Row(_))
},
Err(e) =>
{
match e.code() {
Some(&SqlState::UNDEFINED_TABLE) => false,
_ => {
// We don't expect any other error here, except for the schema/table not existing
error!("Error checking if drop subscription operation was already performed: {}", e);
return Err(e.into());
}
}
}
}
};
let jwks_roles = Arc::new(
spec.as_ref()
.local_proxy_config
.iter()
.flat_map(|it| &it.jwks)
.flatten()
.flat_map(|setting| &setting.role_names)
.cloned()
.collect::<HashSet<_>>(),
);
let ctx = Arc::new(tokio::sync::RwLock::new(MutableApplyContext {
roles,
dbs: databases,
}));
// Apply special pre drop database phase.
// NOTE: we use the code of RunInEachDatabase phase for parallelism
// and connection management, but we don't really run it in *each* database,
// only in databases, we're about to drop.
info!("Applying PerDatabase (pre-dropdb) phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
// Run the phase for each database that we're about to drop.
let db_processes = spec
.delta_operations
.iter()
.flatten()
.filter_map(move |op| {
if op.action.as_str() == "delete_db" {
Some(op.name.clone())
} else {
None
}
})
.map(|dbname| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
// We only need dbname field for this phase, so set other fields to dummy values
let db = DB::UserDB(Database {
name: dbname.clone(),
owner: "cloud_admin".to_string(),
options: None,
restrict_conn: false,
invalid: false,
});
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
[DropLogicalSubscriptions].to_vec(),
);
Ok(tokio::spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
if let Err(e) = handle.await? {
// Handle the error case where the database does not exist
// We do not check whether the DB exists or not in the deletion phase,
// so we shouldn't be strict about it in pre-deletion cleanup as well.
if e.to_string().contains("does not exist") {
warn!("Error dropping subscription: {}", e);
} else {
return Err(e);
}
};
}
for phase in [
CreateSuperUser,
DropInvalidDatabases,
RenameRoles,
CreateAndAlterRoles,
RenameAndDeleteDatabases,
CreateAndAlterDatabases,
CreateSchemaNeon,
] {
info!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
info!("Applying RunInEachDatabase2 phase");
let concurrency_token = Arc::new(tokio::sync::Semaphore::new(concurrency));
let db_processes = spec
.cluster
.databases
.iter()
.map(|db| DB::new(db.clone()))
// include
.chain(once(DB::SystemDB))
.map(|db| {
let spec = spec.clone();
let ctx = ctx.clone();
let jwks_roles = jwks_roles.clone();
let mut conf = conf.as_ref().clone();
let concurrency_token = concurrency_token.clone();
let db = db.clone();
debug!("Applying per-database phases for Database {:?}", &db);
match &db {
DB::SystemDB => {}
DB::UserDB(db) => {
conf.dbname(db.name.as_str());
}
}
let conf = Arc::new(conf);
let mut phases = vec![
DeleteDBRoleReferences,
ChangeSchemaPerms,
HandleAnonExtension,
];
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding DropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(DropLogicalSubscriptions);
}
let fut = Self::apply_spec_sql_db(
spec.clone(),
conf,
ctx.clone(),
jwks_roles.clone(),
concurrency_token.clone(),
db,
phases,
);
Ok(tokio::spawn(fut))
})
.collect::<Vec<Result<_, anyhow::Error>>>();
for process in db_processes.into_iter() {
let handle = process?;
handle.await??;
}
let mut phases = vec![
HandleOtherExtensions,
HandleNeonExtension, // This step depends on CreateSchemaNeon
CreateAvailabilityCheck,
DropRoles,
];
// This step depends on CreateSchemaNeon
if spec.drop_subscriptions_before_start && !drop_subscriptions_done {
info!("Adding FinalizeDropLogicalSubscriptions phase because drop_subscriptions_before_start is set");
phases.push(FinalizeDropLogicalSubscriptions);
}
for phase in phases {
debug!("Applying phase {:?}", &phase);
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
phase,
|| async { Ok(&client) },
)
.await?;
}
Ok::<(), anyhow::Error>(())
})?;
Ok(())
}
/// Apply SQL migrations of the RunInEachDatabase phase.
///
/// May opt to not connect to databases that don't have any scheduled
/// operations. The function is concurrency-controlled with the provided
/// semaphore. The caller has to make sure the semaphore isn't exhausted.
async fn apply_spec_sql_db(
spec: Arc<ComputeSpec>,
conf: Arc<tokio_postgres::Config>,
ctx: Arc<tokio::sync::RwLock<MutableApplyContext>>,
jwks_roles: Arc<HashSet<String>>,
concurrency_token: Arc<tokio::sync::Semaphore>,
db: DB,
subphases: Vec<PerDatabasePhase>,
) -> Result<()> {
let _permit = concurrency_token.acquire().await?;
let mut client_conn = None;
for subphase in subphases {
apply_operations(
spec.clone(),
ctx.clone(),
jwks_roles.clone(),
RunInEachDatabase {
db: db.clone(),
subphase,
},
// Only connect if apply_operation actually wants a connection.
// It's quite possible this database doesn't need any queries,
// so by not connecting we save time and effort connecting to
// that database.
|| async {
if client_conn.is_none() {
let db_client = Self::get_maintenance_client(&conf).await?;
client_conn.replace(db_client);
}
let client = client_conn.as_ref().unwrap();
Ok(client)
},
)
.await?;
}
drop(client_conn);
Ok::<(), anyhow::Error>(())
}
/// Choose how many concurrent connections to use for applying the spec changes.
pub fn max_service_connections(
&self,
compute_state: &ComputeState,
spec: &ComputeSpec,
) -> usize {
// If the cluster is in Init state we don't have to deal with user connections,
// and can thus use all `max_connections` connection slots. However, that's generally not
// very efficient, so we generally still limit it to a smaller number.
if compute_state.status == ComputeStatus::Init {
// If the settings contain 'max_connections', use that as template
if let Some(config) = spec.cluster.settings.find("max_connections") {
config.parse::<usize>().ok()
} else {
// Otherwise, try to find the setting in the postgresql_conf string
spec.cluster
.postgresql_conf
.iter()
.flat_map(|conf| conf.split("\n"))
.filter_map(|line| {
if !line.contains("max_connections") {
return None;
}
let (key, value) = line.split_once("=")?;
let key = key
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
let value = value
.trim_start_matches(char::is_whitespace)
.trim_end_matches(char::is_whitespace);
if key != "max_connections" {
return None;
}
value.parse::<usize>().ok()
})
.next()
}
// If max_connections is present, use at most 1/3rd of that.
// When max_connections is lower than 30, try to use at least 10 connections, but
// never more than max_connections.
.map(|limit| match limit {
0..10 => limit,
10..30 => 10,
30.. => limit / 3,
})
// If we didn't find max_connections, default to 10 concurrent connections.
.unwrap_or(10)
} else {
// state == Running
// Because the cluster is already in the Running state, we should assume users are
// already connected to the cluster, and high concurrency could negatively
// impact user connectivity. Therefore, we can limit concurrency to the number of
// reserved superuser connections, which users wouldn't be able to use anyway.
spec.cluster
.settings
.find("superuser_reserved_connections")
.iter()
.filter_map(|val| val.parse::<usize>().ok())
.map(|val| if val > 1 { val - 1 } else { 1 })
.last()
.unwrap_or(3)
}
}
}
use crate::compute::construct_superuser_query;
use crate::pg_helpers::{DatabaseExt, Escaping, GenericOptionsSearch, RoleExt, escape_literal};
#[derive(Clone)]
pub enum DB {

View File

@@ -25,7 +25,7 @@ use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use nix::errno::Errno;
use nix::fcntl::{FcntlArg, FdFlag};
use nix::sys::signal::{Signal, kill};
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
use utils::pid_file::{self, PidFileRead};

View File

@@ -5,16 +5,7 @@
//! easier to work with locally. The python tests in `test_runner`
//! rely on `neon_local` to set up the environment for each test.
//!
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use anyhow::{anyhow, bail, Context, Result};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
@@ -28,7 +19,7 @@ use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{FlockArg, flock};
use nix::fcntl::{flock, FlockArg};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -44,13 +35,23 @@ use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Duration;
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
use tokio::task::JoinSet;
use url::Host;
use utils::auth::{Claims, Scope};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::project_git_version;
use utils::{
auth::{Claims, Scope},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
project_git_version,
};
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
@@ -920,9 +921,7 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config {
// User (likely the Python test suite) provided a description of the environment.
if args.num_pageservers.is_some() {
bail!(
"Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead"
);
bail!("Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead");
}
// load and parse the file
let contents = std::fs::read_to_string(config_path).with_context(|| {
@@ -1316,14 +1315,10 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
match (mode, args.hot_standby) {
(ComputeMode::Static(_), true) => {
bail!(
"Cannot start a node in hot standby mode when it is already configured as a static replica"
)
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
}
(ComputeMode::Primary, true) => {
bail!(
"Cannot start a node as a hot standby replica, it is already configured as primary node"
)
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
}
_ => {}
}

View File

@@ -8,6 +8,7 @@
use std::time::Duration;
use anyhow::Context;
use camino::Utf8PathBuf;
use crate::{background_process, local_env};

View File

@@ -37,20 +37,27 @@
//! ```
//!
use std::collections::BTreeMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use anyhow::{Context, Result, anyhow, bail};
use anyhow::{anyhow, bail, Context, Result};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::{ComputeCtlConfig, ComputeStatus, ComputeStatusResponse};
use compute_api::spec::{
Cluster, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent, RemoteExtSpec, Role,
};
use nix::sys::signal::{Signal, kill};
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::Database;
use compute_api::spec::PgIdent;
use compute_api::spec::RemoteExtSpec;
use compute_api::spec::Role;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
@@ -62,6 +69,9 @@ use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
// contents of a endpoint.json file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
@@ -227,9 +237,7 @@ impl ComputeControlPlane {
});
if let Some((key, _)) = duplicates.next() {
bail!(
"attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported."
);
bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
}
}
Ok(())

View File

@@ -3,22 +3,28 @@
//! Now it also provides init method which acts like a stub for proper installation
//! script which will use local paths.
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
use std::{env, fs};
use anyhow::{bail, Context};
use anyhow::{Context, bail};
use clap::ValueEnum;
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use utils::auth::{Claims, encode_from_key_file};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use std::collections::HashMap;
use std::env;
use std::fs;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
use utils::{
auth::{encode_from_key_file, Claims},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
};
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::pageserver::PageServerNode;
use crate::pageserver::PAGESERVER_REMOTE_STORAGE_DIR;
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 16;
@@ -459,9 +465,7 @@ impl LocalEnv {
if old_timeline_id == &timeline_id {
Ok(())
} else {
bail!(
"branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}"
);
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
}
} else {
existing_values.push((tenant_id, timeline_id));

View File

@@ -7,6 +7,7 @@
//! ```
//!
use std::collections::HashMap;
use std::io;
use std::io::Write;
use std::num::NonZeroU64;
@@ -14,19 +15,22 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{Context, bail};
use anyhow::{bail, Context};
use camino::Utf8PathBuf;
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use postgres_backend::AuthType;
use postgres_connection::{PgConnectionConfig, parse_host_port};
use postgres_connection::{parse_host_port, PgConnectionConfig};
use utils::auth::{Claims, Scope};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use utils::id::NodeId;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use crate::background_process;
use crate::local_env::{LocalEnv, NeonLocalInitPageserverConf, PageServerConf};
use crate::local_env::{NeonLocalInitPageserverConf, PageServerConf};
use crate::{background_process, local_env::LocalEnv};
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const PAGESERVER_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/pageserver";
@@ -77,11 +81,7 @@ impl PageServerNode {
&self,
conf: NeonLocalInitPageserverConf,
) -> anyhow::Result<toml_edit::DocumentMut> {
assert_eq!(
&PageServerConf::from(&conf),
&self.conf,
"during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully"
);
assert_eq!(&PageServerConf::from(&conf), &self.conf, "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully");
// TODO(christian): instead of what we do here, create a pageserver_api::config::ConfigToml (PR #7656)

View File

@@ -1,6 +1,3 @@
use std::collections::HashMap;
use std::fmt;
///
/// Module for parsing postgresql.conf file.
///
@@ -9,6 +6,8 @@ use std::fmt;
/// funny stuff like include-directives or funny escaping.
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fmt;
/// In-memory representation of a postgresql.conf file
#[derive(Default, Debug)]

View File

@@ -14,15 +14,18 @@ use std::{io, result};
use anyhow::Context;
use camino::Utf8PathBuf;
use http_utils::error::HttpErrorBody;
use postgres_connection::PgConnectionConfig;
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use http_utils::error::HttpErrorBody;
use utils::auth::{Claims, Scope};
use utils::id::NodeId;
use crate::background_process;
use crate::local_env::{LocalEnv, SafekeeperConf};
use crate::{
background_process,
local_env::{LocalEnv, SafekeeperConf},
};
#[derive(Error, Debug)]
pub enum SafekeeperHttpError {

View File

@@ -1,39 +1,44 @@
use std::ffi::OsStr;
use std::fs;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::ExitStatus;
use std::str::FromStr;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use crate::{
background_process,
local_env::{LocalEnv, NeonStorageControllerConf},
};
use camino::{Utf8Path, Utf8PathBuf};
use hyper0::Uri;
use nix::unistd::Pid;
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
TenantShardMigrateResponse,
use pageserver_api::{
controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
TenantShardMigrateResponse,
},
models::{
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
},
shard::{ShardStripeSize, TenantShardId},
};
use pageserver_api::models::{
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
ffi::OsStr,
fs,
net::SocketAddr,
path::PathBuf,
process::ExitStatus,
str::FromStr,
sync::OnceLock,
time::{Duration, Instant},
};
use tokio::process::Command;
use tracing::instrument;
use url::Url;
use utils::auth::{Claims, Scope, encode_from_key_file};
use utils::id::{NodeId, TenantId};
use utils::{
auth::{encode_from_key_file, Claims, Scope},
id::{NodeId, TenantId},
};
use whoami::username;
use crate::background_process;
use crate::local_env::{LocalEnv, NeonStorageControllerConf};
pub struct StorageController {
env: LocalEnv,
private_key: Option<Vec<u8>>,
@@ -91,8 +96,7 @@ pub struct AttachHookRequest {
#[derive(Serialize, Deserialize)]
pub struct AttachHookResponse {
#[serde(rename = "gen")]
pub generation: Option<u32>,
pub gen: Option<u32>,
}
#[derive(Serialize, Deserialize)]
@@ -775,7 +779,7 @@ impl StorageController {
)
.await?;
Ok(response.generation)
Ok(response.gen)
}
#[instrument(skip(self))]

View File

@@ -1,28 +1,35 @@
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::Duration;
use futures::StreamExt;
use std::{
collections::{HashMap, HashSet},
str::FromStr,
time::Duration,
};
use clap::{Parser, Subcommand};
use futures::StreamExt;
use pageserver_api::controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeConfigureRequest, NodeDescribeResponse,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse, PlacementPolicy,
SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse,
use pageserver_api::{
controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
ShardParameters, TenantConfig, TenantConfigPatchRequest, TenantConfigRequest,
TenantShardSplitRequest, TenantShardSplitResponse,
},
shard::{ShardStripeSize, TenantShardId},
};
use pageserver_api::models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, ShardParameters,
TenantConfig, TenantConfigPatchRequest, TenantConfigRequest, TenantShardSplitRequest,
TenantShardSplitResponse,
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use storage_controller_client::control_api::Client;
#[derive(Subcommand, Debug)]
enum Command {
/// Register a pageserver with the storage controller. This shouldn't usually be necessary,
@@ -914,9 +921,7 @@ async fn main() -> anyhow::Result<()> {
}
Command::TenantDrop { tenant_id, unclean } => {
if !unclean {
anyhow::bail!(
"This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed."
)
anyhow::bail!("This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed.")
}
storcon_client
.dispatch::<(), ()>(
@@ -928,9 +933,7 @@ async fn main() -> anyhow::Result<()> {
}
Command::NodeDrop { node_id, unclean } => {
if !unclean {
anyhow::bail!(
"This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed."
)
anyhow::bail!("This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed.")
}
storcon_client
.dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)

View File

@@ -7,7 +7,7 @@ index f255fe6..0a0fa65 100644
GENERATED_SCHEDULE_DEPS = $(TB_DIR)/all_tests $(TB_DIR)/exclude_tests
REGRESS = --schedule $(TB_DIR)/run.sch # Set this again just to be safe
-REGRESS_OPTS = --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
+REGRESS_OPTS = --use-existing --dbname=contrib_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
+REGRESS_OPTS = --use-existing --dbname=pgtap_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
SETUP_SCH = test/schedule/main.sch # schedule to use for test setup; this can be forcibly changed by some targets!
IGNORE_TESTS = $(notdir $(EXCLUDE_TEST_FILES:.sql=))
PARALLEL_TESTS = $(filter-out $(IGNORE_TESTS),$(filter-out $(SERIAL_TESTS),$(ALL_TESTS)))

View File

@@ -12,7 +12,6 @@ if [ -z ${OLD_COMPUTE_TAG+x} ] || [ -z ${NEW_COMPUTE_TAG+x} ] || [ -z "${OLD_COM
fi
export PG_VERSION=${PG_VERSION:-16}
export PG_TEST_VERSION=${PG_VERSION}
# Waits for compute node is ready
function wait_for_ready {
TIME=0
while ! docker compose logs compute_is_ready | grep -q "accepting connections" && [ ${TIME} -le 300 ] ; do
@@ -24,45 +23,11 @@ function wait_for_ready {
exit 2
fi
}
# Creates extensions. Gets a string with space-separated extensions as a parameter
function create_extensions() {
for ext in ${1}; do
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext} CASCADE"
done
}
# Creates a new timeline. Gets the parent ID and an extension name as parameters.
# Saves the timeline ID in the variable EXT_TIMELINE
function create_timeline() {
generate_id new_timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${new_timeline_id}\", \"pg_version\": ${PG_VERSION}, \"ancestor_timeline_id\": \"${1}\"}"
"http://127.0.0.1:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
EXT_TIMELINE[${2}]=${new_timeline_id}
}
# Checks if the timeline ID of the compute node is expected. Gets the timeline ID as a parameter
function check_timeline() {
TID=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
if [ "${TID}" != "${1}" ]; then
echo Timeline mismatch
exit 1
fi
}
# Restarts the compute node with the required compute tag and timeline.
# Accepts the tag for the compute node and the timeline as parameters.
function restart_compute() {
docker compose down compute compute_is_ready
COMPUTE_TAG=${1} TAG=${OLD_COMPUTE_TAG} TENANT_ID=${tenant_id} TIMELINE_ID=${2} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
check_timeline ${2}
}
declare -A EXT_TIMELINE
EXTENSIONS='[
{"extname": "plv8", "extdir": "plv8-src"},
{"extname": "vector", "extdir": "pgvector-src"},
@@ -82,7 +47,7 @@ EXTENSIONS='[
{"extname": "pg_repack", "extdir": "pg_repack-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d
COMPUTE_TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d
wait_for_ready
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
@@ -90,14 +55,12 @@ create_extensions "${EXTNAMES}"
query="select json_object_agg(extname,extversion) from pg_extension where extname in ('${EXTNAMES// /\',\'}')"
new_vers=$(docker compose exec neon-test-extensions psql -Aqt -d contrib_regression -c "$query")
docker compose --profile test-extensions down
TAG=${OLD_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d --force-recreate
COMPUTE_TAG=${OLD_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d --force-recreate
wait_for_ready
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
tenant_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.tenant_id")
EXT_TIMELINE["main"]=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
create_timeline "${EXT_TIMELINE["main"]}" init
restart_compute "${OLD_COMPUTE_TAG}" "${EXT_TIMELINE["init"]}"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE pgtap_regression"
docker compose exec neon-test-extensions psql -d pgtap_regression -c "CREATE EXTENSION pgtap"
create_extensions "${EXTNAMES}"
if [ "${FORCE_ALL_UPGRADE_TESTS:-false}" = true ]; then
exts="${EXTNAMES}"
@@ -108,13 +71,29 @@ fi
if [ -z "${exts}" ]; then
echo "No extensions were upgraded"
else
tenant_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.tenant_id")
timeline_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
for ext in ${exts}; do
echo Testing ${ext}...
create_timeline "${EXT_TIMELINE["main"]}" ${ext}
EXTDIR=$(echo ${EXTENSIONS} | jq -r '.[] | select(.extname=="'${ext}'") | .extdir')
restart_compute "${OLD_COMPUTE_TAG}" "${EXT_TIMELINE[${ext}]}"
docker compose exec neon-test-extensions psql -d contrib_regression -c "CREATE EXTENSION ${ext} CASCADE"
restart_compute "${NEW_COMPUTE_TAG}" "${EXT_TIMELINE[${ext}]}"
generate_id new_timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${new_timeline_id}\", \"pg_version\": ${PG_VERSION}, \"ancestor_timeline_id\": \"${timeline_id}\"}"
"http://127.0.0.1:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
TENANT_ID=${tenant_id} TIMELINE_ID=${new_timeline_id} COMPUTE_TAG=${OLD_COMPUTE_TAG} docker compose down compute compute_is_ready
COMPUTE_TAG=${NEW_COMPUTE_TAG} TENANT_ID=${tenant_id} TIMELINE_ID=${new_timeline_id} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
TID=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
if [ ${TID} != ${new_timeline_id} ]; then
echo Timeline mismatch
exit 1
fi
docker compose exec neon-test-extensions psql -d contrib_regression -c "\dx ${ext}"
if ! docker compose exec neon-test-extensions sh -c /ext-src/${EXTDIR}/test-upgrade.sh; then
docker compose exec neon-test-extensions cat /ext-src/${EXTDIR}/regression.diffs

View File

@@ -1,7 +1,7 @@
[package]
name = "consumption_metrics"
version = "0.1.0"
edition = "2024"
edition = "2021"
license = "Apache-2.0"
[dependencies]

View File

@@ -1,5 +1,4 @@
use std::collections::VecDeque;
use std::sync::Arc;
use std::{collections::VecDeque, sync::Arc};
use parking_lot::{Mutex, MutexGuard};

View File

@@ -1,7 +1,11 @@
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, Ordering};
use std::sync::{Arc, OnceLock, mpsc};
use std::thread::JoinHandle;
use std::{
panic::AssertUnwindSafe,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
mpsc, Arc, OnceLock,
},
thread::JoinHandle,
};
use tracing::{debug, error, trace};

View File

@@ -1,19 +1,26 @@
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt::{self, Debug};
use std::ops::DerefMut;
use std::sync::{Arc, mpsc};
use std::{
cmp::Ordering,
collections::{BinaryHeap, VecDeque},
fmt::{self, Debug},
ops::DerefMut,
sync::{mpsc, Arc},
};
use parking_lot::lock_api::{MappedMutexGuard, MutexGuard};
use parking_lot::{Mutex, RawMutex};
use parking_lot::{
lock_api::{MappedMutexGuard, MutexGuard},
Mutex, RawMutex,
};
use rand::rngs::StdRng;
use tracing::debug;
use super::chan::Chan;
use super::proto::AnyMessage;
use crate::executor::{self, ThreadContext};
use crate::options::NetworkOptions;
use crate::proto::{NetEvent, NodeEvent};
use crate::{
executor::{self, ThreadContext},
options::NetworkOptions,
proto::NetEvent,
proto::NodeEvent,
};
use super::{chan::Chan, proto::AnyMessage};
pub struct NetworkTask {
options: Arc<NetworkOptions>,

View File

@@ -2,11 +2,14 @@ use std::sync::Arc;
use rand::Rng;
use super::chan::Chan;
use super::network::TCP;
use super::world::{Node, NodeId, World};
use crate::proto::NodeEvent;
use super::{
chan::Chan,
network::TCP,
world::{Node, NodeId, World},
};
/// Abstraction with all functions (aka syscalls) available to the node.
#[derive(Clone)]
pub struct NodeOs {

View File

@@ -1,5 +1,4 @@
use rand::Rng;
use rand::rngs::StdRng;
use rand::{rngs::StdRng, Rng};
/// Describes random delays and failures. Delay will be uniformly distributed in [min, max].
/// Connection failure will occur with the probablity fail_prob.

View File

@@ -3,8 +3,7 @@ use std::fmt::Debug;
use bytes::Bytes;
use utils::lsn::Lsn;
use crate::network::TCP;
use crate::world::NodeId;
use crate::{network::TCP, world::NodeId};
/// Internal node events.
#[derive(Debug)]

View File

@@ -1,8 +1,12 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};
use std::{
cmp::Ordering,
collections::BinaryHeap,
ops::DerefMut,
sync::{
atomic::{AtomicU32, AtomicU64},
Arc,
},
};
use parking_lot::Mutex;
use tracing::trace;

View File

@@ -1,18 +1,19 @@
use std::ops::DerefMut;
use std::sync::{Arc, mpsc};
use parking_lot::Mutex;
use rand::SeedableRng;
use rand::rngs::StdRng;
use rand::{rngs::StdRng, SeedableRng};
use std::{
ops::DerefMut,
sync::{mpsc, Arc},
};
use super::chan::Chan;
use super::network::TCP;
use super::node_os::NodeOs;
use crate::executor::{ExternalHandle, Runtime};
use crate::network::NetworkTask;
use crate::options::NetworkOptions;
use crate::proto::{NodeEvent, SimEvent};
use crate::time::Timing;
use crate::{
executor::{ExternalHandle, Runtime},
network::NetworkTask,
options::NetworkOptions,
proto::{NodeEvent, SimEvent},
time::Timing,
};
use super::{chan::Chan, network::TCP, node_os::NodeOs};
pub type NodeId = u32;

View File

@@ -1,15 +1,14 @@
//! Simple test to verify that simulator is working.
#[cfg(test)]
mod reliable_copy_test {
use std::sync::Arc;
use anyhow::Result;
use desim::executor::{self, PollSome};
use desim::node_os::NodeOs;
use desim::options::{Delay, NetworkOptions};
use desim::proto::{AnyMessage, NetEvent, NodeEvent, ReplCell};
use desim::proto::{NetEvent, NodeEvent, ReplCell};
use desim::world::{NodeId, World};
use desim::{node_os::NodeOs, proto::AnyMessage};
use parking_lot::Mutex;
use std::sync::Arc;
use tracing::info;
/// Disk storage trait and implementation.

View File

@@ -1,29 +1,29 @@
use std::future::Future;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;
use ::pprof::ProfilerGuardBuilder;
use crate::error::{api_error_handler, route_error_handler, ApiError};
use crate::pprof;
use crate::request::{get_query_param, parse_query_param};
use ::pprof::protos::Message as _;
use anyhow::{Context, anyhow};
use ::pprof::ProfilerGuardBuilder;
use anyhow::{anyhow, Context};
use bytes::{Bytes, BytesMut};
use hyper::header::{AUTHORIZATION, CONTENT_DISPOSITION, CONTENT_TYPE, HeaderName};
use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION};
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response};
use metrics::{Encoder, IntCounter, TextEncoder, register_int_counter};
use hyper::Method;
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use regex::Regex;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::sync::{Mutex, Notify, mpsc};
use tokio::sync::{mpsc, Mutex, Notify};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{Instrument, debug, info, info_span, warn};
use tracing::{debug, info, info_span, warn, Instrument};
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
use crate::error::{ApiError, api_error_handler, route_error_handler};
use crate::pprof;
use crate::request::{get_query_param, parse_query_param};
use std::future::Future;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
@@ -375,7 +375,7 @@ pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, A
Err(_) => {
return Err(ApiError::Conflict(
"profiler already running (use ?force=true to cancel it)".into(),
));
))
}
}
tokio::time::sleep(Duration::from_millis(1)).await; // don't busy-wait
@@ -539,8 +539,8 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
}
}
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
-> Middleware<B, ApiError> {
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
let request_id = match req.headers().get(&X_REQUEST_ID_HEADER) {
Some(request_id) => request_id
@@ -664,7 +664,7 @@ pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
None => {
return Err(ApiError::Unauthorized(
"missing authorization header".to_string(),
));
))
}
}
}
@@ -717,13 +717,11 @@ pub fn check_permission_with(
#[cfg(test)]
mod tests {
use std::future::poll_fn;
use std::net::{IpAddr, SocketAddr};
use super::*;
use hyper::service::Service;
use routerify::RequestServiceBuilder;
use super::*;
use std::future::poll_fn;
use std::net::{IpAddr, SocketAddr};
#[tokio::test]
async fn test_request_id_returned() {

View File

@@ -1,10 +1,10 @@
use hyper::{header, Body, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::error::Error as StdError;
use hyper::{Body, Response, StatusCode, header};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{error, info, warn};
use utils::auth::AuthError;
#[derive(Debug, Error)]

View File

@@ -1,10 +1,11 @@
use crate::error::ApiError;
use crate::json::{json_request, json_response};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::apply_failpoint;
use crate::error::ApiError;
use crate::json::{json_request, json_response};
use utils::failpoint_support::apply_failpoint;
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;

View File

@@ -1,6 +1,6 @@
use anyhow::Context;
use bytes::Buf;
use hyper::{Body, Request, Response, StatusCode, header};
use hyper::{header, Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use super::error::ApiError;

View File

@@ -9,4 +9,4 @@ extern crate hyper0 as hyper;
/// Current fast way to apply simple http routing in various Neon binaries.
/// Re-exported for sake of uniform approach, that could be later replaced with better alternatives, if needed.
pub use routerify::{RouterBuilder, RouterService, ext::RequestExt};
pub use routerify::{ext::RequestExt, RouterBuilder, RouterService};

View File

@@ -1,15 +1,15 @@
use anyhow::bail;
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;
use itertools::Itertools as _;
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::ffi::c_void;
use std::io::Write as _;
use anyhow::bail;
use flate2::Compression;
use flate2::write::{GzDecoder, GzEncoder};
use itertools::Itertools as _;
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
/// Decodes a gzip-compressed Protobuf-encoded pprof profile.
pub fn decode(bytes: &[u8]) -> anyhow::Result<Profile> {
let mut gz = GzDecoder::new(Vec::new());

View File

@@ -1,13 +1,10 @@
use core::fmt;
use std::borrow::Cow;
use std::str::FromStr;
use anyhow::anyhow;
use hyper::body::HttpBody;
use hyper::{Body, Request};
use routerify::ext::RequestExt;
use std::{borrow::Cow, str::FromStr};
use super::error::ApiError;
use anyhow::anyhow;
use hyper::{body::HttpBody, Body, Request};
use routerify::ext::RequestExt;
pub fn get_request_param<'a>(
request: &'a Request<Body>,

View File

@@ -6,15 +6,17 @@
//! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
//! use significantly less memory than this, but can only approximate the cardinality.
use std::hash::{BuildHasher, BuildHasherDefault, Hash};
use std::sync::atomic::AtomicU8;
use std::{
hash::{BuildHasher, BuildHasherDefault, Hash},
sync::atomic::AtomicU8,
};
use measured::LabelGroup;
use measured::label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor};
use measured::metric::counter::CounterState;
use measured::metric::name::MetricNameEncoder;
use measured::metric::{Metric, MetricType, MetricVec};
use measured::text::TextEncoder;
use measured::{
label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
metric::{counter::CounterState, name::MetricNameEncoder, Metric, MetricType, MetricVec},
text::TextEncoder,
LabelGroup,
};
use twox_hash::xxh3;
/// Create an [`HyperLogLogVec`] and registers to default registry.
@@ -25,7 +27,9 @@ macro_rules! register_hll_vec {
$crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec)
}};
($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{ $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES) }};
($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
$crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
}};
}
/// Create an [`HyperLogLog`] and registers to default registry.
@@ -36,7 +40,9 @@ macro_rules! register_hll {
$crate::register(Box::new(hll.clone())).map(|_| hll)
}};
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{ $crate::register_hll!($N, $crate::opts!($NAME, $HELP)) }};
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
$crate::register_hll!($N, $crate::opts!($NAME, $HELP))
}};
}
/// HLL is a probabilistic cardinality measure.
@@ -189,10 +195,8 @@ impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEnc
mod tests {
use std::collections::HashSet;
use measured::FixedCardinalityLabel;
use measured::label::StaticLabelSet;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use measured::{label::StaticLabelSet, FixedCardinalityLabel};
use rand::{rngs::StdRng, Rng, SeedableRng};
use rand_distr::{Distribution, Zipf};
use crate::HyperLogLogVec;

View File

@@ -1,10 +1,9 @@
//! A timestamp captured at process startup to identify restarts of the process, e.g., in logs and metrics.
use std::fmt::Display;
use chrono::Utc;
use super::register_uint_gauge;
use std::fmt::Display;
pub struct LaunchTimestamp(chrono::DateTime<Utc>);

View File

@@ -4,26 +4,38 @@
//! a default registry.
#![deny(clippy::undocumented_unsafe_blocks)]
use measured::label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels};
use measured::metric::counter::CounterState;
use measured::metric::gauge::GaugeState;
use measured::metric::group::Encoding;
use measured::metric::name::{MetricName, MetricNameEncoder};
use measured::metric::{MetricEncoding, MetricFamilyEncoding};
use measured::{FixedCardinalityLabel, LabelGroup, MetricGroup};
use measured::{
label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels},
metric::{
counter::CounterState,
gauge::GaugeState,
group::Encoding,
name::{MetricName, MetricNameEncoder},
MetricEncoding, MetricFamilyEncoding,
},
FixedCardinalityLabel, LabelGroup, MetricGroup,
};
use once_cell::sync::Lazy;
use prometheus::Registry;
use prometheus::core::{
Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec,
};
pub use prometheus::local::LocalHistogram;
pub use prometheus::{
Counter, CounterVec, Encoder, Error, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, TextEncoder, core, default_registry, exponential_buckets,
linear_buckets, opts, proto, register, register_counter_vec, register_gauge,
register_gauge_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
};
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::Error;
use prometheus::Registry;
pub use prometheus::{core, default_registry, proto};
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_counter_vec, Counter, CounterVec};
pub use prometheus::{register_gauge, Gauge};
pub use prometheus::{register_gauge_vec, GaugeVec};
pub use prometheus::{register_histogram, Histogram};
pub use prometheus::{register_histogram_vec, HistogramVec};
pub use prometheus::{register_int_counter, IntCounter};
pub use prometheus::{register_int_counter_vec, IntCounterVec};
pub use prometheus::{register_int_gauge, IntGauge};
pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
pub use prometheus::{Encoder, TextEncoder};
pub mod launch_timestamp;
mod wrappers;

View File

@@ -4,28 +4,28 @@
//! is rather narrow, but we can extend it once required.
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use std::future::Future;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::fd::{AsRawFd, RawFd};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{Poll, ready};
use std::{fmt, io};
use anyhow::Context;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::fd::AsRawFd;
use std::os::fd::RawFd;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{ready, Poll};
use std::{fmt, io};
use std::{future::Future, str::FromStr};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use pq_proto::framed::{ConnectionError, Framed, FramedReader, FramedWriter};
use pq_proto::{
BeMessage, FeMessage, FeStartupPacket, ProtocolError, SQLSTATE_ADMIN_SHUTDOWN,
SQLSTATE_INTERNAL_ERROR, SQLSTATE_SUCCESSFUL_COMPLETION,
};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
/// An error, occurred during query processing:
/// either during the connection ([`ConnectionError`]) or before/after it.
@@ -746,7 +746,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
match e {
QueryError::Shutdown => return Ok(ProcessMsgResult::Break),
QueryError::SimulatedConnectionError => {
return Err(QueryError::SimulatedConnectionError);
return Err(QueryError::SimulatedConnectionError)
}
err @ QueryError::Reconnect => {
// Instruct the client to reconnect, stop processing messages
@@ -1020,9 +1020,7 @@ fn log_query_error(query: &str, e: &QueryError) {
}
}
QueryError::Disconnected(other_connection_error) => {
error!(
"query handler for '{query}' failed with connection error: {other_connection_error:?}"
)
error!("query handler for '{query}' failed with connection error: {other_connection_error:?}")
}
QueryError::SimulatedConnectionError => {
error!("query handler for query '{query}' failed due to a simulated connection error")

View File

@@ -1,11 +1,10 @@
use std::io::Cursor;
use std::sync::Arc;
/// Test postgres_backend_async with tokio_postgres
use once_cell::sync::Lazy;
use postgres_backend::{AuthType, Handler, PostgresBackend, QueryError};
use pq_proto::{BeMessage, RowDescriptor};
use rustls::crypto::ring;
use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio_postgres::config::SslMode;

View File

@@ -1,10 +1,9 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::{bail, Context};
use itertools::Itertools;
use std::borrow::Cow;
use std::fmt;
use anyhow::{Context, bail};
use itertools::Itertools;
use url::Host;
/// Parses a string of format either `host:port` or `host` into a corresponding pair.
@@ -30,9 +29,8 @@ pub fn parse_host_port<S: AsRef<str>>(host_port: S) -> Result<(Host, Option<u16>
#[cfg(test)]
mod tests_parse_host_port {
use url::Host;
use crate::parse_host_port;
use url::Host;
#[test]
fn test_normal() {
@@ -209,11 +207,10 @@ impl fmt::Debug for PgConnectionConfig {
#[cfg(test)]
mod tests_pg_connection_config {
use crate::PgConnectionConfig;
use once_cell::sync::Lazy;
use url::Host;
use crate::PgConnectionConfig;
static STUB_HOST: Lazy<Host> = Lazy::new(|| Host::Domain("stub.host.example".to_owned()));
#[test]

View File

@@ -1,6 +1,6 @@
use std::ffi::CStr;
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use postgres_ffi::v17::wal_generator::LogicalMessageGenerator;
use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler;
use postgres_ffi::waldecoder::WalStreamDecoder;

View File

@@ -4,7 +4,7 @@ use std::env;
use std::path::PathBuf;
use std::process::Command;
use anyhow::{Context, anyhow};
use anyhow::{anyhow, Context};
use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
#[derive(Debug)]

View File

@@ -21,9 +21,7 @@ macro_rules! postgres_ffi {
pub mod bindings {
// bindgen generates bindings for a lot of stuff we don't need
#![allow(dead_code)]
#![allow(unsafe_op_in_unsafe_fn)]
#![allow(clippy::undocumented_unsafe_blocks)]
#![allow(clippy::ptr_offset_with_cast)]
use serde::{Deserialize, Serialize};
include!(concat!(
@@ -45,7 +43,8 @@ macro_rules! postgres_ffi {
pub const PG_MAJORVERSION: &str = stringify!($version);
// Re-export some symbols from bindings
pub use bindings::{CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, XLogRecord};
pub use bindings::DBState_DB_SHUTDOWNED;
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
pub const ZERO_CHECKPOINT: bytes::Bytes =
bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
@@ -222,17 +221,21 @@ pub mod relfile_utils;
pub mod walrecord;
// Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::{
BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32,
uint64,
};
pub use v14::bindings::RepOriginId;
pub use v14::bindings::{uint32, uint64, Oid};
pub use v14::bindings::{BlockNumber, OffsetNumber};
pub use v14::bindings::{MultiXactId, TransactionId};
pub use v14::bindings::{TimeLineID, TimestampTz, XLogRecPtr, XLogSegNo};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
pub use v14::bindings::{PageHeaderData, XLogRecord};
pub use v14::xlog_utils::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
pub use v14::bindings::{CheckPoint, ControlFileData};
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
@@ -243,11 +246,13 @@ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
// Export some version independent functions that are used outside of this mod
pub use v14::xlog_utils::encode_logical_message;
pub use v14::xlog_utils::get_current_timestamp;
pub use v14::xlog_utils::to_pg_timestamp;
pub use v14::xlog_utils::try_from_pg_timestamp;
pub use v14::xlog_utils::XLogFileName;
pub use v14::bindings::DBState_DB_SHUTDOWNED;
pub use v14::xlog_utils::{
XLogFileName, encode_logical_message, get_current_timestamp, to_pg_timestamp,
try_from_pg_timestamp,
};
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> bool {
dispatch_pgversion!(version, pgv::bindings::bkpimg_is_compressed(bimg_info))
@@ -350,9 +355,8 @@ pub fn fsm_logical_to_physical(addr: BlockNumber) -> BlockNumber {
}
pub mod waldecoder {
use std::num::NonZeroU32;
use bytes::{Buf, Bytes, BytesMut};
use std::num::NonZeroU32;
use thiserror::Error;
use utils::lsn::Lsn;

View File

@@ -9,7 +9,8 @@
//! comments on them.
//!
use crate::{BLCKSZ, PageHeaderData};
use crate::PageHeaderData;
use crate::BLCKSZ;
//
// From pg_tablespace_d.h

View File

@@ -3,16 +3,18 @@
//!
//! TODO: Generate separate types for each supported PG version
use crate::pg_constants;
use crate::XLogRecord;
use crate::{
BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId, TimestampTz,
TransactionId,
};
use crate::{BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD};
use bytes::{Buf, Bytes};
use serde::{Deserialize, Serialize};
use utils::bin_ser::DeserializeError;
use utils::lsn::Lsn;
use crate::{
BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId,
TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
};
#[repr(C)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlMultiXactCreate {
@@ -506,9 +508,8 @@ pub fn decode_wal_record(
}
pub mod v14 {
use bytes::{Buf, Bytes};
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
#[repr(C)]
#[derive(Debug)]
@@ -677,10 +678,9 @@ pub mod v15 {
}
pub mod v16 {
use bytes::{Buf, Bytes};
pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange};
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
pub struct XlHeapDelete {
pub xmax: TransactionId,
@@ -746,9 +746,8 @@ pub mod v16 {
/* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
pub mod rm_neon {
use bytes::{Buf, Bytes};
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
#[repr(C)]
#[derive(Debug)]
@@ -859,14 +858,14 @@ pub mod v16 {
}
pub mod v17 {
pub use super::v14::XlHeapLockUpdated;
pub use crate::{TimeLineID, TimestampTz};
use bytes::{Buf, Bytes};
pub use super::v14::XlHeapLockUpdated;
pub use super::v16::rm_neon;
pub use super::v16::{
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
rm_neon,
};
pub use crate::{TimeLineID, TimestampTz};
#[repr(C)]
#[derive(Debug)]

View File

@@ -1,9 +1,7 @@
use std::path::PathBuf;
use std::str::FromStr;
use anyhow::*;
use clap::{Arg, ArgMatches, Command, value_parser};
use clap::{value_parser, Arg, ArgMatches, Command};
use postgres::Client;
use std::{path::PathBuf, str::FromStr};
use wal_craft::*;
fn main() -> Result<()> {

View File

@@ -1,18 +1,17 @@
use anyhow::{bail, ensure};
use camino_tempfile::{tempdir, Utf8TempDir};
use log::*;
use postgres::types::PgLsn;
use postgres::Client;
use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
use anyhow::{bail, ensure};
use camino_tempfile::{Utf8TempDir, tempdir};
use log::*;
use postgres::Client;
use postgres::types::PgLsn;
use postgres_ffi::{
WAL_SEGMENT_SIZE, XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD,
XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
macro_rules! xlog_utils_test {
($version:ident) => {
#[path = "."]

View File

@@ -10,10 +10,11 @@
//! calls.
//!
//! [Box]: https://docs.rs/futures-util/0.3.26/src/futures_util/lock/bilock.rs.html#107
use std::future::Future;
use std::io::{self, ErrorKind};
use bytes::{Buf, BytesMut};
use std::{
future::Future,
io::{self, ErrorKind},
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
use crate::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};

View File

@@ -5,15 +5,14 @@
pub mod framed;
use std::borrow::Cow;
use std::{fmt, io, str};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, fmt, io, str};
// re-export for use in utils pageserver_feedback.rs
pub use postgres_protocol::PG_EPOCH;
use serde::{Deserialize, Serialize};
pub type Oid = u32;
pub type SystemId = u64;
@@ -207,8 +206,8 @@ use rand::distributions::{Distribution, Standard};
impl Distribution<CancelKeyData> for Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> CancelKeyData {
CancelKeyData {
backend_pid: rng.r#gen(),
cancel_key: rng.r#gen(),
backend_pid: rng.gen(),
cancel_key: rng.gen(),
}
}
}
@@ -1036,7 +1035,7 @@ impl BeMessage<'_> {
buf.put_u8(b'd');
write_body(buf, |buf| {
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
// dependency
// dependency
buf.put_u64(rec.streaming_lsn);
buf.put_u64(rec.commit_lsn);
buf.put_slice(rec.data);

View File

@@ -130,7 +130,11 @@ impl StorageModel {
break;
}
}
if possible { Some(snapshot_later) } else { None }
if possible {
Some(snapshot_later)
} else {
None
}
} else {
None
};

View File

@@ -76,10 +76,7 @@ pub fn draw_svg(
let mut result = String::new();
writeln!(
result,
"<svg xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" height=\"300\" width=\"500\">"
)?;
writeln!(result, "<svg xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" height=\"300\" width=\"500\">")?;
draw.calculate_svg_layout();

View File

@@ -1,8 +1,8 @@
//! Tracing wrapper for Hyper HTTP server
use hyper0::HeaderMap;
use hyper0::{Body, Request, Response};
use std::future::Future;
use hyper0::{Body, HeaderMap, Request, Response};
use tracing::Instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;

View File

@@ -36,11 +36,11 @@
pub mod http;
use opentelemetry::KeyValue;
use opentelemetry::trace::TracerProvider;
use opentelemetry::KeyValue;
use tracing::Subscriber;
use tracing_subscriber::Layer;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
/// Set up OpenTelemetry exporter, using configuration from environment variables.
///

View File

@@ -1,6 +1,6 @@
use std::time::Duration;
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use utils::id;
use utils::logging::log_slow;

View File

@@ -1,15 +1,12 @@
// For details about authentication see docs/authentication.md
use std::borrow::Cow;
use std::fmt::Display;
use std::fs;
use std::sync::Arc;
use arc_swap::ArcSwap;
use std::{borrow::Cow, fmt::Display, fs, sync::Arc};
use anyhow::Result;
use arc_swap::ArcSwap;
use camino::Utf8Path;
use jsonwebtoken::{
Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, decode, encode,
decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation,
};
use serde::{Deserialize, Serialize};
@@ -132,9 +129,7 @@ impl JwtAuth {
anyhow::bail!("path is neither a directory or a file")
};
if decoding_keys.is_empty() {
anyhow::bail!(
"Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected."
);
anyhow::bail!("Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected.");
}
Ok(Self::new(decoding_keys))
}
@@ -180,9 +175,8 @@ pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result<String>
#[cfg(test)]
mod tests {
use std::str::FromStr;
use super::*;
use std::str::FromStr;
// Generated with:
//
@@ -221,9 +215,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded_eddsa = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJzY29wZSI6InRlbmFudCIsInRlbmFudF9pZCI6IjNkMWY3NTk1YjQ2ODIzMDMwNGUwYjczY2VjYmNiMDgxIiwiaXNzIjoibmVvbi5jb250cm9scGxhbmUiLCJpYXQiOjE2Nzg0NDI0Nzl9.rNheBnluMJNgXzSTTJoTNIGy4P_qe0JUHl_nVEGuDCTgHOThPVr552EnmKccrCKquPeW3c2YUk0Y9Oh4KyASAw";
// Check it can be validated with the public key
let auth = JwtAuth::new(vec![
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
]);
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
let claims_from_token = auth.decode(encoded_eddsa).unwrap().claims;
assert_eq!(claims_from_token, expected_claims);
}
@@ -238,9 +230,7 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded = encode_from_key_file(&claims, TEST_PRIV_KEY_ED25519).unwrap();
// decode it back
let auth = JwtAuth::new(vec![
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
]);
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
let decoded = auth.decode(&encoded).unwrap();
assert_eq!(decoded.claims, claims);

View File

@@ -121,11 +121,9 @@ where
#[cfg(test)]
mod tests {
use std::io;
use tokio::sync::Mutex;
use super::*;
use std::io;
use tokio::sync::Mutex;
#[test]
fn backoff_defaults_produce_growing_backoff_sequence() {

View File

@@ -13,11 +13,9 @@
#![warn(missing_docs)]
use std::io::{self, Read, Write};
use bincode::Options;
use serde::Serialize;
use serde::de::DeserializeOwned;
use serde::{de::DeserializeOwned, Serialize};
use std::io::{self, Read, Write};
use thiserror::Error;
/// An error that occurred during a deserialize operation
@@ -263,11 +261,9 @@ impl<T> LeSer for T {}
#[cfg(test)]
mod tests {
use std::io::Cursor;
use serde::{Deserialize, Serialize};
use super::DeserializeError;
use serde::{Deserialize, Serialize};
use std::io::Cursor;
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShortStruct {

View File

@@ -1,5 +1,7 @@
use std::fmt::Display;
use std::time::{Duration, Instant};
use std::{
fmt::Display,
time::{Duration, Instant},
};
use metrics::IntCounter;

View File

@@ -1,5 +1,4 @@
use tokio_util::task::TaskTracker;
use tokio_util::task::task_tracker::TaskTrackerToken;
use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///

View File

@@ -1,7 +1,9 @@
use std::borrow::Cow;
use std::fs::{self, File};
use std::io::{self, Write};
use std::os::fd::AsRawFd;
use std::{
borrow::Cow,
fs::{self, File},
io::{self, Write},
};
use camino::{Utf8Path, Utf8PathBuf};

View File

@@ -1,7 +1,6 @@
//! Wrapper around `std::env::var` for parsing environment variables.
use std::fmt::Display;
use std::str::FromStr;
use std::{fmt::Display, str::FromStr};
/// For types `V` that implement [`FromStr`].
pub fn var<V, E>(varname: &str) -> Option<V>

View File

@@ -127,9 +127,6 @@ pub async fn failpoint_sleep_cancellable_helper(
tracing::info!("failpoint {:?}: sleep done", name);
}
/// Initialize the configured failpoints
///
/// You must call this function before any concurrent threads do operations.
pub fn init() -> fail::FailScenario<'static> {
// The failpoints lib provides support for parsing the `FAILPOINTS` env var.
// We want non-default behavior for `exit`, though, so, we handle it separately.
@@ -137,10 +134,7 @@ pub fn init() -> fail::FailScenario<'static> {
// Format for FAILPOINTS is "name=actions" separated by ";".
let actions = std::env::var("FAILPOINTS");
if actions.is_ok() {
// SAFETY: this function should before any threads start and access env vars concurrently
unsafe {
std::env::remove_var("FAILPOINTS");
}
std::env::remove_var("FAILPOINTS");
} else {
// let the library handle non-utf8, or nothing for not present
}

View File

@@ -58,9 +58,10 @@ where
#[cfg(test)]
mod test {
use super::ignore_absent_files;
use crate::fs_ext::{is_directory_empty, list_dir};
use super::ignore_absent_files;
#[test]
fn is_empty_dir() {
use super::PathExt;

View File

@@ -38,8 +38,7 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
#[cfg(test)]
mod test {
use std::fs;
use std::path::PathBuf;
use std::{fs, path::PathBuf};
use super::*;

View File

@@ -169,9 +169,9 @@ mod test {
];
let mut s = String::new();
for (line, gen_, expected) in examples {
for (line, gen, expected) in examples {
s.clear();
write!(s, "{}", &gen_.get_suffix()).expect("string grows");
write!(s, "{}", &gen.get_suffix()).expect("string grows");
assert_eq!(s, expected, "example on {line}");
}
}

View File

@@ -1,9 +1,8 @@
//! A wrapper around `ArcSwap` that ensures there is only one writer at a time and writes
//! don't block reads.
use std::sync::Arc;
use arc_swap::ArcSwap;
use std::sync::Arc;
use tokio::sync::TryLockError;
pub struct GuardArcSwap<T> {

View File

@@ -1,6 +1,5 @@
use std::fmt;
use std::num::ParseIntError;
use std::str::FromStr;
use std::{fmt, str::FromStr};
use anyhow::Context;
use hex::FromHex;
@@ -216,7 +215,7 @@ macro_rules! id_newtype {
impl AsRef<[u8]> for $t {
fn as_ref(&self) -> &[u8] {
&self.0.0
&self.0 .0
}
}
@@ -368,9 +367,10 @@ impl FromStr for NodeId {
mod tests {
use serde_assert::{Deserializer, Serializer, Token, Tokens};
use super::*;
use crate::bin_ser::BeSer;
use super::*;
#[test]
fn test_id_serde_non_human_readable() {
let original_id = Id([

View File

@@ -21,12 +21,15 @@
//!
//! Another explaination can be found here: <https://brandur.org/rate-limiting>
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Mutex,
},
time::Duration,
};
use tokio::sync::Notify;
use tokio::time::Instant;
use tokio::{sync::Notify, time::Instant};
pub struct LeakyBucketConfig {
/// This is the "time cost" of a single request unit.

View File

@@ -2,23 +2,21 @@
//!
//! <https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27>
use std::io;
use std::mem::MaybeUninit;
use std::os::fd::RawFd;
use std::os::raw::c_int;
use std::{
io,
mem::MaybeUninit,
os::{fd::RawFd, raw::c_int},
};
use nix::libc::{FIONREAD, TIOCOUTQ};
unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result<c_int> {
let mut inq: MaybeUninit<c_int> = MaybeUninit::uninit();
// SAFETY: encapsulating fn is unsafe, we require `socket_fd` to be a valid file descriptor
unsafe {
let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr());
if err == 0 {
Ok(inq.assume_init())
} else {
Err(io::Error::last_os_error())
}
let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr());
if err == 0 {
Ok(inq.assume_init())
} else {
Err(io::Error::last_os_error())
}
}
@@ -26,14 +24,12 @@ unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result<c_int>
///
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
pub unsafe fn inq(socket_fd: RawFd) -> io::Result<c_int> {
// SAFETY: encapsulating fn is unsafe
unsafe { do_ioctl(socket_fd, FIONREAD) }
do_ioctl(socket_fd, FIONREAD)
}
/// # Safety
///
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
pub unsafe fn outq(socket_fd: RawFd) -> io::Result<c_int> {
// SAFETY: encapsulating fn is unsafe
unsafe { do_ioctl(socket_fd, TIOCOUTQ) }
do_ioctl(socket_fd, TIOCOUTQ)
}

View File

@@ -6,15 +6,16 @@
//! there for potential pitfalls with lock files that are used
//! to store PIDs (pidfiles).
use std::fs;
use std::io::{Read, Write};
use std::ops::Deref;
use std::os::unix::prelude::AsRawFd;
use std::{
fs,
io::{Read, Write},
ops::Deref,
os::unix::prelude::AsRawFd,
};
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use nix::errno::Errno::EAGAIN;
use nix::fcntl;
use nix::{errno::Errno::EAGAIN, fcntl};
use crate::crashsafe;

View File

@@ -273,9 +273,7 @@ fn log_panic_to_stderr(
location: Option<PrettyLocation<'_, '_>>,
backtrace: &std::backtrace::Backtrace,
) {
eprintln!(
"panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}"
);
eprintln!("panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}");
}
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
@@ -363,8 +361,7 @@ pub async fn log_slow<O>(name: &str, threshold: Duration, f: impl Future<Output
#[cfg(test)]
mod tests {
use metrics::IntCounterVec;
use metrics::core::Opts;
use metrics::{core::Opts, IntCounterVec};
use crate::logging::{TracingEventCountLayer, TracingEventCountMetric};

View File

@@ -1,13 +1,11 @@
#![warn(missing_docs)]
use serde::{de::Visitor, Deserialize, Serialize};
use std::fmt;
use std::ops::{Add, AddAssign};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use serde::de::Visitor;
use serde::{Deserialize, Serialize};
use crate::seqwait::MonotonicCounter;
/// Transaction log block size in bytes
@@ -409,10 +407,11 @@ impl rand::distributions::uniform::UniformSampler for LsnSampler {
#[cfg(test)]
mod tests {
use serde_assert::{Deserializer, Serializer, Token, Tokens};
use crate::bin_ser::BeSer;
use super::*;
use crate::bin_ser::BeSer;
use serde_assert::{Deserializer, Serializer, Token, Tokens};
#[test]
fn test_lsn_strings() {

View File

@@ -1,8 +1,7 @@
use pin_project_lite::pin_project;
use std::io::Read;
use std::pin::Pin;
use std::{io, task};
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
pin_project! {

View File

@@ -1,7 +1,7 @@
use std::time::{Duration, SystemTime};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pq_proto::{PG_EPOCH, read_cstr};
use pq_proto::{read_cstr, PG_EPOCH};
use serde::{Deserialize, Serialize};
use tracing::{trace, warn};

View File

@@ -3,7 +3,7 @@
//! postgres_connection crate.
use anyhow::Context;
use postgres_connection::{PgConnectionConfig, parse_host_port};
use postgres_connection::{parse_host_port, PgConnectionConfig};
use crate::id::TenantTimelineId;

View File

@@ -53,11 +53,10 @@ mod tests {
#[test]
fn basics() {
use super::RateLimit;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
use super::RateLimit;
let called = AtomicUsize::new(0);
let mut f = RateLimit::new(Duration::from_millis(100));

View File

@@ -1,7 +1,7 @@
use sentry::ClientInitGuard;
use std::borrow::Cow;
use std::env;
use sentry::ClientInitGuard;
pub use sentry::release_name;
#[must_use]

View File

@@ -5,7 +5,6 @@ use std::collections::BinaryHeap;
use std::mem;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::watch::{self, channel};
use tokio::time::timeout;
@@ -249,7 +248,11 @@ where
let internal = self.internal.lock().unwrap();
let cnt = internal.current.cnt_value();
drop(internal);
if cnt >= num { Ok(()) } else { Err(cnt) }
if cnt >= num {
Ok(())
} else {
Err(cnt)
}
}
/// Register and return a channel that will be notified when a number arrives,
@@ -322,9 +325,8 @@ where
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use std::sync::Arc;
impl MonotonicCounter<i32> for i32 {
fn cnt_advance(&mut self, val: i32) {

View File

@@ -12,7 +12,11 @@ pub struct Percent(#[serde(deserialize_with = "deserialize_pct_0_to_100")] u8);
impl Percent {
pub const fn new(pct: u8) -> Option<Self> {
if pct <= 100 { Some(Percent(pct)) } else { None }
if pct <= 100 {
Some(Percent(pct))
} else {
None
}
}
pub fn get(&self) -> u8 {

View File

@@ -1,7 +1,6 @@
//! See `pageserver_api::shard` for description on sharding.
use std::ops::RangeInclusive;
use std::str::FromStr;
use std::{ops::RangeInclusive, str::FromStr};
use hex::FromHex;
use serde::{Deserialize, Serialize};
@@ -60,7 +59,11 @@ impl ShardCount {
/// This method returns the actual number of shards, i.e. if our internal value is
/// zero, we return 1 (unsharded tenants have 1 shard).
pub fn count(&self) -> u8 {
if self.0 > 0 { self.0 } else { 1 }
if self.0 > 0 {
self.0
} else {
1
}
}
/// The literal internal value: this is **not** the number of shards in the

View File

@@ -1,7 +1,7 @@
pub use signal_hook::consts::TERM_SIGNALS;
pub use signal_hook::consts::signal::*;
use signal_hook::iterator::Signals;
pub use signal_hook::consts::{signal::*, TERM_SIGNALS};
pub enum Signal {
Quit,
Interrupt,

View File

@@ -44,7 +44,8 @@
#![warn(missing_docs)]
use std::ops::Deref;
use std::sync::{Arc, RwLock, RwLockWriteGuard, Weak};
use std::sync::{Arc, Weak};
use std::sync::{RwLock, RwLockWriteGuard};
use tokio::sync::watch;
@@ -218,11 +219,10 @@ impl RcuWaitList {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
use std::time::Duration;
use super::*;
#[tokio::test]
async fn two_writers() {
let rcu = Rcu::new(1);

View File

@@ -1,6 +1,10 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
/// Gates are a concurrency helper, primarily used for implementing safe shutdown.
///

View File

@@ -1,6 +1,7 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, MutexGuard,
};
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
@@ -300,13 +301,14 @@ impl Drop for InitPermit {
#[cfg(test)]
mod tests {
use std::convert::Infallible;
use std::pin::{Pin, pin};
use std::time::Duration;
use futures::Future;
use super::*;
use std::{
convert::Infallible,
pin::{pin, Pin},
time::Duration,
};
#[tokio::test]
async fn many_initializers() {

View File

@@ -1,5 +1,4 @@
use core::future::poll_fn;
use core::task::Poll;
use core::{future::poll_fn, task::Poll};
use std::sync::{Arc, Mutex};
use diatomic_waker::DiatomicWaker;

View File

@@ -1,8 +1,9 @@
use std::io;
use std::net::{TcpListener, ToSocketAddrs};
use std::{
io,
net::{TcpListener, ToSocketAddrs},
};
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::ReuseAddr;
use nix::sys::socket::{setsockopt, sockopt::ReuseAddr};
/// Bind a [`TcpListener`] to addr with `SO_REUSEADDR` set to true.
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {

View File

@@ -172,14 +172,16 @@ fn tracing_subscriber_configured() -> bool {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::fmt::{self};
use std::hash::{Hash, Hasher};
use tracing_subscriber::prelude::*;
use super::*;
use std::{
collections::HashSet,
fmt::{self},
hash::{Hash, Hasher},
};
struct MemoryIdentity<'a>(&'a dyn Extractor);
impl MemoryIdentity<'_> {

View File

@@ -44,11 +44,9 @@ where
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arc_swap::ArcSwap;
use super::*;
use arc_swap::ArcSwap;
use std::sync::Arc;
#[test]
fn test_try_rcu_success() {

View File

@@ -1,6 +1,4 @@
use std::alloc::Layout;
use std::cmp::Ordering;
use std::ops::RangeBounds;
use std::{alloc::Layout, cmp::Ordering, ops::RangeBounds};
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VecMapOrdering {
@@ -216,8 +214,7 @@ fn extract_key<K, V>(entry: &(K, V)) -> &K {
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use std::ops::Bound;
use std::{collections::BTreeMap, ops::Bound};
use super::{VecMap, VecMapOrdering};

View File

@@ -1,14 +1,19 @@
use std::io::SeekFrom;
use anyhow::{Context, Result};
use async_compression::Level;
use async_compression::tokio::bufread::ZstdDecoder;
use async_compression::tokio::write::ZstdEncoder;
use async_compression::zstd::CParameter;
use async_compression::{
tokio::{bufread::ZstdDecoder, write::ZstdEncoder},
zstd::CParameter,
Level,
};
use camino::Utf8Path;
use nix::NixPath;
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufRead, AsyncSeekExt, AsyncWriteExt};
use tokio::{
fs::{File, OpenOptions},
io::AsyncBufRead,
io::AsyncSeekExt,
io::AsyncWriteExt,
};
use tokio_tar::{Archive, Builder, HeaderMode};
use walkdir::WalkDir;

View File

@@ -1,8 +1,7 @@
use std::io::Read;
use bytes::{Buf, BytesMut};
use hex_literal::hex;
use serde::Deserialize;
use std::io::Read;
use utils::bin_ser::LeSer;
#[derive(Debug, PartialEq, Eq, Deserialize)]

View File

@@ -1,25 +1,23 @@
use std::env;
use std::num::NonZeroUsize;
use std::sync::Arc;
use anyhow::Context;
use criterion::{criterion_group, criterion_main, Criterion};
use futures::{stream::FuturesUnordered, StreamExt};
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use postgres_ffi::{waldecoder::WalStreamDecoder, MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
use pprof::criterion::{Output, PProfProfiler};
use serde::Deserialize;
use std::{env, num::NonZeroUsize, sync::Arc};
use camino::{Utf8Path, Utf8PathBuf};
use camino_tempfile::Utf8TempDir;
use criterion::{Criterion, criterion_group, criterion_main};
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
use pprof::criterion::{Output, PProfProfiler};
use remote_storage::{
DownloadOpts, GenericRemoteStorage, ListingMode, RemoteStorageConfig, RemoteStorageKind,
S3Config,
};
use serde::Deserialize;
use tokio_util::sync::CancellationToken;
use utils::lsn::Lsn;
use utils::shard::{ShardCount, ShardNumber};
use utils::{
lsn::Lsn,
shard::{ShardCount, ShardNumber},
};
use wal_decoder::models::InterpretedWalRecord;
const S3_BUCKET: &str = "neon-github-public-dev";
@@ -33,7 +31,7 @@ const METADATA_FILENAME: &str = "metadata.json";
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[allow(non_upper_case_globals)]
#[unsafe(export_name = "malloc_conf")]
#[export_name = "malloc_conf"]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
async fn create_s3_client() -> anyhow::Result<Arc<GenericRemoteStorage>> {

View File

@@ -3,6 +3,8 @@
use std::collections::HashMap;
use crate::models::*;
use crate::serialized_batch::SerializedValueBatch;
use bytes::{Buf, Bytes};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{RelTag, SlruKind};
@@ -12,9 +14,6 @@ use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::walrecord::*;
use utils::lsn::Lsn;
use crate::models::*;
use crate::serialized_batch::SerializedValueBatch;
impl InterpretedWalRecord {
/// Decode and interpreted raw bytes which represent one Postgres WAL record.
/// Data blocks which do not match any of the provided shard identities are filtered out.

View File

@@ -8,18 +8,20 @@
use std::collections::{BTreeSet, HashMap};
use bytes::{Bytes, BytesMut};
use pageserver_api::key::{CompactKey, Key, rel_block_to_key};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::keyspace::KeySpace;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::value::Value;
use pageserver_api::{key::CompactKey, value::Value};
use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants};
use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
use serde::{Deserialize, Serialize};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use pageserver_api::key::Key;
use crate::models::InterpretedWalRecord;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
@@ -513,11 +515,10 @@ impl SerializedValueBatch {
let empty = self.raw.is_empty();
if cfg!(debug_assertions) && empty {
assert!(
self.metadata
.iter()
.all(|meta| matches!(meta, ValueMeta::Observed(_)))
);
assert!(self
.metadata
.iter()
.all(|meta| matches!(meta, ValueMeta::Observed(_))));
}
!empty

View File

@@ -7,12 +7,15 @@ use utils::lsn::Lsn;
use utils::postgres_client::{Compression, InterpretedFormat};
use crate::models::{
FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord, proto,
FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
};
use crate::serialized_batch::{
ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
};
use crate::models::proto;
#[derive(Debug, thiserror::Error)]
pub enum ToWireFormatError {
#[error("{0}")]
@@ -80,8 +83,8 @@ impl ToWireFormat for InterpretedWalRecords {
format: InterpretedFormat,
compression: Option<Compression>,
) -> Result<Bytes, ToWireFormatError> {
use async_compression::Level;
use async_compression::tokio::write::ZstdEncoder;
use async_compression::Level;
let encode_res: Result<Bytes, ToWireFormatError> = match format {
InterpretedFormat::Bincode => {

View File

@@ -1,11 +1,9 @@
//! Links with walproposer, pgcommon, pgport and runs bindgen on walproposer.h
//! to generate Rust bindings for it.
use std::env;
use std::path::PathBuf;
use std::process::Command;
use std::{env, path::PathBuf, process::Command};
use anyhow::{Context, anyhow};
use anyhow::{anyhow, Context};
const WALPROPOSER_PG_VERSION: &str = "v17";

View File

@@ -3,14 +3,27 @@
#![allow(dead_code)]
use std::ffi::{CStr, CString};
use std::ffi::CStr;
use std::ffi::CString;
use crate::bindings::{
NeonWALReadResult, PGAsyncReadResult, PGAsyncWriteResult, Safekeeper, Size, StringInfoData,
TimestampTz, WalProposer, WalProposerConnStatusType, WalProposerConnectPollStatusType,
WalProposerExecStatusType, WalproposerShmemState, XLogRecPtr, uint32, walproposer_api,
};
use crate::walproposer::{ApiImpl, StreamingCallback, WaitResult};
use crate::bindings::uint32;
use crate::bindings::walproposer_api;
use crate::bindings::NeonWALReadResult;
use crate::bindings::PGAsyncReadResult;
use crate::bindings::PGAsyncWriteResult;
use crate::bindings::Safekeeper;
use crate::bindings::Size;
use crate::bindings::StringInfoData;
use crate::bindings::TimestampTz;
use crate::bindings::WalProposer;
use crate::bindings::WalProposerConnStatusType;
use crate::bindings::WalProposerConnectPollStatusType;
use crate::bindings::WalProposerExecStatusType;
use crate::bindings::WalproposerShmemState;
use crate::bindings::XLogRecPtr;
use crate::walproposer::ApiImpl;
use crate::walproposer::StreamingCallback;
use crate::walproposer::WaitResult;
extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemState {
unsafe {

View File

@@ -2,15 +2,15 @@
use std::ffi::CString;
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use crate::api_bindings::{Level, create_api, take_vec_u8};
use crate::bindings::{
NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
use crate::{
api_bindings::{create_api, take_vec_u8, Level},
bindings::{
NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
},
};
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{id::TenantTimelineId, lsn::Lsn};
/// Rust high-level wrapper for C walproposer API. Many methods are not required
/// for simple cases, hence todo!() in default implementations.
@@ -275,17 +275,22 @@ impl StreamingCallback {
#[cfg(test)]
mod tests {
use core::panic;
use std::cell::{Cell, UnsafeCell};
use std::ffi::CString;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::sync_channel;
use std::{
cell::Cell,
ffi::CString,
sync::{atomic::AtomicUsize, mpsc::sync_channel},
};
use std::cell::UnsafeCell;
use utils::id::TenantTimelineId;
use crate::{
api_bindings::Level,
bindings::{NeonWALReadResult, PG_VERSION_NUM},
walproposer::Wrapper,
};
use super::ApiImpl;
use crate::api_bindings::Level;
use crate::bindings::{NeonWALReadResult, PG_VERSION_NUM};
use crate::walproposer::Wrapper;
#[derive(Clone, Copy, Debug)]
struct WaitEventsData {

Some files were not shown because too many files have changed in this diff Show More