Merge pull request #11023 from neondatabase/rc/release-proxy/2025-02-27

Proxy release 2025-02-27
This commit is contained in:
JC Grünhage
2025-02-27 19:10:58 +01:00
committed by GitHub
142 changed files with 1456 additions and 925 deletions

View File

@@ -19,7 +19,7 @@ on:
description: "Tag of the last compute release"
value: ${{ jobs.tags.outputs.compute }}
run-kind:
description: "The kind of run we're currently in. Will be one of `pr-main`, `push-main`, `storage-rc`, `storage-release`, `proxy-rc`, `proxy-release`, `compute-rc`, `compute-release` or `merge_queue`"
description: "The kind of run we're currently in. Will be one of `pr`, `push-main`, `storage-rc`, `storage-release`, `proxy-rc`, `proxy-release`, `compute-rc`, `compute-release` or `merge_queue`"
value: ${{ jobs.tags.outputs.run-kind }}
permissions: {}
@@ -51,10 +51,10 @@ jobs:
|| (inputs.github-event-name == 'push' && github.ref_name == 'release') && 'storage-release'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release-compute') && 'compute-release'
|| (inputs.github-event-name == 'push' && github.ref_name == 'release-proxy') && 'proxy-release'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'main') && 'pr-main'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release') && 'storage-rc-pr'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release-compute') && 'compute-rc-pr'
|| (inputs.github-event-name == 'pull_request' && github.base_ref == 'release-proxy') && 'proxy-rc-pr'
|| (inputs.github-event-name == 'pull_request') && 'pr'
|| 'unknown'
}}
run: |
@@ -81,7 +81,7 @@ jobs:
compute-release)
echo "tag=release-compute-$(git rev-list --count HEAD)" | tee -a $GITHUB_OUTPUT
;;
pr-main|storage-rc-pr|compute-rc-pr|proxy-rc-pr)
pr|storage-rc-pr|compute-rc-pr|proxy-rc-pr)
BUILD_AND_TEST_RUN_ID=$(gh run list -b $CURRENT_BRANCH -c $CURRENT_SHA -w 'Build and Test' -L 1 --json databaseId --jq '.[].databaseId')
echo "tag=$BUILD_AND_TEST_RUN_ID" | tee -a $GITHUB_OUTPUT
;;

View File

@@ -140,6 +140,7 @@ jobs:
--ignore test_runner/performance/test_logical_replication.py
--ignore test_runner/performance/test_physical_replication.py
--ignore test_runner/performance/test_perf_ingest_using_pgcopydb.py
--ignore test_runner/performance/test_cumulative_statistics_persistence.py
env:
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
@@ -171,6 +172,61 @@ jobs:
env:
SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
cumstats-test:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:
contents: write
statuses: write
id-token: write # aws-actions/configure-aws-credentials
env:
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
DEFAULT_PG_VERSION: 17
TEST_OUTPUT: /tmp/test_output
BUILD_TYPE: remote
SAVE_PERF_REPORT: ${{ github.event.inputs.save_perf_report || ( github.ref_name == 'main' ) }}
PLATFORM: "neon-staging"
runs-on: [ self-hosted, us-east-2, x64 ]
container:
image: neondatabase/build-tools:pinned-bookworm
credentials:
username: ${{ secrets.NEON_DOCKERHUB_USERNAME }}
password: ${{ secrets.NEON_DOCKERHUB_PASSWORD }}
options: --init
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-region: eu-central-1
role-to-assume: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
role-duration-seconds: 18000 # 5 hours
- name: Download Neon artifact
uses: ./.github/actions/download
with:
name: neon-${{ runner.os }}-${{ runner.arch }}-release-artifact
path: /tmp/neon/
prefix: latest
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
- name: Verify that cumulative statistics are preserved
uses: ./.github/actions/run-python-test-set
with:
build_type: ${{ env.BUILD_TYPE }}
test_selection: performance/test_cumulative_statistics_persistence.py
run_in_parallel: false
save_perf_report: ${{ env.SAVE_PERF_REPORT }}
extra_params: -m remote_cluster --timeout 3600
pg_version: ${{ env.DEFAULT_PG_VERSION }}
aws-oicd-role-arn: ${{ vars.DEV_AWS_OIDC_ROLE_ARN }}
env:
VIP_VAP_ACCESS_TOKEN: "${{ secrets.VIP_VAP_ACCESS_TOKEN }}"
PERF_TEST_RESULT_CONNSTR: "${{ secrets.PERF_TEST_RESULT_CONNSTR }}"
NEON_API_KEY: ${{ secrets.NEON_STAGING_API_KEY }}
replication-tests:
if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }}
permissions:

View File

@@ -487,7 +487,7 @@ jobs:
neon-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
strategy:
matrix:
arch: [ x64, arm64 ]
@@ -537,7 +537,7 @@ jobs:
neon-image:
needs: [ neon-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ubuntu-22.04
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -559,7 +559,7 @@ jobs:
compute-node-image-arch:
needs: [ check-permissions, build-build-tools-image, meta ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -651,7 +651,7 @@ jobs:
compute-node-image:
needs: [ compute-node-image-arch, meta ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
permissions:
id-token: write # aws-actions/configure-aws-credentials
statuses: write
@@ -694,7 +694,7 @@ jobs:
vm-compute-node-image:
needs: [ check-permissions, meta, compute-node-image ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: [ self-hosted, large ]
strategy:
fail-fast: false
@@ -775,7 +775,7 @@ jobs:
# Ensure that we don't have bad versions.
- name: Verify image versions
shell: bash # ensure no set -e for better error messages
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
run: |
pageserver_version=$(docker run --rm neondatabase/neon:${{ needs.meta.outputs.build-tag }} "/bin/sh" "-c" "/usr/local/bin/pageserver --version")
@@ -823,12 +823,12 @@ jobs:
- name: Test extension upgrade
timeout-minutes: 20
if: ${{ contains(fromJSON('["pr-main", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["pr", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
env:
TAG: >-
${{
false
|| needs.meta.outputs.run-kind == 'pr-main' && needs.meta.outputs.build-tag
|| needs.meta.outputs.run-kind == 'pr' && needs.meta.outputs.build-tag
|| needs.meta.outputs.run-kind == 'compute-rc-pr' && needs.meta.outputs.previous-storage-release
}}
TEST_EXTENSIONS_TAG: latest
@@ -870,7 +870,7 @@ jobs:
push-neon-image-dev:
needs: [ meta, generate-image-maps, neon-image ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -888,7 +888,7 @@ jobs:
push-compute-image-dev:
needs: [ meta, generate-image-maps, vm-compute-node-image ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -906,7 +906,8 @@ jobs:
push-neon-image-prod:
needs: [ meta, generate-image-maps, neon-image, test-images ]
if: ${{ contains(fromJSON('["storage-release", "proxy-release"]'), needs.meta.outputs.run-kind) }}
# Depends on jobs that can get skipped
if: ${{ !failure() && !cancelled() && contains(fromJSON('["storage-release", "proxy-release"]'), needs.meta.outputs.run-kind) }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -924,7 +925,8 @@ jobs:
push-compute-image-prod:
needs: [ meta, generate-image-maps, vm-compute-node-image, test-images ]
if: ${{ needs.meta.outputs.run-kind == 'compute-release' }}
# Depends on jobs that can get skipped
if: ${{ !failure() && !cancelled() && needs.meta.outputs.run-kind == 'compute-release' }}
uses: ./.github/workflows/_push-to-container-registry.yml
permissions:
id-token: write # Required for aws/azure login
@@ -955,7 +957,7 @@ jobs:
trigger-custom-extensions-build-and-wait:
needs: [ check-permissions, meta ]
if: ${{ contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
if: ${{ contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind) }}
runs-on: ubuntu-22.04
permissions:
id-token: write # aws-actions/configure-aws-credentials
@@ -1347,7 +1349,7 @@ jobs:
|| needs.check-codestyle-python.result == 'skipped'
|| needs.check-codestyle-rust.result == 'skipped'
|| needs.files-changed.result == 'skipped'
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr-main", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-compute-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.push-neon-image-dev.result == 'skipped' && contains(fromJSON('["push-main", "pr", "storage-release", "storage-rc-pr", "proxy-release", "proxy-rc-pr"]'), needs.meta.outputs.run-kind))
|| needs.test-images.result == 'skipped'
|| (needs.trigger-custom-extensions-build-and-wait.result == 'skipped' && contains(fromJSON('["push-main", "pr-main", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))
|| (needs.trigger-custom-extensions-build-and-wait.result == 'skipped' && contains(fromJSON('["push-main", "pr", "compute-release", "compute-rc-pr"]'), needs.meta.outputs.run-kind))

16
Cargo.lock generated
View File

@@ -984,9 +984,9 @@ dependencies = [
[[package]]
name = "bindgen"
version = "0.70.1"
version = "0.71.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f"
checksum = "5f58bf3d7db68cfbac37cfc485a8d711e87e064c3d0fe0435b92f7a407f9d6b3"
dependencies = [
"bitflags 2.8.0",
"cexpr",
@@ -997,7 +997,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"rustc-hash",
"rustc-hash 2.1.1",
"shlex",
"syn 2.0.90",
]
@@ -3537,7 +3537,7 @@ dependencies = [
"measured-derive",
"memchr",
"parking_lot 0.12.1",
"rustc-hash",
"rustc-hash 1.1.0",
"ryu",
]
@@ -5012,7 +5012,7 @@ dependencies = [
"reqwest-tracing",
"rsa",
"rstest",
"rustc-hash",
"rustc-hash 1.1.0",
"rustls 0.23.18",
"rustls-native-certs 0.8.0",
"rustls-pemfile 2.1.1",
@@ -5630,6 +5630,12 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
[[package]]
name = "rustc-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
version = "0.4.0"

View File

@@ -43,7 +43,7 @@ members = [
]
[workspace.package]
edition = "2021"
edition = "2024"
license = "Apache-2.0"
## All dependency versions, used in the project
@@ -70,7 +70,7 @@ aws-types = "1.3"
axum = { version = "0.8.1", features = ["ws"] }
base64 = "0.13.0"
bincode = "1.3"
bindgen = "0.70"
bindgen = "0.71"
bit_field = "0.10.2"
bstr = "1.0"
byteorder = "1.4"

View File

@@ -25,7 +25,7 @@ use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use nix::errno::Errno;
use nix::fcntl::{FcntlArg, FdFlag};
use nix::sys::signal::{kill, Signal};
use nix::sys::signal::{Signal, kill};
use nix::unistd::Pid;
use utils::pid_file::{self, PidFileRead};

View File

@@ -5,7 +5,16 @@
//! easier to work with locally. The python tests in `test_runner`
//! rely on `neon_local` to set up the environment for each test.
//!
use anyhow::{anyhow, bail, Context, Result};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{Context, Result, anyhow, bail};
use clap::Parser;
use compute_api::spec::ComputeMode;
use control_plane::endpoint::ComputeControlPlane;
@@ -19,7 +28,7 @@ use control_plane::storage_controller::{
NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController,
};
use control_plane::{broker, local_env};
use nix::fcntl::{flock, FlockArg};
use nix::fcntl::{FlockArg, flock};
use pageserver_api::config::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_PAGESERVER_PG_PORT,
@@ -35,23 +44,13 @@ use safekeeper_api::{
DEFAULT_HTTP_LISTEN_PORT as DEFAULT_SAFEKEEPER_HTTP_PORT,
DEFAULT_PG_LISTEN_PORT as DEFAULT_SAFEKEEPER_PG_PORT,
};
use std::borrow::Cow;
use std::collections::{BTreeSet, HashMap};
use std::fs::File;
use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Duration;
use storage_broker::DEFAULT_LISTEN_ADDR as DEFAULT_BROKER_ADDR;
use tokio::task::JoinSet;
use url::Host;
use utils::{
auth::{Claims, Scope},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
project_git_version,
};
use utils::auth::{Claims, Scope};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use utils::lsn::Lsn;
use utils::project_git_version;
// Default id of a safekeeper node, if not specified on the command line.
const DEFAULT_SAFEKEEPER_ID: NodeId = NodeId(1);
@@ -921,7 +920,9 @@ fn handle_init(args: &InitCmdArgs) -> anyhow::Result<LocalEnv> {
let init_conf: NeonLocalInitConf = if let Some(config_path) = &args.config {
// User (likely the Python test suite) provided a description of the environment.
if args.num_pageservers.is_some() {
bail!("Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead");
bail!(
"Cannot specify both --num-pageservers and --config, use key `pageservers` in the --config file instead"
);
}
// load and parse the file
let contents = std::fs::read_to_string(config_path).with_context(|| {
@@ -1315,10 +1316,14 @@ async fn handle_endpoint(subcmd: &EndpointCmd, env: &local_env::LocalEnv) -> Res
match (mode, args.hot_standby) {
(ComputeMode::Static(_), true) => {
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
bail!(
"Cannot start a node in hot standby mode when it is already configured as a static replica"
)
}
(ComputeMode::Primary, true) => {
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
bail!(
"Cannot start a node as a hot standby replica, it is already configured as primary node"
)
}
_ => {}
}

View File

@@ -8,7 +8,6 @@
use std::time::Duration;
use anyhow::Context;
use camino::Utf8PathBuf;
use crate::{background_process, local_env};

View File

@@ -37,27 +37,20 @@
//! ```
//!
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::TcpStream;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
use std::path::PathBuf;
use std::process::Command;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{Context, Result, anyhow, bail};
use compute_api::requests::ConfigurationRequest;
use compute_api::responses::ComputeCtlConfig;
use compute_api::spec::Database;
use compute_api::spec::PgIdent;
use compute_api::spec::RemoteExtSpec;
use compute_api::spec::Role;
use nix::sys::signal::kill;
use nix::sys::signal::Signal;
use compute_api::responses::{ComputeCtlConfig, ComputeStatus, ComputeStatusResponse};
use compute_api::spec::{
Cluster, ComputeFeature, ComputeMode, ComputeSpec, Database, PgIdent, RemoteExtSpec, Role,
};
use nix::sys::signal::{Signal, kill};
use pageserver_api::shard::ShardStripeSize;
use reqwest::header::CONTENT_TYPE;
use serde::{Deserialize, Serialize};
@@ -69,9 +62,6 @@ use crate::local_env::LocalEnv;
use crate::postgresql_conf::PostgresConf;
use crate::storage_controller::StorageController;
use compute_api::responses::{ComputeStatus, ComputeStatusResponse};
use compute_api::spec::{Cluster, ComputeFeature, ComputeMode, ComputeSpec};
// contents of a endpoint.json file
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug)]
pub struct EndpointConf {
@@ -237,7 +227,9 @@ impl ComputeControlPlane {
});
if let Some((key, _)) = duplicates.next() {
bail!("attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported.");
bail!(
"attempting to create a duplicate primary endpoint on tenant {tenant_id}, timeline {timeline_id}: endpoint {key:?} exists already. please don't do this, it is not supported."
);
}
}
Ok(())

View File

@@ -3,28 +3,22 @@
//! Now it also provides init method which acts like a stub for proper installation
//! script which will use local paths.
use anyhow::{bail, Context};
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
use std::{env, fs};
use anyhow::{Context, bail};
use clap::ValueEnum;
use postgres_backend::AuthType;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use std::fs;
use std::net::IpAddr;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::time::Duration;
use utils::{
auth::{encode_from_key_file, Claims},
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
};
use utils::auth::{Claims, encode_from_key_file};
use utils::id::{NodeId, TenantId, TenantTimelineId, TimelineId};
use crate::pageserver::PageServerNode;
use crate::pageserver::PAGESERVER_REMOTE_STORAGE_DIR;
use crate::pageserver::{PAGESERVER_REMOTE_STORAGE_DIR, PageServerNode};
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 16;
@@ -465,7 +459,9 @@ impl LocalEnv {
if old_timeline_id == &timeline_id {
Ok(())
} else {
bail!("branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}");
bail!(
"branch '{branch_name}' is already mapped to timeline {old_timeline_id}, cannot map to another timeline {timeline_id}"
);
}
} else {
existing_values.push((tenant_id, timeline_id));

View File

@@ -7,7 +7,6 @@
//! ```
//!
use std::collections::HashMap;
use std::io;
use std::io::Write;
use std::num::NonZeroU64;
@@ -15,22 +14,19 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::time::Duration;
use anyhow::{bail, Context};
use anyhow::{Context, bail};
use camino::Utf8PathBuf;
use pageserver_api::models::{self, TenantInfo, TimelineInfo};
use pageserver_api::shard::TenantShardId;
use pageserver_client::mgmt_api;
use postgres_backend::AuthType;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use postgres_connection::{PgConnectionConfig, parse_host_port};
use utils::auth::{Claims, Scope};
use utils::id::NodeId;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use utils::id::{NodeId, TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::local_env::{NeonLocalInitPageserverConf, PageServerConf};
use crate::{background_process, local_env::LocalEnv};
use crate::background_process;
use crate::local_env::{LocalEnv, NeonLocalInitPageserverConf, PageServerConf};
/// Directory within .neon which will be used by default for LocalFs remote storage.
pub const PAGESERVER_REMOTE_STORAGE_DIR: &str = "local_fs_remote_storage/pageserver";
@@ -81,7 +77,11 @@ impl PageServerNode {
&self,
conf: NeonLocalInitPageserverConf,
) -> anyhow::Result<toml_edit::DocumentMut> {
assert_eq!(&PageServerConf::from(&conf), &self.conf, "during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully");
assert_eq!(
&PageServerConf::from(&conf),
&self.conf,
"during neon_local init, we derive the runtime state of ps conf (self.conf) from the --config flag fully"
);
// TODO(christian): instead of what we do here, create a pageserver_api::config::ConfigToml (PR #7656)

View File

@@ -1,3 +1,6 @@
use std::collections::HashMap;
use std::fmt;
///
/// Module for parsing postgresql.conf file.
///
@@ -6,8 +9,6 @@
/// funny stuff like include-directives or funny escaping.
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::HashMap;
use std::fmt;
/// In-memory representation of a postgresql.conf file
#[derive(Default, Debug)]

View File

@@ -14,18 +14,15 @@ use std::{io, result};
use anyhow::Context;
use camino::Utf8PathBuf;
use http_utils::error::HttpErrorBody;
use postgres_connection::PgConnectionConfig;
use reqwest::{IntoUrl, Method};
use thiserror::Error;
use http_utils::error::HttpErrorBody;
use utils::auth::{Claims, Scope};
use utils::id::NodeId;
use crate::{
background_process,
local_env::{LocalEnv, SafekeeperConf},
};
use crate::background_process;
use crate::local_env::{LocalEnv, SafekeeperConf};
#[derive(Error, Debug)]
pub enum SafekeeperHttpError {

View File

@@ -1,44 +1,39 @@
use crate::{
background_process,
local_env::{LocalEnv, NeonStorageControllerConf},
};
use std::ffi::OsStr;
use std::fs;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::ExitStatus;
use std::str::FromStr;
use std::sync::OnceLock;
use std::time::{Duration, Instant};
use camino::{Utf8Path, Utf8PathBuf};
use hyper0::Uri;
use nix::unistd::Pid;
use pageserver_api::{
controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
TenantShardMigrateResponse,
},
models::{
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
},
shard::{ShardStripeSize, TenantShardId},
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest,
TenantCreateResponse, TenantLocateResponse, TenantShardMigrateRequest,
TenantShardMigrateResponse,
};
use pageserver_api::models::{
TenantShardSplitRequest, TenantShardSplitResponse, TimelineCreateRequest, TimelineInfo,
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::ResponseErrorMessageExt;
use postgres_backend::AuthType;
use reqwest::Method;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{
ffi::OsStr,
fs,
net::SocketAddr,
path::PathBuf,
process::ExitStatus,
str::FromStr,
sync::OnceLock,
time::{Duration, Instant},
};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::process::Command;
use tracing::instrument;
use url::Url;
use utils::{
auth::{encode_from_key_file, Claims, Scope},
id::{NodeId, TenantId},
};
use utils::auth::{Claims, Scope, encode_from_key_file};
use utils::id::{NodeId, TenantId};
use whoami::username;
use crate::background_process;
use crate::local_env::{LocalEnv, NeonStorageControllerConf};
pub struct StorageController {
env: LocalEnv,
private_key: Option<Vec<u8>>,
@@ -96,7 +91,8 @@ pub struct AttachHookRequest {
#[derive(Serialize, Deserialize)]
pub struct AttachHookResponse {
pub gen: Option<u32>,
#[serde(rename = "gen")]
pub generation: Option<u32>,
}
#[derive(Serialize, Deserialize)]
@@ -779,7 +775,7 @@ impl StorageController {
)
.await?;
Ok(response.gen)
Ok(response.generation)
}
#[instrument(skip(self))]

View File

@@ -1,34 +1,27 @@
use futures::StreamExt;
use std::{
collections::{HashMap, HashSet},
str::FromStr,
time::Duration,
};
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::time::Duration;
use clap::{Parser, Subcommand};
use pageserver_api::{
controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeDescribeResponse, NodeShardResponse,
SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy,
TenantCreateRequest, TenantDescribeResponse, TenantPolicyRequest,
},
models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary,
ShardParameters, TenantConfig, TenantConfigPatchRequest, TenantConfigRequest,
TenantShardSplitRequest, TenantShardSplitResponse,
},
shard::{ShardStripeSize, TenantShardId},
use futures::StreamExt;
use pageserver_api::controller_api::{
AvailabilityZone, NodeAvailabilityWrapper, NodeConfigureRequest, NodeDescribeResponse,
NodeRegisterRequest, NodeSchedulingPolicy, NodeShardResponse, PlacementPolicy,
SafekeeperDescribeResponse, SafekeeperSchedulingPolicyRequest, ShardSchedulingPolicy,
ShardsPreferredAzsRequest, ShardsPreferredAzsResponse, SkSchedulingPolicy, TenantCreateRequest,
TenantDescribeResponse, TenantPolicyRequest, TenantShardMigrateRequest,
TenantShardMigrateResponse,
};
use pageserver_api::models::{
EvictionPolicy, EvictionPolicyLayerAccessThreshold, LocationConfigSecondary, ShardParameters,
TenantConfig, TenantConfigPatchRequest, TenantConfigRequest, TenantShardSplitRequest,
TenantShardSplitResponse,
};
use pageserver_api::shard::{ShardStripeSize, TenantShardId};
use pageserver_client::mgmt_api::{self};
use reqwest::{Method, StatusCode, Url};
use utils::id::{NodeId, TenantId, TimelineId};
use pageserver_api::controller_api::{
NodeConfigureRequest, NodeRegisterRequest, NodeSchedulingPolicy, PlacementPolicy,
TenantShardMigrateRequest, TenantShardMigrateResponse,
};
use storage_controller_client::control_api::Client;
use utils::id::{NodeId, TenantId, TimelineId};
#[derive(Subcommand, Debug)]
enum Command {
@@ -921,7 +914,9 @@ async fn main() -> anyhow::Result<()> {
}
Command::TenantDrop { tenant_id, unclean } => {
if !unclean {
anyhow::bail!("This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed.")
anyhow::bail!(
"This command is not a tenant deletion, and uncleanly drops all controller state for the tenant. If you know what you're doing, add `--unclean` to proceed."
)
}
storcon_client
.dispatch::<(), ()>(
@@ -933,7 +928,9 @@ async fn main() -> anyhow::Result<()> {
}
Command::NodeDrop { node_id, unclean } => {
if !unclean {
anyhow::bail!("This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed.")
anyhow::bail!(
"This command is not a clean node decommission, and uncleanly drops all controller state for the node, without checking if any tenants still refer to it. If you know what you're doing, add `--unclean` to proceed."
)
}
storcon_client
.dispatch::<(), ()>(Method::POST, format!("debug/v1/node/{node_id}/drop"), None)

View File

@@ -7,7 +7,7 @@ index f255fe6..0a0fa65 100644
GENERATED_SCHEDULE_DEPS = $(TB_DIR)/all_tests $(TB_DIR)/exclude_tests
REGRESS = --schedule $(TB_DIR)/run.sch # Set this again just to be safe
-REGRESS_OPTS = --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
+REGRESS_OPTS = --use-existing --dbname=pgtap_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
+REGRESS_OPTS = --use-existing --dbname=contrib_regression --inputdir=test --max-connections=$(PARALLEL_CONN) --schedule $(SETUP_SCH) $(REGRESS_CONF)
SETUP_SCH = test/schedule/main.sch # schedule to use for test setup; this can be forcibly changed by some targets!
IGNORE_TESTS = $(notdir $(EXCLUDE_TEST_FILES:.sql=))
PARALLEL_TESTS = $(filter-out $(IGNORE_TESTS),$(filter-out $(SERIAL_TESTS),$(ALL_TESTS)))

View File

@@ -12,6 +12,7 @@ if [ -z ${OLD_COMPUTE_TAG+x} ] || [ -z ${NEW_COMPUTE_TAG+x} ] || [ -z "${OLD_COM
fi
export PG_VERSION=${PG_VERSION:-16}
export PG_TEST_VERSION=${PG_VERSION}
# Waits for compute node is ready
function wait_for_ready {
TIME=0
while ! docker compose logs compute_is_ready | grep -q "accepting connections" && [ ${TIME} -le 300 ] ; do
@@ -23,11 +24,45 @@ function wait_for_ready {
exit 2
fi
}
# Creates extensions. Gets a string with space-separated extensions as a parameter
function create_extensions() {
for ext in ${1}; do
docker compose exec neon-test-extensions psql -X -v ON_ERROR_STOP=1 -d contrib_regression -c "CREATE EXTENSION IF NOT EXISTS ${ext} CASCADE"
done
}
# Creates a new timeline. Gets the parent ID and an extension name as parameters.
# Saves the timeline ID in the variable EXT_TIMELINE
function create_timeline() {
generate_id new_timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${new_timeline_id}\", \"pg_version\": ${PG_VERSION}, \"ancestor_timeline_id\": \"${1}\"}"
"http://127.0.0.1:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
EXT_TIMELINE[${2}]=${new_timeline_id}
}
# Checks if the timeline ID of the compute node is expected. Gets the timeline ID as a parameter
function check_timeline() {
TID=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
if [ "${TID}" != "${1}" ]; then
echo Timeline mismatch
exit 1
fi
}
# Restarts the compute node with the required compute tag and timeline.
# Accepts the tag for the compute node and the timeline as parameters.
function restart_compute() {
docker compose down compute compute_is_ready
COMPUTE_TAG=${1} TAG=${OLD_COMPUTE_TAG} TENANT_ID=${tenant_id} TIMELINE_ID=${2} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
check_timeline ${2}
}
declare -A EXT_TIMELINE
EXTENSIONS='[
{"extname": "plv8", "extdir": "plv8-src"},
{"extname": "vector", "extdir": "pgvector-src"},
@@ -47,7 +82,7 @@ EXTENSIONS='[
{"extname": "pg_repack", "extdir": "pg_repack-src"}
]'
EXTNAMES=$(echo ${EXTENSIONS} | jq -r '.[].extname' | paste -sd ' ' -)
COMPUTE_TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d
TAG=${NEW_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d
wait_for_ready
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
@@ -55,12 +90,14 @@ create_extensions "${EXTNAMES}"
query="select json_object_agg(extname,extversion) from pg_extension where extname in ('${EXTNAMES// /\',\'}')"
new_vers=$(docker compose exec neon-test-extensions psql -Aqt -d contrib_regression -c "$query")
docker compose --profile test-extensions down
COMPUTE_TAG=${OLD_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d --force-recreate
TAG=${OLD_COMPUTE_TAG} docker compose --profile test-extensions up --quiet-pull --build -d --force-recreate
wait_for_ready
docker compose exec neon-test-extensions psql -c "DROP DATABASE IF EXISTS contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE contrib_regression"
docker compose exec neon-test-extensions psql -c "CREATE DATABASE pgtap_regression"
docker compose exec neon-test-extensions psql -d pgtap_regression -c "CREATE EXTENSION pgtap"
tenant_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.tenant_id")
EXT_TIMELINE["main"]=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
create_timeline "${EXT_TIMELINE["main"]}" init
restart_compute "${OLD_COMPUTE_TAG}" "${EXT_TIMELINE["init"]}"
create_extensions "${EXTNAMES}"
if [ "${FORCE_ALL_UPGRADE_TESTS:-false}" = true ]; then
exts="${EXTNAMES}"
@@ -71,29 +108,13 @@ fi
if [ -z "${exts}" ]; then
echo "No extensions were upgraded"
else
tenant_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.tenant_id")
timeline_id=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
for ext in ${exts}; do
echo Testing ${ext}...
create_timeline "${EXT_TIMELINE["main"]}" ${ext}
EXTDIR=$(echo ${EXTENSIONS} | jq -r '.[] | select(.extname=="'${ext}'") | .extdir')
generate_id new_timeline_id
PARAMS=(
-sbf
-X POST
-H "Content-Type: application/json"
-d "{\"new_timeline_id\": \"${new_timeline_id}\", \"pg_version\": ${PG_VERSION}, \"ancestor_timeline_id\": \"${timeline_id}\"}"
"http://127.0.0.1:9898/v1/tenant/${tenant_id}/timeline/"
)
result=$(curl "${PARAMS[@]}")
echo $result | jq .
TENANT_ID=${tenant_id} TIMELINE_ID=${new_timeline_id} COMPUTE_TAG=${OLD_COMPUTE_TAG} docker compose down compute compute_is_ready
COMPUTE_TAG=${NEW_COMPUTE_TAG} TENANT_ID=${tenant_id} TIMELINE_ID=${new_timeline_id} docker compose up --quiet-pull -d --build compute compute_is_ready
wait_for_ready
TID=$(docker compose exec neon-test-extensions psql -Aqt -c "SHOW neon.timeline_id")
if [ ${TID} != ${new_timeline_id} ]; then
echo Timeline mismatch
exit 1
fi
restart_compute "${OLD_COMPUTE_TAG}" "${EXT_TIMELINE[${ext}]}"
docker compose exec neon-test-extensions psql -d contrib_regression -c "CREATE EXTENSION ${ext} CASCADE"
restart_compute "${NEW_COMPUTE_TAG}" "${EXT_TIMELINE[${ext}]}"
docker compose exec neon-test-extensions psql -d contrib_regression -c "\dx ${ext}"
if ! docker compose exec neon-test-extensions sh -c /ext-src/${EXTDIR}/test-upgrade.sh; then
docker compose exec neon-test-extensions cat /ext-src/${EXTDIR}/regression.diffs

View File

@@ -1,7 +1,7 @@
[package]
name = "consumption_metrics"
version = "0.1.0"
edition = "2021"
edition = "2024"
license = "Apache-2.0"
[dependencies]

View File

@@ -1,4 +1,5 @@
use std::{collections::VecDeque, sync::Arc};
use std::collections::VecDeque;
use std::sync::Arc;
use parking_lot::{Mutex, MutexGuard};

View File

@@ -1,11 +1,7 @@
use std::{
panic::AssertUnwindSafe,
sync::{
atomic::{AtomicBool, AtomicU32, AtomicU8, Ordering},
mpsc, Arc, OnceLock,
},
thread::JoinHandle,
};
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, Ordering};
use std::sync::{Arc, OnceLock, mpsc};
use std::thread::JoinHandle;
use tracing::{debug, error, trace};

View File

@@ -1,26 +1,19 @@
use std::{
cmp::Ordering,
collections::{BinaryHeap, VecDeque},
fmt::{self, Debug},
ops::DerefMut,
sync::{mpsc, Arc},
};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt::{self, Debug};
use std::ops::DerefMut;
use std::sync::{Arc, mpsc};
use parking_lot::{
lock_api::{MappedMutexGuard, MutexGuard},
Mutex, RawMutex,
};
use parking_lot::lock_api::{MappedMutexGuard, MutexGuard};
use parking_lot::{Mutex, RawMutex};
use rand::rngs::StdRng;
use tracing::debug;
use crate::{
executor::{self, ThreadContext},
options::NetworkOptions,
proto::NetEvent,
proto::NodeEvent,
};
use super::{chan::Chan, proto::AnyMessage};
use super::chan::Chan;
use super::proto::AnyMessage;
use crate::executor::{self, ThreadContext};
use crate::options::NetworkOptions;
use crate::proto::{NetEvent, NodeEvent};
pub struct NetworkTask {
options: Arc<NetworkOptions>,

View File

@@ -2,14 +2,11 @@ use std::sync::Arc;
use rand::Rng;
use super::chan::Chan;
use super::network::TCP;
use super::world::{Node, NodeId, World};
use crate::proto::NodeEvent;
use super::{
chan::Chan,
network::TCP,
world::{Node, NodeId, World},
};
/// Abstraction with all functions (aka syscalls) available to the node.
#[derive(Clone)]
pub struct NodeOs {

View File

@@ -1,4 +1,5 @@
use rand::{rngs::StdRng, Rng};
use rand::Rng;
use rand::rngs::StdRng;
/// Describes random delays and failures. Delay will be uniformly distributed in [min, max].
/// Connection failure will occur with the probablity fail_prob.

View File

@@ -3,7 +3,8 @@ use std::fmt::Debug;
use bytes::Bytes;
use utils::lsn::Lsn;
use crate::{network::TCP, world::NodeId};
use crate::network::TCP;
use crate::world::NodeId;
/// Internal node events.
#[derive(Debug)]

View File

@@ -1,12 +1,8 @@
use std::{
cmp::Ordering,
collections::BinaryHeap,
ops::DerefMut,
sync::{
atomic::{AtomicU32, AtomicU64},
Arc,
},
};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};
use parking_lot::Mutex;
use tracing::trace;

View File

@@ -1,19 +1,18 @@
use std::ops::DerefMut;
use std::sync::{Arc, mpsc};
use parking_lot::Mutex;
use rand::{rngs::StdRng, SeedableRng};
use std::{
ops::DerefMut,
sync::{mpsc, Arc},
};
use rand::SeedableRng;
use rand::rngs::StdRng;
use crate::{
executor::{ExternalHandle, Runtime},
network::NetworkTask,
options::NetworkOptions,
proto::{NodeEvent, SimEvent},
time::Timing,
};
use super::{chan::Chan, network::TCP, node_os::NodeOs};
use super::chan::Chan;
use super::network::TCP;
use super::node_os::NodeOs;
use crate::executor::{ExternalHandle, Runtime};
use crate::network::NetworkTask;
use crate::options::NetworkOptions;
use crate::proto::{NodeEvent, SimEvent};
use crate::time::Timing;
pub type NodeId = u32;

View File

@@ -1,14 +1,15 @@
//! Simple test to verify that simulator is working.
#[cfg(test)]
mod reliable_copy_test {
use std::sync::Arc;
use anyhow::Result;
use desim::executor::{self, PollSome};
use desim::node_os::NodeOs;
use desim::options::{Delay, NetworkOptions};
use desim::proto::{NetEvent, NodeEvent, ReplCell};
use desim::proto::{AnyMessage, NetEvent, NodeEvent, ReplCell};
use desim::world::{NodeId, World};
use desim::{node_os::NodeOs, proto::AnyMessage};
use parking_lot::Mutex;
use std::sync::Arc;
use tracing::info;
/// Disk storage trait and implementation.

View File

@@ -1,30 +1,30 @@
use crate::error::{api_error_handler, route_error_handler, ApiError};
use crate::pprof;
use crate::request::{get_query_param, parse_query_param};
use ::pprof::protos::Message as _;
use ::pprof::ProfilerGuardBuilder;
use anyhow::{anyhow, Context};
use bytes::{Bytes, BytesMut};
use hyper::header::{HeaderName, AUTHORIZATION, CONTENT_DISPOSITION};
use hyper::http::HeaderValue;
use hyper::Method;
use hyper::{header::CONTENT_TYPE, Body, Request, Response};
use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder};
use once_cell::sync::Lazy;
use regex::Regex;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::sync::{mpsc, Mutex, Notify};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{debug, info, info_span, warn, Instrument};
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
use std::future::Future;
use std::io::Write as _;
use std::str::FromStr;
use std::time::Duration;
use ::pprof::ProfilerGuardBuilder;
use ::pprof::protos::Message as _;
use anyhow::{Context, anyhow};
use bytes::{Bytes, BytesMut};
use hyper::header::{AUTHORIZATION, CONTENT_DISPOSITION, CONTENT_TYPE, HeaderName};
use hyper::http::HeaderValue;
use hyper::{Body, Method, Request, Response};
use metrics::{Encoder, IntCounter, TextEncoder, register_int_counter};
use once_cell::sync::Lazy;
use regex::Regex;
use routerify::ext::RequestExt;
use routerify::{Middleware, RequestInfo, Router, RouterBuilder};
use tokio::sync::{Mutex, Notify, mpsc};
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::io::ReaderStream;
use tracing::{Instrument, debug, info, info_span, warn};
use utils::auth::{AuthError, Claims, SwappableJwtAuth};
use crate::error::{ApiError, api_error_handler, route_error_handler};
use crate::pprof;
use crate::request::{get_query_param, parse_query_param};
static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"libmetrics_metric_handler_requests_total",
@@ -375,7 +375,7 @@ pub async fn profile_cpu_handler(req: Request<Body>) -> Result<Response<Body>, A
Err(_) => {
return Err(ApiError::Conflict(
"profiler already running (use ?force=true to cancel it)".into(),
))
));
}
}
tokio::time::sleep(Duration::from_millis(1)).await; // don't busy-wait
@@ -539,8 +539,8 @@ pub async fn profile_heap_handler(req: Request<Body>) -> Result<Response<Body>,
}
}
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
) -> Middleware<B, ApiError> {
pub fn add_request_id_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>()
-> Middleware<B, ApiError> {
Middleware::pre(move |req| async move {
let request_id = match req.headers().get(&X_REQUEST_ID_HEADER) {
Some(request_id) => request_id
@@ -664,7 +664,7 @@ pub fn auth_middleware<B: hyper::body::HttpBody + Send + Sync + 'static>(
None => {
return Err(ApiError::Unauthorized(
"missing authorization header".to_string(),
))
));
}
}
}
@@ -717,12 +717,14 @@ pub fn check_permission_with(
#[cfg(test)]
mod tests {
use super::*;
use hyper::service::Service;
use routerify::RequestServiceBuilder;
use std::future::poll_fn;
use std::net::{IpAddr, SocketAddr};
use hyper::service::Service;
use routerify::RequestServiceBuilder;
use super::*;
#[tokio::test]
async fn test_request_id_returned() {
let builder = RequestServiceBuilder::new(make_router().build().unwrap()).unwrap();

View File

@@ -1,10 +1,10 @@
use hyper::{header, Body, Response, StatusCode};
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::error::Error as StdError;
use hyper::{Body, Response, StatusCode, header};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::{error, info, warn};
use utils::auth::AuthError;
#[derive(Debug, Error)]

View File

@@ -1,12 +1,11 @@
use crate::error::ApiError;
use crate::json::{json_request, json_response};
use hyper::{Body, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio_util::sync::CancellationToken;
use utils::failpoint_support::apply_failpoint;
use crate::error::ApiError;
use crate::json::{json_request, json_response};
pub type ConfigureFailpointsRequest = Vec<FailpointConfig>;
/// Information for configuring a single fail point

View File

@@ -1,6 +1,6 @@
use anyhow::Context;
use bytes::Buf;
use hyper::{header, Body, Request, Response, StatusCode};
use hyper::{Body, Request, Response, StatusCode, header};
use serde::{Deserialize, Serialize};
use super::error::ApiError;

View File

@@ -9,4 +9,4 @@ extern crate hyper0 as hyper;
/// Current fast way to apply simple http routing in various Neon binaries.
/// Re-exported for sake of uniform approach, that could be later replaced with better alternatives, if needed.
pub use routerify::{ext::RequestExt, RouterBuilder, RouterService};
pub use routerify::{RouterBuilder, RouterService, ext::RequestExt};

View File

@@ -1,15 +1,15 @@
use anyhow::bail;
use flate2::write::{GzDecoder, GzEncoder};
use flate2::Compression;
use itertools::Itertools as _;
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::ffi::c_void;
use std::io::Write as _;
use anyhow::bail;
use flate2::Compression;
use flate2::write::{GzDecoder, GzEncoder};
use itertools::Itertools as _;
use pprof::protos::{Function, Line, Location, Message as _, Profile};
use regex::Regex;
/// Decodes a gzip-compressed Protobuf-encoded pprof profile.
pub fn decode(bytes: &[u8]) -> anyhow::Result<Profile> {
let mut gz = GzDecoder::new(Vec::new());

View File

@@ -1,10 +1,13 @@
use core::fmt;
use std::{borrow::Cow, str::FromStr};
use std::borrow::Cow;
use std::str::FromStr;
use anyhow::anyhow;
use hyper::body::HttpBody;
use hyper::{Body, Request};
use routerify::ext::RequestExt;
use super::error::ApiError;
use anyhow::anyhow;
use hyper::{body::HttpBody, Body, Request};
use routerify::ext::RequestExt;
pub fn get_request_param<'a>(
request: &'a Request<Body>,

View File

@@ -6,17 +6,15 @@
//! Probabilistic cardinality estimators, such as the HyperLogLog algorithm,
//! use significantly less memory than this, but can only approximate the cardinality.
use std::{
hash::{BuildHasher, BuildHasherDefault, Hash},
sync::atomic::AtomicU8,
};
use std::hash::{BuildHasher, BuildHasherDefault, Hash};
use std::sync::atomic::AtomicU8;
use measured::{
label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor},
metric::{counter::CounterState, name::MetricNameEncoder, Metric, MetricType, MetricVec},
text::TextEncoder,
LabelGroup,
};
use measured::LabelGroup;
use measured::label::{LabelGroupVisitor, LabelName, LabelValue, LabelVisitor};
use measured::metric::counter::CounterState;
use measured::metric::name::MetricNameEncoder;
use measured::metric::{Metric, MetricType, MetricVec};
use measured::text::TextEncoder;
use twox_hash::xxh3;
/// Create an [`HyperLogLogVec`] and registers to default registry.
@@ -27,9 +25,7 @@ macro_rules! register_hll_vec {
$crate::register(Box::new(hll_vec.clone())).map(|_| hll_vec)
}};
($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{
$crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES)
}};
($N:literal, $NAME:expr, $HELP:expr, $LABELS_NAMES:expr $(,)?) => {{ $crate::register_hll_vec!($N, $crate::opts!($NAME, $HELP), $LABELS_NAMES) }};
}
/// Create an [`HyperLogLog`] and registers to default registry.
@@ -40,9 +36,7 @@ macro_rules! register_hll {
$crate::register(Box::new(hll.clone())).map(|_| hll)
}};
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{
$crate::register_hll!($N, $crate::opts!($NAME, $HELP))
}};
($N:literal, $NAME:expr, $HELP:expr $(,)?) => {{ $crate::register_hll!($N, $crate::opts!($NAME, $HELP)) }};
}
/// HLL is a probabilistic cardinality measure.
@@ -195,8 +189,10 @@ impl<W: std::io::Write, const N: usize> measured::metric::MetricEncoding<TextEnc
mod tests {
use std::collections::HashSet;
use measured::{label::StaticLabelSet, FixedCardinalityLabel};
use rand::{rngs::StdRng, Rng, SeedableRng};
use measured::FixedCardinalityLabel;
use measured::label::StaticLabelSet;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rand_distr::{Distribution, Zipf};
use crate::HyperLogLogVec;

View File

@@ -1,9 +1,10 @@
//! A timestamp captured at process startup to identify restarts of the process, e.g., in logs and metrics.
use std::fmt::Display;
use chrono::Utc;
use super::register_uint_gauge;
use std::fmt::Display;
pub struct LaunchTimestamp(chrono::DateTime<Utc>);

View File

@@ -4,38 +4,26 @@
//! a default registry.
#![deny(clippy::undocumented_unsafe_blocks)]
use measured::{
label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels},
metric::{
counter::CounterState,
gauge::GaugeState,
group::Encoding,
name::{MetricName, MetricNameEncoder},
MetricEncoding, MetricFamilyEncoding,
},
FixedCardinalityLabel, LabelGroup, MetricGroup,
};
use measured::label::{LabelGroupSet, LabelGroupVisitor, LabelName, NoLabels};
use measured::metric::counter::CounterState;
use measured::metric::gauge::GaugeState;
use measured::metric::group::Encoding;
use measured::metric::name::{MetricName, MetricNameEncoder};
use measured::metric::{MetricEncoding, MetricFamilyEncoding};
use measured::{FixedCardinalityLabel, LabelGroup, MetricGroup};
use once_cell::sync::Lazy;
use prometheus::Registry;
use prometheus::core::{
Atomic, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, GenericGaugeVec,
};
pub use prometheus::local::LocalHistogram;
pub use prometheus::opts;
pub use prometheus::register;
pub use prometheus::Error;
use prometheus::Registry;
pub use prometheus::{core, default_registry, proto};
pub use prometheus::{exponential_buckets, linear_buckets};
pub use prometheus::{register_counter_vec, Counter, CounterVec};
pub use prometheus::{register_gauge, Gauge};
pub use prometheus::{register_gauge_vec, GaugeVec};
pub use prometheus::{register_histogram, Histogram};
pub use prometheus::{register_histogram_vec, HistogramVec};
pub use prometheus::{register_int_counter, IntCounter};
pub use prometheus::{register_int_counter_vec, IntCounterVec};
pub use prometheus::{register_int_gauge, IntGauge};
pub use prometheus::{register_int_gauge_vec, IntGaugeVec};
pub use prometheus::{Encoder, TextEncoder};
pub use prometheus::{
Counter, CounterVec, Encoder, Error, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter,
IntCounterVec, IntGauge, IntGaugeVec, TextEncoder, core, default_registry, exponential_buckets,
linear_buckets, opts, proto, register, register_counter_vec, register_gauge,
register_gauge_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
};
pub mod launch_timestamp;
mod wrappers;

View File

@@ -4,28 +4,28 @@
//! is rather narrow, but we can extend it once required.
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::Context;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::io::ErrorKind;
use std::net::SocketAddr;
use std::os::fd::AsRawFd;
use std::os::fd::RawFd;
use std::os::fd::{AsRawFd, RawFd};
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::task::{ready, Poll};
use std::task::{Poll, ready};
use std::{fmt, io};
use std::{future::Future, str::FromStr};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
use anyhow::Context;
use bytes::Bytes;
use pq_proto::framed::{ConnectionError, Framed, FramedReader, FramedWriter};
use pq_proto::{
BeMessage, FeMessage, FeStartupPacket, ProtocolError, SQLSTATE_ADMIN_SHUTDOWN,
SQLSTATE_INTERNAL_ERROR, SQLSTATE_SUCCESSFUL_COMPLETION,
};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_rustls::TlsAcceptor;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, trace, warn};
/// An error, occurred during query processing:
/// either during the connection ([`ConnectionError`]) or before/after it.
@@ -746,7 +746,7 @@ impl<IO: AsyncRead + AsyncWrite + Unpin> PostgresBackend<IO> {
match e {
QueryError::Shutdown => return Ok(ProcessMsgResult::Break),
QueryError::SimulatedConnectionError => {
return Err(QueryError::SimulatedConnectionError)
return Err(QueryError::SimulatedConnectionError);
}
err @ QueryError::Reconnect => {
// Instruct the client to reconnect, stop processing messages
@@ -1020,7 +1020,9 @@ fn log_query_error(query: &str, e: &QueryError) {
}
}
QueryError::Disconnected(other_connection_error) => {
error!("query handler for '{query}' failed with connection error: {other_connection_error:?}")
error!(
"query handler for '{query}' failed with connection error: {other_connection_error:?}"
)
}
QueryError::SimulatedConnectionError => {
error!("query handler for query '{query}' failed due to a simulated connection error")

View File

@@ -1,10 +1,11 @@
use std::io::Cursor;
use std::sync::Arc;
/// Test postgres_backend_async with tokio_postgres
use once_cell::sync::Lazy;
use postgres_backend::{AuthType, Handler, PostgresBackend, QueryError};
use pq_proto::{BeMessage, RowDescriptor};
use rustls::crypto::ring;
use std::io::Cursor;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio_postgres::config::SslMode;

View File

@@ -1,9 +1,10 @@
#![deny(unsafe_code)]
#![deny(clippy::undocumented_unsafe_blocks)]
use anyhow::{bail, Context};
use itertools::Itertools;
use std::borrow::Cow;
use std::fmt;
use anyhow::{Context, bail};
use itertools::Itertools;
use url::Host;
/// Parses a string of format either `host:port` or `host` into a corresponding pair.
@@ -29,9 +30,10 @@ pub fn parse_host_port<S: AsRef<str>>(host_port: S) -> Result<(Host, Option<u16>
#[cfg(test)]
mod tests_parse_host_port {
use crate::parse_host_port;
use url::Host;
use crate::parse_host_port;
#[test]
fn test_normal() {
let (host, port) = parse_host_port("hello:123").unwrap();
@@ -207,10 +209,11 @@ impl fmt::Debug for PgConnectionConfig {
#[cfg(test)]
mod tests_pg_connection_config {
use crate::PgConnectionConfig;
use once_cell::sync::Lazy;
use url::Host;
use crate::PgConnectionConfig;
static STUB_HOST: Lazy<Host> = Lazy::new(|| Host::Domain("stub.host.example".to_owned()));
#[test]

View File

@@ -1,6 +1,6 @@
use std::ffi::CStr;
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use postgres_ffi::v17::wal_generator::LogicalMessageGenerator;
use postgres_ffi::v17::waldecoder_handler::WalStreamDecoderHandler;
use postgres_ffi::waldecoder::WalStreamDecoder;

View File

@@ -4,7 +4,7 @@ use std::env;
use std::path::PathBuf;
use std::process::Command;
use anyhow::{anyhow, Context};
use anyhow::{Context, anyhow};
use bindgen::callbacks::{DeriveInfo, ParseCallbacks};
#[derive(Debug)]

View File

@@ -21,7 +21,9 @@ macro_rules! postgres_ffi {
pub mod bindings {
// bindgen generates bindings for a lot of stuff we don't need
#![allow(dead_code)]
#![allow(unsafe_op_in_unsafe_fn)]
#![allow(clippy::undocumented_unsafe_blocks)]
#![allow(clippy::ptr_offset_with_cast)]
use serde::{Deserialize, Serialize};
include!(concat!(
@@ -43,8 +45,7 @@ macro_rules! postgres_ffi {
pub const PG_MAJORVERSION: &str = stringify!($version);
// Re-export some symbols from bindings
pub use bindings::DBState_DB_SHUTDOWNED;
pub use bindings::{CheckPoint, ControlFileData, XLogRecord};
pub use bindings::{CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, XLogRecord};
pub const ZERO_CHECKPOINT: bytes::Bytes =
bytes::Bytes::from_static(&[0u8; xlog_utils::SIZEOF_CHECKPOINT]);
@@ -221,21 +222,17 @@ pub mod relfile_utils;
pub mod walrecord;
// Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::RepOriginId;
pub use v14::bindings::{uint32, uint64, Oid};
pub use v14::bindings::{BlockNumber, OffsetNumber};
pub use v14::bindings::{MultiXactId, TransactionId};
pub use v14::bindings::{TimeLineID, TimestampTz, XLogRecPtr, XLogSegNo};
pub use v14::bindings::{
BlockNumber, CheckPoint, ControlFileData, MultiXactId, OffsetNumber, Oid, PageHeaderData,
RepOriginId, TimeLineID, TimestampTz, TransactionId, XLogRecPtr, XLogRecord, XLogSegNo, uint32,
uint64,
};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
pub use v14::bindings::{PageHeaderData, XLogRecord};
pub use v14::xlog_utils::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
pub use v14::bindings::{CheckPoint, ControlFileData};
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
@@ -246,13 +243,11 @@ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
// Export some version independent functions that are used outside of this mod
pub use v14::xlog_utils::encode_logical_message;
pub use v14::xlog_utils::get_current_timestamp;
pub use v14::xlog_utils::to_pg_timestamp;
pub use v14::xlog_utils::try_from_pg_timestamp;
pub use v14::xlog_utils::XLogFileName;
pub use v14::bindings::DBState_DB_SHUTDOWNED;
pub use v14::xlog_utils::{
XLogFileName, encode_logical_message, get_current_timestamp, to_pg_timestamp,
try_from_pg_timestamp,
};
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> bool {
dispatch_pgversion!(version, pgv::bindings::bkpimg_is_compressed(bimg_info))
@@ -355,8 +350,9 @@ pub fn fsm_logical_to_physical(addr: BlockNumber) -> BlockNumber {
}
pub mod waldecoder {
use bytes::{Buf, Bytes, BytesMut};
use std::num::NonZeroU32;
use bytes::{Buf, Bytes, BytesMut};
use thiserror::Error;
use utils::lsn::Lsn;

View File

@@ -9,8 +9,7 @@
//! comments on them.
//!
use crate::PageHeaderData;
use crate::BLCKSZ;
use crate::{BLCKSZ, PageHeaderData};
//
// From pg_tablespace_d.h

View File

@@ -3,18 +3,16 @@
//!
//! TODO: Generate separate types for each supported PG version
use crate::pg_constants;
use crate::XLogRecord;
use crate::{
BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId, TimestampTz,
TransactionId,
};
use crate::{BLCKSZ, XLOG_SIZE_OF_XLOG_RECORD};
use bytes::{Buf, Bytes};
use serde::{Deserialize, Serialize};
use utils::bin_ser::DeserializeError;
use utils::lsn::Lsn;
use crate::{
BLCKSZ, BlockNumber, MultiXactId, MultiXactOffset, MultiXactStatus, Oid, RepOriginId,
TimestampTz, TransactionId, XLOG_SIZE_OF_XLOG_RECORD, XLogRecord, pg_constants,
};
#[repr(C)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct XlMultiXactCreate {
@@ -508,9 +506,10 @@ pub fn decode_wal_record(
}
pub mod v14 {
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
use crate::{OffsetNumber, TransactionId};
#[repr(C)]
#[derive(Debug)]
pub struct XlHeapInsert {
@@ -678,9 +677,10 @@ pub mod v15 {
}
pub mod v16 {
use bytes::{Buf, Bytes};
pub use super::v14::{XlHeapInsert, XlHeapLockUpdated, XlHeapMultiInsert, XlParameterChange};
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
pub struct XlHeapDelete {
pub xmax: TransactionId,
@@ -746,9 +746,10 @@ pub mod v16 {
/* Since PG16, we have the Neon RMGR (RM_NEON_ID) to manage Neon-flavored WAL. */
pub mod rm_neon {
use crate::{OffsetNumber, TransactionId};
use bytes::{Buf, Bytes};
use crate::{OffsetNumber, TransactionId};
#[repr(C)]
#[derive(Debug)]
pub struct XlNeonHeapInsert {
@@ -858,14 +859,14 @@ pub mod v16 {
}
pub mod v17 {
pub use super::v14::XlHeapLockUpdated;
pub use crate::{TimeLineID, TimestampTz};
use bytes::{Buf, Bytes};
pub use super::v16::rm_neon;
pub use super::v14::XlHeapLockUpdated;
pub use super::v16::{
XlHeapDelete, XlHeapInsert, XlHeapLock, XlHeapMultiInsert, XlHeapUpdate, XlParameterChange,
rm_neon,
};
pub use crate::{TimeLineID, TimestampTz};
#[repr(C)]
#[derive(Debug)]

View File

@@ -1,7 +1,9 @@
use std::path::PathBuf;
use std::str::FromStr;
use anyhow::*;
use clap::{value_parser, Arg, ArgMatches, Command};
use clap::{Arg, ArgMatches, Command, value_parser};
use postgres::Client;
use std::{path::PathBuf, str::FromStr};
use wal_craft::*;
fn main() -> Result<()> {

View File

@@ -1,17 +1,18 @@
use anyhow::{bail, ensure};
use camino_tempfile::{tempdir, Utf8TempDir};
use log::*;
use postgres::types::PgLsn;
use postgres::Client;
use postgres_ffi::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
use postgres_ffi::{
XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, Instant};
use anyhow::{bail, ensure};
use camino_tempfile::{Utf8TempDir, tempdir};
use log::*;
use postgres::Client;
use postgres::types::PgLsn;
use postgres_ffi::{
WAL_SEGMENT_SIZE, XLOG_BLCKSZ, XLOG_SIZE_OF_XLOG_LONG_PHD, XLOG_SIZE_OF_XLOG_RECORD,
XLOG_SIZE_OF_XLOG_SHORT_PHD,
};
macro_rules! xlog_utils_test {
($version:ident) => {
#[path = "."]

View File

@@ -10,11 +10,10 @@
//! calls.
//!
//! [Box]: https://docs.rs/futures-util/0.3.26/src/futures_util/lock/bilock.rs.html#107
use std::future::Future;
use std::io::{self, ErrorKind};
use bytes::{Buf, BytesMut};
use std::{
future::Future,
io::{self, ErrorKind},
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf};
use crate::{BeMessage, FeMessage, FeStartupPacket, ProtocolError};

View File

@@ -5,14 +5,15 @@
pub mod framed;
use std::borrow::Cow;
use std::{fmt, io, str};
use byteorder::{BigEndian, ReadBytesExt};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{borrow::Cow, fmt, io, str};
// re-export for use in utils pageserver_feedback.rs
pub use postgres_protocol::PG_EPOCH;
use serde::{Deserialize, Serialize};
pub type Oid = u32;
pub type SystemId = u64;
@@ -206,8 +207,8 @@ use rand::distributions::{Distribution, Standard};
impl Distribution<CancelKeyData> for Standard {
fn sample<R: rand::Rng + ?Sized>(&self, rng: &mut R) -> CancelKeyData {
CancelKeyData {
backend_pid: rng.gen(),
cancel_key: rng.gen(),
backend_pid: rng.r#gen(),
cancel_key: rng.r#gen(),
}
}
}
@@ -1035,7 +1036,7 @@ impl BeMessage<'_> {
buf.put_u8(b'd');
write_body(buf, |buf| {
buf.put_u8(b'0'); // matches INTERPRETED_WAL_RECORD_TAG in postgres-protocol
// dependency
// dependency
buf.put_u64(rec.streaming_lsn);
buf.put_u64(rec.commit_lsn);
buf.put_slice(rec.data);

View File

@@ -130,11 +130,7 @@ impl StorageModel {
break;
}
}
if possible {
Some(snapshot_later)
} else {
None
}
if possible { Some(snapshot_later) } else { None }
} else {
None
};

View File

@@ -76,7 +76,10 @@ pub fn draw_svg(
let mut result = String::new();
writeln!(result, "<svg xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" height=\"300\" width=\"500\">")?;
writeln!(
result,
"<svg xmlns=\"http://www.w3.org/2000/svg\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" height=\"300\" width=\"500\">"
)?;
draw.calculate_svg_layout();

View File

@@ -1,8 +1,8 @@
//! Tracing wrapper for Hyper HTTP server
use hyper0::HeaderMap;
use hyper0::{Body, Request, Response};
use std::future::Future;
use hyper0::{Body, HeaderMap, Request, Response};
use tracing::Instrument;
use tracing_opentelemetry::OpenTelemetrySpanExt;

View File

@@ -36,11 +36,11 @@
pub mod http;
use opentelemetry::trace::TracerProvider;
use opentelemetry::KeyValue;
use opentelemetry::trace::TracerProvider;
use tracing::Subscriber;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;
use tracing_subscriber::registry::LookupSpan;
/// Set up OpenTelemetry exporter, using configuration from environment variables.
///

View File

@@ -1,6 +1,6 @@
use std::time::Duration;
use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use criterion::{Bencher, Criterion, criterion_group, criterion_main};
use pprof::criterion::{Output, PProfProfiler};
use utils::id;
use utils::logging::log_slow;

View File

@@ -1,12 +1,15 @@
// For details about authentication see docs/authentication.md
use arc_swap::ArcSwap;
use std::{borrow::Cow, fmt::Display, fs, sync::Arc};
use std::borrow::Cow;
use std::fmt::Display;
use std::fs;
use std::sync::Arc;
use anyhow::Result;
use arc_swap::ArcSwap;
use camino::Utf8Path;
use jsonwebtoken::{
decode, encode, Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation,
Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, decode, encode,
};
use serde::{Deserialize, Serialize};
@@ -129,7 +132,9 @@ impl JwtAuth {
anyhow::bail!("path is neither a directory or a file")
};
if decoding_keys.is_empty() {
anyhow::bail!("Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected.");
anyhow::bail!(
"Configured for JWT auth with zero decoding keys. All JWT gated requests would be rejected."
);
}
Ok(Self::new(decoding_keys))
}
@@ -175,9 +180,10 @@ pub fn encode_from_key_file(claims: &Claims, key_data: &[u8]) -> Result<String>
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
use super::*;
// Generated with:
//
// openssl genpkey -algorithm ed25519 -out ed25519-priv.pem
@@ -215,7 +221,9 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded_eddsa = "eyJhbGciOiJFZERTQSIsInR5cCI6IkpXVCJ9.eyJzY29wZSI6InRlbmFudCIsInRlbmFudF9pZCI6IjNkMWY3NTk1YjQ2ODIzMDMwNGUwYjczY2VjYmNiMDgxIiwiaXNzIjoibmVvbi5jb250cm9scGxhbmUiLCJpYXQiOjE2Nzg0NDI0Nzl9.rNheBnluMJNgXzSTTJoTNIGy4P_qe0JUHl_nVEGuDCTgHOThPVr552EnmKccrCKquPeW3c2YUk0Y9Oh4KyASAw";
// Check it can be validated with the public key
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
let auth = JwtAuth::new(vec![
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
]);
let claims_from_token = auth.decode(encoded_eddsa).unwrap().claims;
assert_eq!(claims_from_token, expected_claims);
}
@@ -230,7 +238,9 @@ MC4CAQAwBQYDK2VwBCIEID/Drmc1AA6U/znNRWpF3zEGegOATQxfkdWxitcOMsIH
let encoded = encode_from_key_file(&claims, TEST_PRIV_KEY_ED25519).unwrap();
// decode it back
let auth = JwtAuth::new(vec![DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap()]);
let auth = JwtAuth::new(vec![
DecodingKey::from_ed_pem(TEST_PUB_KEY_ED25519).unwrap(),
]);
let decoded = auth.decode(&encoded).unwrap();
assert_eq!(decoded.claims, claims);

View File

@@ -121,10 +121,12 @@ where
#[cfg(test)]
mod tests {
use super::*;
use std::io;
use tokio::sync::Mutex;
use super::*;
#[test]
fn backoff_defaults_produce_growing_backoff_sequence() {
let mut current_backoff_value = None;

View File

@@ -13,9 +13,11 @@
#![warn(missing_docs)]
use bincode::Options;
use serde::{de::DeserializeOwned, Serialize};
use std::io::{self, Read, Write};
use bincode::Options;
use serde::Serialize;
use serde::de::DeserializeOwned;
use thiserror::Error;
/// An error that occurred during a deserialize operation
@@ -261,10 +263,12 @@ impl<T> LeSer for T {}
#[cfg(test)]
mod tests {
use super::DeserializeError;
use serde::{Deserialize, Serialize};
use std::io::Cursor;
use serde::{Deserialize, Serialize};
use super::DeserializeError;
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct ShortStruct {
a: u8,

View File

@@ -1,7 +1,5 @@
use std::{
fmt::Display,
time::{Duration, Instant},
};
use std::fmt::Display;
use std::time::{Duration, Instant};
use metrics::IntCounter;

View File

@@ -1,4 +1,5 @@
use tokio_util::task::{task_tracker::TaskTrackerToken, TaskTracker};
use tokio_util::task::TaskTracker;
use tokio_util::task::task_tracker::TaskTrackerToken;
/// While a reference is kept around, the associated [`Barrier::wait`] will wait.
///

View File

@@ -1,9 +1,7 @@
use std::borrow::Cow;
use std::fs::{self, File};
use std::io::{self, Write};
use std::os::fd::AsRawFd;
use std::{
borrow::Cow,
fs::{self, File},
io::{self, Write},
};
use camino::{Utf8Path, Utf8PathBuf};

View File

@@ -1,6 +1,7 @@
//! Wrapper around `std::env::var` for parsing environment variables.
use std::{fmt::Display, str::FromStr};
use std::fmt::Display;
use std::str::FromStr;
/// For types `V` that implement [`FromStr`].
pub fn var<V, E>(varname: &str) -> Option<V>

View File

@@ -127,6 +127,9 @@ pub async fn failpoint_sleep_cancellable_helper(
tracing::info!("failpoint {:?}: sleep done", name);
}
/// Initialize the configured failpoints
///
/// You must call this function before any concurrent threads do operations.
pub fn init() -> fail::FailScenario<'static> {
// The failpoints lib provides support for parsing the `FAILPOINTS` env var.
// We want non-default behavior for `exit`, though, so, we handle it separately.
@@ -134,7 +137,10 @@ pub fn init() -> fail::FailScenario<'static> {
// Format for FAILPOINTS is "name=actions" separated by ";".
let actions = std::env::var("FAILPOINTS");
if actions.is_ok() {
std::env::remove_var("FAILPOINTS");
// SAFETY: this function should before any threads start and access env vars concurrently
unsafe {
std::env::remove_var("FAILPOINTS");
}
} else {
// let the library handle non-utf8, or nothing for not present
}

View File

@@ -58,9 +58,8 @@ where
#[cfg(test)]
mod test {
use crate::fs_ext::{is_directory_empty, list_dir};
use super::ignore_absent_files;
use crate::fs_ext::{is_directory_empty, list_dir};
#[test]
fn is_empty_dir() {

View File

@@ -38,7 +38,8 @@ pub fn rename_noreplace<P1: ?Sized + NixPath, P2: ?Sized + NixPath>(
#[cfg(test)]
mod test {
use std::{fs, path::PathBuf};
use std::fs;
use std::path::PathBuf;
use super::*;

View File

@@ -169,9 +169,9 @@ mod test {
];
let mut s = String::new();
for (line, gen, expected) in examples {
for (line, gen_, expected) in examples {
s.clear();
write!(s, "{}", &gen.get_suffix()).expect("string grows");
write!(s, "{}", &gen_.get_suffix()).expect("string grows");
assert_eq!(s, expected, "example on {line}");
}
}

View File

@@ -1,8 +1,9 @@
//! A wrapper around `ArcSwap` that ensures there is only one writer at a time and writes
//! don't block reads.
use arc_swap::ArcSwap;
use std::sync::Arc;
use arc_swap::ArcSwap;
use tokio::sync::TryLockError;
pub struct GuardArcSwap<T> {

View File

@@ -1,5 +1,6 @@
use std::fmt;
use std::num::ParseIntError;
use std::{fmt, str::FromStr};
use std::str::FromStr;
use anyhow::Context;
use hex::FromHex;
@@ -215,7 +216,7 @@ macro_rules! id_newtype {
impl AsRef<[u8]> for $t {
fn as_ref(&self) -> &[u8] {
&self.0 .0
&self.0.0
}
}
@@ -367,9 +368,8 @@ impl FromStr for NodeId {
mod tests {
use serde_assert::{Deserializer, Serializer, Token, Tokens};
use crate::bin_ser::BeSer;
use super::*;
use crate::bin_ser::BeSer;
#[test]
fn test_id_serde_non_human_readable() {

View File

@@ -21,15 +21,12 @@
//!
//! Another explaination can be found here: <https://brandur.org/rate-limiting>
use std::{
sync::{
atomic::{AtomicU64, Ordering},
Mutex,
},
time::Duration,
};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio::{sync::Notify, time::Instant};
use tokio::sync::Notify;
use tokio::time::Instant;
pub struct LeakyBucketConfig {
/// This is the "time cost" of a single request unit.

View File

@@ -2,21 +2,23 @@
//!
//! <https://elixir.bootlin.com/linux/v6.1.128/source/include/uapi/linux/sockios.h#L25-L27>
use std::{
io,
mem::MaybeUninit,
os::{fd::RawFd, raw::c_int},
};
use std::io;
use std::mem::MaybeUninit;
use std::os::fd::RawFd;
use std::os::raw::c_int;
use nix::libc::{FIONREAD, TIOCOUTQ};
unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result<c_int> {
let mut inq: MaybeUninit<c_int> = MaybeUninit::uninit();
let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr());
if err == 0 {
Ok(inq.assume_init())
} else {
Err(io::Error::last_os_error())
// SAFETY: encapsulating fn is unsafe, we require `socket_fd` to be a valid file descriptor
unsafe {
let err = nix::libc::ioctl(socket_fd, cmd, inq.as_mut_ptr());
if err == 0 {
Ok(inq.assume_init())
} else {
Err(io::Error::last_os_error())
}
}
}
@@ -24,12 +26,14 @@ unsafe fn do_ioctl(socket_fd: RawFd, cmd: nix::libc::Ioctl) -> io::Result<c_int>
///
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
pub unsafe fn inq(socket_fd: RawFd) -> io::Result<c_int> {
do_ioctl(socket_fd, FIONREAD)
// SAFETY: encapsulating fn is unsafe
unsafe { do_ioctl(socket_fd, FIONREAD) }
}
/// # Safety
///
/// Caller must ensure that `socket_fd` is a valid TCP socket file descriptor.
pub unsafe fn outq(socket_fd: RawFd) -> io::Result<c_int> {
do_ioctl(socket_fd, TIOCOUTQ)
// SAFETY: encapsulating fn is unsafe
unsafe { do_ioctl(socket_fd, TIOCOUTQ) }
}

View File

@@ -6,16 +6,15 @@
//! there for potential pitfalls with lock files that are used
//! to store PIDs (pidfiles).
use std::{
fs,
io::{Read, Write},
ops::Deref,
os::unix::prelude::AsRawFd,
};
use std::fs;
use std::io::{Read, Write};
use std::ops::Deref;
use std::os::unix::prelude::AsRawFd;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use nix::{errno::Errno::EAGAIN, fcntl};
use nix::errno::Errno::EAGAIN;
use nix::fcntl;
use crate::crashsafe;

View File

@@ -273,7 +273,9 @@ fn log_panic_to_stderr(
location: Option<PrettyLocation<'_, '_>>,
backtrace: &std::backtrace::Backtrace,
) {
eprintln!("panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}");
eprintln!(
"panic while tracing is unconfigured: thread '{thread}' panicked at '{msg}', {location:?}\nStack backtrace:\n{backtrace}"
);
}
struct PrettyLocation<'a, 'b>(&'a std::panic::Location<'b>);
@@ -361,7 +363,8 @@ pub async fn log_slow<O>(name: &str, threshold: Duration, f: impl Future<Output
#[cfg(test)]
mod tests {
use metrics::{core::Opts, IntCounterVec};
use metrics::IntCounterVec;
use metrics::core::Opts;
use crate::logging::{TracingEventCountLayer, TracingEventCountMetric};

View File

@@ -1,11 +1,13 @@
#![warn(missing_docs)]
use serde::{de::Visitor, Deserialize, Serialize};
use std::fmt;
use std::ops::{Add, AddAssign};
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use serde::de::Visitor;
use serde::{Deserialize, Serialize};
use crate::seqwait::MonotonicCounter;
/// Transaction log block size in bytes
@@ -407,11 +409,10 @@ impl rand::distributions::uniform::UniformSampler for LsnSampler {
#[cfg(test)]
mod tests {
use crate::bin_ser::BeSer;
use serde_assert::{Deserializer, Serializer, Token, Tokens};
use super::*;
use serde_assert::{Deserializer, Serializer, Token, Tokens};
use crate::bin_ser::BeSer;
#[test]
fn test_lsn_strings() {

View File

@@ -1,7 +1,8 @@
use pin_project_lite::pin_project;
use std::io::Read;
use std::pin::Pin;
use std::{io, task};
use pin_project_lite::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
pin_project! {

View File

@@ -1,7 +1,7 @@
use std::time::{Duration, SystemTime};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use pq_proto::{read_cstr, PG_EPOCH};
use pq_proto::{PG_EPOCH, read_cstr};
use serde::{Deserialize, Serialize};
use tracing::{trace, warn};

View File

@@ -3,7 +3,7 @@
//! postgres_connection crate.
use anyhow::Context;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use postgres_connection::{PgConnectionConfig, parse_host_port};
use crate::id::TenantTimelineId;

View File

@@ -53,10 +53,11 @@ mod tests {
#[test]
fn basics() {
use super::RateLimit;
use std::sync::atomic::Ordering::Relaxed;
use std::time::Duration;
use super::RateLimit;
let called = AtomicUsize::new(0);
let mut f = RateLimit::new(Duration::from_millis(100));

View File

@@ -1,7 +1,7 @@
use sentry::ClientInitGuard;
use std::borrow::Cow;
use std::env;
use sentry::ClientInitGuard;
pub use sentry::release_name;
#[must_use]

View File

@@ -5,6 +5,7 @@ use std::collections::BinaryHeap;
use std::mem;
use std::sync::Mutex;
use std::time::Duration;
use tokio::sync::watch::{self, channel};
use tokio::time::timeout;
@@ -248,11 +249,7 @@ where
let internal = self.internal.lock().unwrap();
let cnt = internal.current.cnt_value();
drop(internal);
if cnt >= num {
Ok(())
} else {
Err(cnt)
}
if cnt >= num { Ok(()) } else { Err(cnt) }
}
/// Register and return a channel that will be notified when a number arrives,
@@ -325,9 +322,10 @@ where
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use super::*;
impl MonotonicCounter<i32> for i32 {
fn cnt_advance(&mut self, val: i32) {
assert!(*self <= val);

View File

@@ -12,11 +12,7 @@ pub struct Percent(#[serde(deserialize_with = "deserialize_pct_0_to_100")] u8);
impl Percent {
pub const fn new(pct: u8) -> Option<Self> {
if pct <= 100 {
Some(Percent(pct))
} else {
None
}
if pct <= 100 { Some(Percent(pct)) } else { None }
}
pub fn get(&self) -> u8 {

View File

@@ -1,6 +1,7 @@
//! See `pageserver_api::shard` for description on sharding.
use std::{ops::RangeInclusive, str::FromStr};
use std::ops::RangeInclusive;
use std::str::FromStr;
use hex::FromHex;
use serde::{Deserialize, Serialize};
@@ -59,11 +60,7 @@ impl ShardCount {
/// This method returns the actual number of shards, i.e. if our internal value is
/// zero, we return 1 (unsharded tenants have 1 shard).
pub fn count(&self) -> u8 {
if self.0 > 0 {
self.0
} else {
1
}
if self.0 > 0 { self.0 } else { 1 }
}
/// The literal internal value: this is **not** the number of shards in the

View File

@@ -1,7 +1,7 @@
pub use signal_hook::consts::TERM_SIGNALS;
pub use signal_hook::consts::signal::*;
use signal_hook::iterator::Signals;
pub use signal_hook::consts::{signal::*, TERM_SIGNALS};
pub enum Signal {
Quit,
Interrupt,

View File

@@ -44,8 +44,7 @@
#![warn(missing_docs)]
use std::ops::Deref;
use std::sync::{Arc, Weak};
use std::sync::{RwLock, RwLockWriteGuard};
use std::sync::{Arc, RwLock, RwLockWriteGuard, Weak};
use tokio::sync::watch;
@@ -219,10 +218,11 @@ impl RcuWaitList {
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
use std::time::Duration;
use super::*;
#[tokio::test]
async fn two_writers() {
let rcu = Rcu::new(1);

View File

@@ -1,10 +1,6 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
/// Gates are a concurrency helper, primarily used for implementing safe shutdown.
///

View File

@@ -1,7 +1,6 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex, MutexGuard,
};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use tokio::sync::Semaphore;
/// Custom design like [`tokio::sync::OnceCell`] but using [`OwnedSemaphorePermit`] instead of
@@ -301,14 +300,13 @@ impl Drop for InitPermit {
#[cfg(test)]
mod tests {
use std::convert::Infallible;
use std::pin::{Pin, pin};
use std::time::Duration;
use futures::Future;
use super::*;
use std::{
convert::Infallible,
pin::{pin, Pin},
time::Duration,
};
#[tokio::test]
async fn many_initializers() {

View File

@@ -1,4 +1,5 @@
use core::{future::poll_fn, task::Poll};
use core::future::poll_fn;
use core::task::Poll;
use std::sync::{Arc, Mutex};
use diatomic_waker::DiatomicWaker;

View File

@@ -1,9 +1,8 @@
use std::{
io,
net::{TcpListener, ToSocketAddrs},
};
use std::io;
use std::net::{TcpListener, ToSocketAddrs};
use nix::sys::socket::{setsockopt, sockopt::ReuseAddr};
use nix::sys::socket::setsockopt;
use nix::sys::socket::sockopt::ReuseAddr;
/// Bind a [`TcpListener`] to addr with `SO_REUSEADDR` set to true.
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<TcpListener> {

View File

@@ -172,16 +172,14 @@ fn tracing_subscriber_configured() -> bool {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::fmt::{self};
use std::hash::{Hash, Hasher};
use tracing_subscriber::prelude::*;
use super::*;
use std::{
collections::HashSet,
fmt::{self},
hash::{Hash, Hasher},
};
struct MemoryIdentity<'a>(&'a dyn Extractor);
impl MemoryIdentity<'_> {

View File

@@ -44,10 +44,12 @@ where
#[cfg(test)]
mod tests {
use super::*;
use arc_swap::ArcSwap;
use std::sync::Arc;
use arc_swap::ArcSwap;
use super::*;
#[test]
fn test_try_rcu_success() {
let swap = ArcSwap::from(Arc::new(42));

View File

@@ -1,4 +1,6 @@
use std::{alloc::Layout, cmp::Ordering, ops::RangeBounds};
use std::alloc::Layout;
use std::cmp::Ordering;
use std::ops::RangeBounds;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VecMapOrdering {
@@ -214,7 +216,8 @@ fn extract_key<K, V>(entry: &(K, V)) -> &K {
#[cfg(test)]
mod tests {
use std::{collections::BTreeMap, ops::Bound};
use std::collections::BTreeMap;
use std::ops::Bound;
use super::{VecMap, VecMapOrdering};

View File

@@ -1,19 +1,14 @@
use std::io::SeekFrom;
use anyhow::{Context, Result};
use async_compression::{
tokio::{bufread::ZstdDecoder, write::ZstdEncoder},
zstd::CParameter,
Level,
};
use async_compression::Level;
use async_compression::tokio::bufread::ZstdDecoder;
use async_compression::tokio::write::ZstdEncoder;
use async_compression::zstd::CParameter;
use camino::Utf8Path;
use nix::NixPath;
use tokio::{
fs::{File, OpenOptions},
io::AsyncBufRead,
io::AsyncSeekExt,
io::AsyncWriteExt,
};
use tokio::fs::{File, OpenOptions};
use tokio::io::{AsyncBufRead, AsyncSeekExt, AsyncWriteExt};
use tokio_tar::{Archive, Builder, HeaderMode};
use walkdir::WalkDir;

View File

@@ -1,7 +1,8 @@
use std::io::Read;
use bytes::{Buf, BytesMut};
use hex_literal::hex;
use serde::Deserialize;
use std::io::Read;
use utils::bin_ser::LeSer;
#[derive(Debug, PartialEq, Eq, Deserialize)]

View File

@@ -1,23 +1,25 @@
use anyhow::Context;
use criterion::{criterion_group, criterion_main, Criterion};
use futures::{stream::FuturesUnordered, StreamExt};
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use postgres_ffi::{waldecoder::WalStreamDecoder, MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
use pprof::criterion::{Output, PProfProfiler};
use serde::Deserialize;
use std::{env, num::NonZeroUsize, sync::Arc};
use std::env;
use std::num::NonZeroUsize;
use std::sync::Arc;
use anyhow::Context;
use camino::{Utf8Path, Utf8PathBuf};
use camino_tempfile::Utf8TempDir;
use criterion::{Criterion, criterion_group, criterion_main};
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use pageserver_api::shard::{ShardIdentity, ShardStripeSize};
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::{MAX_SEND_SIZE, WAL_SEGMENT_SIZE};
use pprof::criterion::{Output, PProfProfiler};
use remote_storage::{
DownloadOpts, GenericRemoteStorage, ListingMode, RemoteStorageConfig, RemoteStorageKind,
S3Config,
};
use serde::Deserialize;
use tokio_util::sync::CancellationToken;
use utils::{
lsn::Lsn,
shard::{ShardCount, ShardNumber},
};
use utils::lsn::Lsn;
use utils::shard::{ShardCount, ShardNumber};
use wal_decoder::models::InterpretedWalRecord;
const S3_BUCKET: &str = "neon-github-public-dev";
@@ -31,7 +33,7 @@ const METADATA_FILENAME: &str = "metadata.json";
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
#[allow(non_upper_case_globals)]
#[export_name = "malloc_conf"]
#[unsafe(export_name = "malloc_conf")]
pub static malloc_conf: &[u8] = b"prof:true,prof_active:true,lg_prof_sample:20\0";
async fn create_s3_client() -> anyhow::Result<Arc<GenericRemoteStorage>> {

View File

@@ -3,8 +3,6 @@
use std::collections::HashMap;
use crate::models::*;
use crate::serialized_batch::SerializedValueBatch;
use bytes::{Buf, Bytes};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::reltag::{RelTag, SlruKind};
@@ -14,6 +12,9 @@ use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::walrecord::*;
use utils::lsn::Lsn;
use crate::models::*;
use crate::serialized_batch::SerializedValueBatch;
impl InterpretedWalRecord {
/// Decode and interpreted raw bytes which represent one Postgres WAL record.
/// Data blocks which do not match any of the provided shard identities are filtered out.

View File

@@ -8,20 +8,18 @@
use std::collections::{BTreeSet, HashMap};
use bytes::{Bytes, BytesMut};
use pageserver_api::key::rel_block_to_key;
use pageserver_api::key::{CompactKey, Key, rel_block_to_key};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::record::NeonWalRecord;
use pageserver_api::reltag::RelTag;
use pageserver_api::shard::ShardIdentity;
use pageserver_api::{key::CompactKey, value::Value};
use pageserver_api::value::Value;
use postgres_ffi::walrecord::{DecodedBkpBlock, DecodedWALRecord};
use postgres_ffi::{page_is_new, page_set_lsn, pg_constants, BLCKSZ};
use postgres_ffi::{BLCKSZ, page_is_new, page_set_lsn, pg_constants};
use serde::{Deserialize, Serialize};
use utils::bin_ser::BeSer;
use utils::lsn::Lsn;
use pageserver_api::key::Key;
use crate::models::InterpretedWalRecord;
static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
@@ -515,10 +513,11 @@ impl SerializedValueBatch {
let empty = self.raw.is_empty();
if cfg!(debug_assertions) && empty {
assert!(self
.metadata
.iter()
.all(|meta| matches!(meta, ValueMeta::Observed(_))));
assert!(
self.metadata
.iter()
.all(|meta| matches!(meta, ValueMeta::Observed(_)))
);
}
!empty

View File

@@ -7,15 +7,12 @@ use utils::lsn::Lsn;
use utils::postgres_client::{Compression, InterpretedFormat};
use crate::models::{
FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord,
FlushUncommittedRecords, InterpretedWalRecord, InterpretedWalRecords, MetadataRecord, proto,
};
use crate::serialized_batch::{
ObservedValueMeta, SerializedValueBatch, SerializedValueMeta, ValueMeta,
};
use crate::models::proto;
#[derive(Debug, thiserror::Error)]
pub enum ToWireFormatError {
#[error("{0}")]
@@ -83,8 +80,8 @@ impl ToWireFormat for InterpretedWalRecords {
format: InterpretedFormat,
compression: Option<Compression>,
) -> Result<Bytes, ToWireFormatError> {
use async_compression::tokio::write::ZstdEncoder;
use async_compression::Level;
use async_compression::tokio::write::ZstdEncoder;
let encode_res: Result<Bytes, ToWireFormatError> = match format {
InterpretedFormat::Bincode => {

View File

@@ -1,9 +1,11 @@
//! Links with walproposer, pgcommon, pgport and runs bindgen on walproposer.h
//! to generate Rust bindings for it.
use std::{env, path::PathBuf, process::Command};
use std::env;
use std::path::PathBuf;
use std::process::Command;
use anyhow::{anyhow, Context};
use anyhow::{Context, anyhow};
const WALPROPOSER_PG_VERSION: &str = "v17";

View File

@@ -3,27 +3,14 @@
#![allow(dead_code)]
use std::ffi::CStr;
use std::ffi::CString;
use std::ffi::{CStr, CString};
use crate::bindings::uint32;
use crate::bindings::walproposer_api;
use crate::bindings::NeonWALReadResult;
use crate::bindings::PGAsyncReadResult;
use crate::bindings::PGAsyncWriteResult;
use crate::bindings::Safekeeper;
use crate::bindings::Size;
use crate::bindings::StringInfoData;
use crate::bindings::TimestampTz;
use crate::bindings::WalProposer;
use crate::bindings::WalProposerConnStatusType;
use crate::bindings::WalProposerConnectPollStatusType;
use crate::bindings::WalProposerExecStatusType;
use crate::bindings::WalproposerShmemState;
use crate::bindings::XLogRecPtr;
use crate::walproposer::ApiImpl;
use crate::walproposer::StreamingCallback;
use crate::walproposer::WaitResult;
use crate::bindings::{
NeonWALReadResult, PGAsyncReadResult, PGAsyncWriteResult, Safekeeper, Size, StringInfoData,
TimestampTz, WalProposer, WalProposerConnStatusType, WalProposerConnectPollStatusType,
WalProposerExecStatusType, WalproposerShmemState, XLogRecPtr, uint32, walproposer_api,
};
use crate::walproposer::{ApiImpl, StreamingCallback, WaitResult};
extern "C" fn get_shmem_state(wp: *mut WalProposer) -> *mut WalproposerShmemState {
unsafe {

View File

@@ -2,15 +2,15 @@
use std::ffi::CString;
use crate::{
api_bindings::{create_api, take_vec_u8, Level},
bindings::{
NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
},
};
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{id::TenantTimelineId, lsn::Lsn};
use utils::id::TenantTimelineId;
use utils::lsn::Lsn;
use crate::api_bindings::{Level, create_api, take_vec_u8};
use crate::bindings::{
NeonWALReadResult, Safekeeper, WalProposer, WalProposerBroadcast, WalProposerConfig,
WalProposerCreate, WalProposerFree, WalProposerPoll, WalProposerStart,
};
/// Rust high-level wrapper for C walproposer API. Many methods are not required
/// for simple cases, hence todo!() in default implementations.
@@ -275,22 +275,17 @@ impl StreamingCallback {
#[cfg(test)]
mod tests {
use core::panic;
use std::{
cell::Cell,
ffi::CString,
sync::{atomic::AtomicUsize, mpsc::sync_channel},
};
use std::cell::{Cell, UnsafeCell};
use std::ffi::CString;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::sync_channel;
use std::cell::UnsafeCell;
use utils::id::TenantTimelineId;
use crate::{
api_bindings::Level,
bindings::{NeonWALReadResult, PG_VERSION_NUM},
walproposer::Wrapper,
};
use super::ApiImpl;
use crate::api_bindings::Level;
use crate::bindings::{NeonWALReadResult, PG_VERSION_NUM};
use crate::walproposer::Wrapper;
#[derive(Clone, Copy, Debug)]
struct WaitEventsData {

View File

@@ -1,17 +1,15 @@
use std::{collections::HashMap, error::Error as _};
use std::collections::HashMap;
use std::error::Error as _;
use bytes::Bytes;
use reqwest::{IntoUrl, Method, StatusCode};
use detach_ancestor::AncestorDetached;
use http_utils::error::HttpErrorBody;
use pageserver_api::{models::*, shard::TenantShardId};
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use pageserver_api::models::*;
use pageserver_api::shard::TenantShardId;
pub use reqwest::Body as ReqwestBody;
use reqwest::{IntoUrl, Method, StatusCode};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use crate::BlockUnblock;

View File

@@ -1,23 +1,16 @@
use std::sync::{Arc, Mutex};
use futures::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt,
};
use pageserver_api::{
models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest,
PagestreamGetPageResponse,
},
reltag::RelTag,
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use pageserver_api::models::{
PagestreamBeMessage, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
};
use pageserver_api::reltag::RelTag;
use tokio::task::JoinHandle;
use tokio_postgres::CopyOutStream;
use tokio_util::sync::CancellationToken;
use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
pub struct Client {
client: tokio_postgres::Client,

Some files were not shown because too many files have changed in this diff Show More