Compare commits

...

8 Commits

Author SHA1 Message Date
Andy Hattemer
489c7a20f4 Update README logo and links from neon.tech to neon.com (#12850)
## Problem

## Summary of changes
2026-01-14 21:09:27 -05:00
Stas Kelvich
015b1c7cb3 Update README (#12827)
Subj
2025-10-03 15:07:57 -07:00
dotdister
5e85c02f37 neon_local: fix mismatched comment about local SSL certificate generation (#12814)
## Problem
In control_plane, the local SSL certificate generation uses `ed25519`,
but the comment still said `rsa:2048`, resulting in a mismatch.
This mismatch was introduced in #11542.

## Summary of changes
The comment has been corrected from `rsa:2048` to `ed25519` to ensure
consistency with the implementation.
2025-09-30 13:38:49 +02:00
Yongtao Huang
c17d3fe645 Fix typos (#12819)
Fix typo: `Falied` -> `Failed`

Signed-off-by: Yongtao Huang <yongtaoh2022@gmail.com>
2025-09-30 13:37:00 +02:00
dotdister
4ac447c75d fix(control_plane): Fix incorrect file path of identity.toml in error message (#12826)
## Problem
control_plane shows an incorrect file path when opening the identity.toml
file fails.

## Summary of changes
Changed the error context when opening identity.toml for writing to use
`identity_file_path` instead of `config_file_path`.
2025-09-30 13:35:42 +02:00
Junhyeog Lee
26b47b5beb feat: Add configurable Direct IO alignment support (#12821)
## Problem

Neon's storage system currently has a hard-coded 512-byte block size for
Direct IO operations, which causes I/O errors on systems whose disks have
a 4096-byte block size.

This results in errors like "vec read failed" and "Invalid argument (os
error 22)" on certain hardware configurations.

See issue #12623 for details.

## Summary of changes

Make Direct IO alignment configurable at build time to support both
512-byte and 4096-byte block sizes:

- Add `io-align-512` and `io-align-4k` cargo features (default: 512-byte
for backward compatibility)
- Make `DEFAULT_IO_BUFFER_ALIGNMENT` configurable via cargo features in
`pageserver_api`
- Update `DIO_CHUNK_SIZE` in vectored_dio_read to use the configured
alignment value dynamically
- Add `IO_ALIGNMENT` build argument to Dockerfile to allow building
images with different alignment settings
- Add startup logging to display the configured IO buffer alignment for
operational visibility
- Fix validation logic in `virtual_file.rs` to use the configured
alignment instead of hard-coded 512

This change allows Neon to run on systems with different disk block
sizes by building with the appropriate feature flag, addressing the
compatibility issues described in the RFC on Direct IO implementation.

## Performance Note

Benchmarks show 512-byte alignment performs significantly better than
4k:
- Write: 512-byte is 21-71% faster across percentiles (p99: 71% faster)
- Read: 512-byte is slightly faster (5-21% improvement)

This is why 512-byte remains the default.
However, some storage systems require 4k alignment and will fail with
EINVAL otherwise. This change adds build-time configuration to support
both environments.
2025-09-26 14:43:53 +01:00
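
As a rough illustration of the mechanism described above, the sketch below shows
the feature-gated constant and an alignment check in the style of the diffs further
down (`io-align-4k`, `io-align-512`, and `DEFAULT_IO_BUFFER_ALIGNMENT` come from
this PR; the `is_aligned` helper and `main` are illustrative only, not part of the
change):

```rust
// Feature-gated Direct IO alignment, mirroring the pageserver_api defaults below.
// With `io-align-4k` enabled the alignment is 4096 bytes; otherwise it stays 512.
#[cfg(feature = "io-align-4k")]
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 4096;
#[cfg(not(feature = "io-align-4k"))]
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;

/// Illustrative helper: a buffer address, IO offset, or IO size must be a
/// multiple of the configured alignment, as the virtual_file checks enforce.
fn is_aligned(value: usize) -> bool {
    value % DEFAULT_IO_BUFFER_ALIGNMENT == 0
}

fn main() {
    // Analogous to the startup log added in this PR.
    println!("IO buffer alignment: {DEFAULT_IO_BUFFER_ALIGNMENT} bytes");
    assert!(is_aligned(8192));
}
```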
John G. Crowley
85ce109361 Initial implementation of GCS provider. (#11666)
## Problem
We are currently using GCS through the AWS API instead of the GCS API
directly.

## Summary of changes
Draft implementation of a GCS provider. We run Neon on GCS with the AWS
provider via [this
patch](https://github.com/neondatabase/neon/pull/10277), but want to use
the GCS API directly. This implementation attempts to do so without adding
a GCS library dependency or new SDK, except for `gcp_auth`.
2025-09-16 10:18:25 +02:00
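
For a sense of how the new backend is wired up, here is a hedged sketch adapted
from the integration test added in this PR (the prefix, limits, and timeouts are
illustrative values; credentials are resolved by `gcp_auth`, typically via
`GOOGLE_APPLICATION_CREDENTIALS`):

```rust
use std::num::NonZeroUsize;
use std::time::Duration;

use remote_storage::{GCSConfig, GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind};

// Illustrative only: construct a GCS-backed GenericRemoteStorage, mirroring the
// config shape used by the test suite added in this PR.
async fn make_gcs_storage(bucket_name: String) -> anyhow::Result<GenericRemoteStorage> {
    let config = RemoteStorageConfig {
        storage: RemoteStorageKind::GCS(GCSConfig {
            bucket_name,
            prefix_in_bucket: Some("pageserver/".into()),
            max_keys_per_list_response: Some(100),
            concurrency_limit: NonZeroUsize::new(100).unwrap(),
        }),
        timeout: Duration::from_secs(120),
        small_timeout: Duration::from_secs(120),
    };
    GenericRemoteStorage::from_config(&config).await
}
```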
Peter Bendel
77e22e4bf0 remove obsolete comment - this is a dummy commit (#12816)
## Problem

We ran out of commit comments on the same commit SHA;
[see](https://github.com/neondatabase/neon/actions/runs/17190868211/job/48766305883#step:10:591)

## Summary of changes

Push another commit to neondatabase/neon.git to create a new commit SHA
on the main branch.
2025-08-25 07:36:41 +00:00
21 changed files with 1977 additions and 28 deletions

View File

@@ -2,9 +2,6 @@ name: large oltp growth
# workflow to grow the reuse branch of large oltp benchmark continuously (about 16 GB per run)
on:
# uncomment to run on push for debugging your PR
# push:
# branches: [ bodobolero/increase_large_oltp_workload ]
schedule:
# * is a special character in YAML so you have to quote this string

Cargo.lock (generated)
View File

@@ -145,9 +145,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.94"
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
dependencies = [
"backtrace",
]
@@ -2402,9 +2402,9 @@ dependencies = [
[[package]]
name = "futures"
version = "0.3.28"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
@@ -2433,9 +2433,9 @@ checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.28"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
@@ -2510,6 +2510,33 @@ dependencies = [
"slab",
]
[[package]]
name = "gcp_auth"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf67f30198e045a039264c01fb44659ce82402d7771c50938beb41a5ac87733"
dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"chrono",
"home",
"http 1.3.1",
"http-body-util",
"hyper 1.4.1",
"hyper-rustls 0.27.5",
"hyper-util",
"ring",
"rustls-pemfile 2.1.1",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tracing",
"tracing-futures",
"url",
]
[[package]]
name = "gen_ops"
version = "0.4.0"
@@ -2840,6 +2867,15 @@ dependencies = [
"digest",
]
[[package]]
name = "home"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "hostname"
version = "0.4.0"
@@ -3075,6 +3111,24 @@ dependencies = [
"tower-service",
]
[[package]]
name = "hyper-rustls"
version = "0.27.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2"
dependencies = [
"futures-util",
"http 1.3.1",
"hyper 1.4.1",
"hyper-util",
"rustls 0.23.29",
"rustls-native-certs 0.8.0",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.2",
"tower-service",
]
[[package]]
name = "hyper-timeout"
version = "0.5.1"
@@ -3852,6 +3906,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -5872,8 +5936,11 @@ dependencies = [
"bytes",
"camino",
"camino-tempfile",
"chrono",
"futures",
"futures-util",
"gcp_auth",
"http 1.3.1",
"http-body-util",
"http-types",
"humantime-serde",
@@ -5894,7 +5961,9 @@ dependencies = [
"tokio-util",
"toml_edit",
"tracing",
"url",
"utils",
"uuid",
]
[[package]]
@@ -5924,6 +5993,7 @@ dependencies = [
"js-sys",
"log",
"mime",
"mime_guess",
"once_cell",
"percent-encoding",
"pin-project-lite",
@@ -8033,6 +8103,16 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
@@ -8208,6 +8288,12 @@ dependencies = [
"libc",
]
[[package]]
name = "unicase"
version = "2.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"
[[package]]
name = "unicode-bidi"
version = "0.3.17"

View File

@@ -78,6 +78,7 @@ WORKDIR /home/nonroot
ARG GIT_VERSION=local
ARG BUILD_TAG
ARG ADDITIONAL_RUSTFLAGS=""
ARG IO_ALIGNMENT=512
ENV CARGO_FEATURES="default"
# 3. Build cargo dependencies. Note that this step doesn't depend on anything else than
@@ -101,7 +102,12 @@ COPY --chown=nonroot --from=plan /home/nonroot/Cargo.lock Carg
RUN --mount=type=secret,uid=1000,id=SUBZERO_ACCESS_TOKEN \
set -e \
&& if [ -s /run/secrets/SUBZERO_ACCESS_TOKEN ]; then \
export CARGO_FEATURES="rest_broker"; \
export CARGO_FEATURES="${CARGO_FEATURES},rest_broker"; \
fi \
&& if [ "$IO_ALIGNMENT" = "4k" ]; then \
export CARGO_FEATURES="${CARGO_FEATURES},io-align-4k"; \
elif [ "$IO_ALIGNMENT" = "512" ]; then \
export CARGO_FEATURES="${CARGO_FEATURES},io-align-512"; \
fi \
&& RUSTFLAGS="-Clinker=clang -Clink-arg=-fuse-ld=mold -Clink-arg=-Wl,--no-rosegment -Cforce-frame-pointers=yes ${ADDITIONAL_RUSTFLAGS}" cargo auditable build \
--features $CARGO_FEATURES \

View File

@@ -1,13 +1,13 @@
[![Neon](https://github.com/neondatabase/neon/assets/11527560/f15a17f0-836e-40c5-b35d-030606a6b660)](https://neon.tech)
[![Neon](https://github.com/user-attachments/assets/fd91da5f-44a9-41c7-9075-36a5b5608083)](https://neon.com)
# Neon
Neon is a serverless open-source alternative to AWS Aurora Postgres. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.
Neon is an open-source serverless Postgres database platform. It separates storage and compute and substitutes the PostgreSQL storage layer by redistributing data across a cluster of nodes.
## Quick start
Try the [Neon Free Tier](https://neon.tech/github) to create a serverless Postgres instance. Then connect to it with your preferred Postgres client (psql, dbeaver, etc) or use the online [SQL Editor](https://neon.tech/docs/get-started-with-neon/query-with-neon-sql-editor/). See [Connect from any application](https://neon.tech/docs/connect/connect-from-any-app/) for connection instructions.
Try the [Neon Free Tier](https://neon.com/signup) to create a serverless Postgres instance. Then connect to it with your preferred Postgres client (psql, dbeaver, etc) or use the online [SQL Editor](https://neon.com/docs/get-started-with-neon/query-with-neon-sql-editor/). See [Connect from any application](https://neon.com/docs/connect/connect-from-any-app/) for connection instructions.
Alternatively, compile and run the project [locally](#running-local-installation).
@@ -301,8 +301,8 @@ See also README files in some source directories, and `rustdoc` style documentat
Other resources:
- [SELECT 'Hello, World'](https://neon.tech/blog/hello-world/): Blog post by Nikita Shamgunov on the high level architecture
- [Architecture decisions in Neon](https://neon.tech/blog/architecture-decisions-in-neon/): Blog post by Heikki Linnakangas
- [SELECT 'Hello, World'](https://neon.com/blog/hello-world/): Blog post by Nikita Shamgunov on the high level architecture
- [Architecture decisions in Neon](https://neon.com/blog/architecture-decisions-in-neon/): Blog post by Heikki Linnakangas
- [Neon: Serverless PostgreSQL!](https://www.youtube.com/watch?v=rES0yzeERns): Presentation on storage system by Heikki Linnakangas in the CMU Database Group seminar series
### Postgres-specific terms

View File

@@ -1074,7 +1074,7 @@ fn generate_auth_keys(private_key_path: &Path, public_key_path: &Path) -> anyhow
}
fn generate_ssl_ca_cert(cert_path: &Path, key_path: &Path) -> anyhow::Result<()> {
// openssl req -x509 -newkey rsa:2048 -nodes -subj "/CN=Neon Local CA" -days 36500 \
// openssl req -x509 -newkey ed25519 -nodes -subj "/CN=Neon Local CA" -days 36500 \
// -out rootCA.crt -keyout rootCA.key
let keygen_output = Command::new("openssl")
.args([
@@ -1104,7 +1104,7 @@ fn generate_ssl_cert(
let mut csr_path = cert_path.to_path_buf();
csr_path.set_extension(".csr");
// openssl req -new -nodes -newkey rsa:2048 -keyout server.key -out server.csr \
// openssl req -new -nodes -newkey ed25519 -keyout server.key -out server.csr \
// -subj "/CN=localhost" -addext "subjectAltName=DNS:localhost,IP:127.0.0.1"
let keygen_output = Command::new("openssl")
.args(["req", "-new", "-nodes"])

View File

@@ -233,8 +233,8 @@ impl PageServerNode {
let mut identity_file = std::fs::OpenOptions::new()
.create_new(true)
.write(true)
.open(identity_file_path)
.with_context(|| format!("open identity toml for write: {config_file_path:?}"))?;
.open(&identity_file_path)
.with_context(|| format!("open identity toml for write: {identity_file_path:?}"))?;
let identity_toml = self.pageserver_make_identity_toml(node_id);
identity_file
.write_all(identity_toml.to_string().as_bytes())
@@ -560,12 +560,12 @@ impl PageServerNode {
.remove("sampling_ratio")
.map(serde_json::from_str)
.transpose()
.context("Falied to parse 'sampling_ratio'")?,
.context("Failed to parse 'sampling_ratio'")?,
relsize_snapshot_cache_capacity: settings
.remove("relsize snapshot cache capacity")
.map(|x| x.parse::<usize>())
.transpose()
.context("Falied to parse 'relsize_snapshot_cache_capacity' as integer")?,
.context("Failed to parse 'relsize_snapshot_cache_capacity' as integer")?,
basebackup_cache_enabled: settings
.remove("basebackup_cache_enabled")
.map(|x| x.parse::<bool>())

View File

@@ -5,8 +5,12 @@ edition = "2024"
license.workspace = true
[features]
default = ["io-align-512"]
# See pageserver/Cargo.toml
testing = ["dep:nix"]
# Direct IO alignment options (mutually exclusive)
io-align-512 = []
io-align-4k = []
[dependencies]
serde.workspace = true

View File

@@ -703,6 +703,11 @@ pub mod defaults {
pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0;
#[cfg(feature = "io-align-4k")]
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 4096;
#[cfg(all(feature = "io-align-512", not(feature = "io-align-4k")))]
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
#[cfg(not(any(feature = "io-align-512", feature = "io-align-4k")))]
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 512;
pub const DEFAULT_SSL_KEY_FILE: &str = "server.key";

View File

@@ -19,7 +19,8 @@ camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
hyper = { workspace = true, features = ["client"] }
futures.workspace = true
reqwest.workspace = true
reqwest = { workspace = true, features = ["multipart", "stream"] }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["sync", "fs", "io-util"] }
@@ -41,6 +42,10 @@ http-types.workspace = true
http-body-util.workspace = true
itertools.workspace = true
sync_wrapper = { workspace = true, features = ["futures"] }
gcp_auth = "0.12.3"
url.workspace = true
http.workspace = true
uuid.workspace = true
byteorder = "1.4"
rand.workspace = true

View File

@@ -41,6 +41,7 @@ impl RemoteStorageKind {
RemoteStorageKind::LocalFs { .. } => None,
RemoteStorageKind::AwsS3(config) => Some(&config.bucket_name),
RemoteStorageKind::AzureContainer(config) => Some(&config.container_name),
RemoteStorageKind::GCS(config) => Some(&config.bucket_name),
}
}
}
@@ -51,6 +52,7 @@ impl RemoteStorageConfig {
match &self.storage {
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
RemoteStorageKind::GCS(c) => c.concurrency_limit.into(),
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
}
}
@@ -85,6 +87,9 @@ pub enum RemoteStorageKind {
/// Azure Blob based storage, storing all files in the container
/// specified by the config
AzureContainer(AzureConfig),
/// Google Cloud based storage, storing all files in the GCS bucket
/// specified by the config
GCS(GCSConfig),
}
#[derive(Deserialize)]
@@ -176,6 +181,32 @@ impl Debug for S3Config {
}
}
#[derive(Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct GCSConfig {
/// Name of the bucket to connect to.
pub bucket_name: String,
/// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
pub prefix_in_bucket: Option<String>,
#[serde(default = "default_remote_storage_s3_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
}
impl Debug for GCSConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GCSConfig")
.field("bucket_name", &self.bucket_name)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AzureConfig {
@@ -304,6 +335,30 @@ timeout = '5s'";
);
}
#[test]
fn test_gcs_parsing() {
let toml = "\
bucket_name = 'foo-bar'
prefix_in_bucket = '/pageserver'
";
let config = parse(toml).unwrap();
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::GCS(GCSConfig {
bucket_name: "foo-bar".into(),
prefix_in_bucket: Some("pageserver/".into()),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
concurrency_limit: std::num::NonZero::new(100).unwrap(),
}),
timeout: Duration::from_secs(120),
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
}
);
}
#[test]
fn test_s3_parsing() {
let toml = "\

File diff suppressed because it is too large.

View File

@@ -12,6 +12,7 @@
mod azure_blob;
mod config;
mod error;
mod gcs_bucket;
mod local_fs;
mod metrics;
mod s3_bucket;
@@ -43,10 +44,11 @@ use tokio_util::sync::CancellationToken;
use tracing::info;
pub use self::azure_blob::AzureBlobStorage;
pub use self::gcs_bucket::GCSBucket;
pub use self::local_fs::LocalFs;
pub use self::s3_bucket::S3Bucket;
pub use self::simulate_failures::UnreliableWrapper;
pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
pub use crate::config::{AzureConfig, GCSConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
/// Default concurrency limit for S3 operations
///
@@ -81,8 +83,12 @@ pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
pub const MAX_KEYS_PER_DELETE_GCS: usize = 1000;
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
const GCS_SCOPES: &[&str] = &["https://www.googleapis.com/auth/cloud-platform"];
/// Path on the remote storage, relative to some inner prefix.
/// The prefix is an implementation detail, that allows representing local paths
/// as the remote ones, stripping the local storage prefix away.
@@ -182,6 +188,7 @@ pub struct VersionListing {
pub versions: Vec<Version>,
}
#[derive(Debug)]
pub struct Version {
pub key: RemotePath,
pub last_modified: SystemTime,
@@ -203,6 +210,47 @@ pub enum VersionKind {
Version(VersionId),
}
// I was going to do an `enum GenericVersion` but this feels cleaner.
#[derive(Default)]
pub struct GCSVersionListing {
pub versions: Vec<GCSVersion>,
}
#[derive(Debug)]
pub struct GCSVersion {
pub key: RemotePath,
pub last_modified: SystemTime,
pub id: VersionId,
pub time_deleted: Option<SystemTime>,
}
impl From<GCSVersionListing> for VersionListing {
fn from(gcs_listing: GCSVersionListing) -> Self {
let version_listing = gcs_listing
.versions
.into_iter()
.map(
|GCSVersion {
key,
last_modified,
id,
..
}| {
Version {
key,
last_modified,
kind: VersionKind::Version(VersionId(id.0)),
}
},
)
.collect::<Vec<Version>>();
VersionListing {
versions: version_listing,
}
}
}
/// Options for downloads. The default value is a plain GET.
pub struct DownloadOpts {
/// If given, returns [`DownloadError::Unmodified`] if the object still has
@@ -481,6 +529,7 @@ pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
AwsS3(Arc<S3Bucket>),
AzureBlob(Arc<AzureBlobStorage>),
Unreliable(Other),
GCS(Arc<GCSBucket>),
}
impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
@@ -497,6 +546,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::GCS(s) => s.list(prefix, mode, max_keys, cancel).await,
}
}
@@ -514,6 +564,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::GCS(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
}
}
@@ -530,6 +581,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::GCS(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
}
}
@@ -544,6 +596,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.head_object(key, cancel).await,
Self::AzureBlob(s) => s.head_object(key, cancel).await,
Self::Unreliable(s) => s.head_object(key, cancel).await,
Self::GCS(s) => s.head_object(key, cancel).await,
}
}
@@ -561,6 +614,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::GCS(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
}
}
@@ -576,6 +630,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.download(from, opts, cancel).await,
Self::AzureBlob(s) => s.download(from, opts, cancel).await,
Self::Unreliable(s) => s.download(from, opts, cancel).await,
Self::GCS(s) => s.download(from, opts, cancel).await,
}
}
@@ -590,6 +645,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.delete(path, cancel).await,
Self::AzureBlob(s) => s.delete(path, cancel).await,
Self::Unreliable(s) => s.delete(path, cancel).await,
Self::GCS(s) => s.delete(path, cancel).await,
}
}
@@ -604,6 +660,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
Self::GCS(s) => s.delete_objects(paths, cancel).await,
}
}
@@ -614,6 +671,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.max_keys_per_delete(),
Self::AzureBlob(s) => s.max_keys_per_delete(),
Self::Unreliable(s) => s.max_keys_per_delete(),
Self::GCS(s) => s.max_keys_per_delete(),
}
}
@@ -628,6 +686,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
Self::GCS(s) => s.delete_prefix(prefix, cancel).await,
}
}
@@ -643,6 +702,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.copy(from, to, cancel).await,
Self::AzureBlob(s) => s.copy(from, to, cancel).await,
Self::Unreliable(s) => s.copy(from, to, cancel).await,
Self::GCS(s) => s.copy(from, to, cancel).await,
}
}
@@ -672,6 +732,10 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
Self::GCS(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
}
}
}
@@ -687,11 +751,18 @@ impl GenericRemoteStorage {
}
pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
info!("RemoteStorageConfig: {:?}", storage_config);
let timeout = storage_config.timeout;
// If somkeone overrides timeout to be small without adjusting small_timeout, then adjust it automatically
// If someone overrides timeout to be small without adjusting small_timeout, then adjust it automatically
let small_timeout = std::cmp::min(storage_config.small_timeout, timeout);
info!(
"RemoteStorageConfig's storage attribute: {:?}",
storage_config.storage
);
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs { local_path: path } => {
info!("Using fs root '{path}' as a remote storage");
@@ -729,6 +800,16 @@ impl GenericRemoteStorage {
small_timeout,
)?))
}
RemoteStorageKind::GCS(gcs_config) => {
let google_application_credentials =
std::env::var("GOOGLE_APPLICATION_CREDENTIALS")
.unwrap_or_else(|_| "<none>".into());
info!(
"Using gcs bucket '{}' as a remote storage, prefix in bucket: '{:?}', GOOGLE_APPLICATION_CREDENTIALS: {google_application_credentials }",
gcs_config.bucket_name, gcs_config.prefix_in_bucket
);
Self::GCS(Arc::new(GCSBucket::new(gcs_config, timeout).await?))
}
})
}
@@ -764,6 +845,7 @@ impl GenericRemoteStorage {
Self::AwsS3(s) => Some(s.bucket_name()),
Self::AzureBlob(s) => Some(s.container_name()),
Self::Unreliable(_s) => None,
Self::GCS(s) => Some(s.bucket_name()),
}
}
}

View File

@@ -63,6 +63,7 @@ impl UnreliableWrapper {
GenericRemoteStorage::Unreliable(_s) => {
panic!("Can't wrap unreliable wrapper unreliably")
}
GenericRemoteStorage::GCS(s) => GenericRemoteStorage::GCS(s),
};
let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100);
UnreliableWrapper {

View File

@@ -0,0 +1,255 @@
#![allow(dead_code)]
#![allow(unused)]
mod common;
use crate::common::{download_to_vec, upload_stream};
use anyhow::Context;
use camino::Utf8Path;
use futures::StreamExt;
use futures::stream::Stream;
use remote_storage::{
DownloadKind, DownloadOpts, GCSConfig, GenericRemoteStorage, ListingMode, RemotePath,
RemoteStorageConfig, RemoteStorageKind, StorageMetadata,
};
use std::collections::HashMap;
#[path = "common/tests.rs"]
use std::collections::HashSet;
use std::fmt::{Debug, Display};
use std::io::Cursor;
use std::ops::Bound;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use test_context::{AsyncTestContext, test_context};
use tokio_util::sync::CancellationToken;
use utils::backoff;
// A minimal working GCS client I can pass around in async context
const BASE_PREFIX: &str = "test";
async fn create_gcs_client() -> anyhow::Result<Arc<GenericRemoteStorage>> {
let bucket_name = std::env::var("GCS_TEST_BUCKET").expect("GCS_TEST_BUCKET must be set");
let gcs_config = GCSConfig {
bucket_name,
prefix_in_bucket: Some("testing-path/".into()),
max_keys_per_list_response: Some(100),
concurrency_limit: std::num::NonZero::new(100).unwrap(),
};
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::GCS(gcs_config),
timeout: Duration::from_secs(120),
small_timeout: std::time::Duration::from_secs(120),
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")?,
))
}
struct EnabledGCS {
client: Arc<GenericRemoteStorage>,
base_prefix: &'static str,
}
impl EnabledGCS {
async fn setup() -> Self {
let client = create_gcs_client()
.await
.context("gcs client creation")
.expect("gcs client creation failed");
EnabledGCS {
client,
base_prefix: BASE_PREFIX,
}
}
}
impl AsyncTestContext for EnabledGCS {
async fn setup() -> Self {
Self::setup().await
}
}
#[test_context(EnabledGCS)]
#[tokio::test]
async fn gcs_test_suite(ctx: &mut EnabledGCS) -> anyhow::Result<()> {
// ------------------------------------------------
// --- `time_travel_recover`, showcasing `upload`, `delete_objects`, `copy`
// ------------------------------------------------
// Our test depends on discrepancies in the clock between S3 and the environment the tests
// run in. Therefore, wait a little bit before and after. The alternative would be
// to take the time from S3 response headers.
const WAIT_TIME: Duration = Duration::from_millis(3_000);
async fn retry<T, O, F, E>(op: O) -> Result<T, E>
where
E: Display + Debug + 'static,
O: FnMut() -> F,
F: Future<Output = Result<T, E>>,
{
let warn_threshold = 3;
let max_retries = 10;
backoff::retry(
op,
|_e| false,
warn_threshold,
max_retries,
"test retry",
&CancellationToken::new(),
)
.await
.expect("never cancelled")
}
async fn time_point() -> SystemTime {
tokio::time::sleep(WAIT_TIME).await;
let ret = SystemTime::now();
tokio::time::sleep(WAIT_TIME).await;
ret
}
async fn list_files(
client: &Arc<GenericRemoteStorage>,
cancel: &CancellationToken,
) -> anyhow::Result<HashSet<RemotePath>> {
Ok(
retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
.await
.context("list root files failure")?
.keys
.into_iter()
.map(|o| o.key)
.collect::<HashSet<_>>(),
)
}
let cancel = CancellationToken::new();
let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
// ---------------- t0 ---------------
// Upload 'path1'
retry(|| {
let (data, len) = upload_stream("remote blob data1".as_bytes().into());
ctx.client.upload(data, len, &path1, None, &cancel)
})
.await?;
let t0_files = list_files(&ctx.client, &cancel).await?;
let t0 = time_point().await;
// Show 'path1'
println!("at t0: {t0_files:?}");
// Upload 'path2'
let old_data = "remote blob data2";
retry(|| {
let (data, len) = upload_stream(old_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None, &cancel)
})
.await?;
// ---------------- t1 ---------------
// Show 'path1' and 'path2'
let t1_files = list_files(&ctx.client, &cancel).await?;
let t1 = time_point().await;
println!("at t1: {t1_files:?}");
{
let opts = DownloadOpts::default();
let dl = retry(|| ctx.client.download(&path2, &opts, &cancel)).await?;
let last_modified = dl.last_modified;
let half_wt = WAIT_TIME.mul_f32(0.5);
let t0_hwt = t0 + half_wt;
let t1_hwt = t1 - half_wt;
if !(t0_hwt..=t1_hwt).contains(&last_modified) {
panic!(
"last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
This likely means a large lock discrepancy between S3 and the local clock."
);
}
}
// Upload 'path3'
retry(|| {
let (data, len) = upload_stream("remote blob data3".as_bytes().into());
ctx.client.upload(data, len, &path3, None, &cancel)
})
.await?;
// Overwrite 'path2'
let new_data = "new remote blob data2";
retry(|| {
let (data, len) = upload_stream(new_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None, &cancel)
})
.await?;
// Delete 'path1'
retry(|| ctx.client.delete(&path1, &cancel)).await?;
// Show 'path2' and `path3`
let t2_files = list_files(&ctx.client, &cancel).await?;
let t2 = time_point().await;
println!("at t2: {t2_files:?}");
// No changes after recovery to t2 (no-op)
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t2, t_final, &cancel, None)
.await?;
let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t2: {t2_files_recovered:?}");
assert_eq!(t2_files, t2_files_recovered);
let path2_recovered_t2 = download_to_vec(
ctx.client
.download(&path2, &DownloadOpts::default(), &cancel)
.await?,
)
.await?;
assert_eq!(path2_recovered_t2, new_data.as_bytes());
// after recovery to t1: path1 is back, path2 has the old content
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t1, t_final, &cancel, None)
.await?;
let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t1: {t1_files_recovered:?}");
assert_eq!(t1_files, t1_files_recovered);
let path2_recovered_t1 = download_to_vec(
ctx.client
.download(&path2, &DownloadOpts::default(), &cancel)
.await?,
)
.await?;
assert_eq!(path2_recovered_t1, old_data.as_bytes());
// after recovery to t0: everything is gone except for path1
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t0, t_final, &cancel, None)
.await?;
let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t0: {t0_files_recovered:?}");
assert_eq!(t0_files, t0_files_recovered);
// cleanup
let paths = &[path1, path2, path3];
retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
Ok(())
}

View File

@@ -10,6 +10,10 @@ default = []
# which adds some runtime cost to run tests on outage conditions
testing = ["fail/failpoints", "pageserver_api/testing", "wal_decoder/testing", "pageserver_client/testing"]
# Direct IO alignment options (propagated to pageserver_api)
io-align-512 = ["pageserver_api/io-align-512"]
io-align-4k = ["pageserver_api/io-align-4k"]
fuzz-read-path = ["testing"]
# Enables benchmarking only APIs

View File

@@ -353,6 +353,10 @@ fn start_pageserver(
launch_ts.to_string(),
BUILD_TAG,
);
info!(
"IO buffer alignment: {} bytes",
pageserver_api::config::defaults::DEFAULT_IO_BUFFER_ALIGNMENT
);
set_build_info_metric(GIT_VERSION, BUILD_TAG);
set_launch_timestamp_metric(launch_ts);
#[cfg(target_os = "linux")]

View File

@@ -110,6 +110,12 @@ impl FeatureResolver {
PostHogFlagFilterPropertyValue::String("local".to_string()),
);
}
RemoteStorageKind::GCS { .. } => {
properties.insert(
"region".to_string(),
PostHogFlagFilterPropertyValue::String("local".to_string()),
);
}
}
}
// TODO: move this to a background task so that we don't block startup in case of slow disk

View File

@@ -158,6 +158,7 @@ pub(super) async fn upload_timeline_layer<'a>(
GenericRemoteStorage::LocalFs(_) => {}
GenericRemoteStorage::AwsS3(_) => {}
GenericRemoteStorage::Unreliable(_) => {}
GenericRemoteStorage::GCS(_) => {}
};
/* END_HADRON */
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);

View File

@@ -73,7 +73,7 @@ pub trait Buffer: std::ops::Deref<Target = [u8]> {
}
/// The minimum alignment and size requirement for disk offsets and memory buffer size for direct IO.
const DIO_CHUNK_SIZE: usize = 512;
const DIO_CHUNK_SIZE: usize = crate::virtual_file::get_io_buffer_alignment();
/// If multiple chunks need to be read, merge adjacent chunk reads into batches of max size `MAX_CHUNK_BATCH_SIZE`.
/// (The unit is the number of chunks.)

View File

@@ -852,7 +852,7 @@ impl VirtualFileInner {
// Because the alloctor might return _more_ aligned addresses than requested,
// there is a chance that testing would not catch violations of a runtime requirement stricter than 512.
{
let requirement = 512;
let requirement = get_io_buffer_alignment();
let remainder = addr % requirement;
assert!(
remainder == 0,
@@ -866,7 +866,7 @@ impl VirtualFileInner {
// So enforce just that and not anything more restrictive.
// Even the shallowest testing will expose more restrictive requirements if those ever arise.
{
let requirement = 512;
let requirement = get_io_buffer_alignment() as u64;
let remainder = offset % requirement;
assert!(
remainder == 0,
@@ -879,7 +879,7 @@ impl VirtualFileInner {
// The requirement in Linux 6.1 is bdev_logical_block_size().
// On our production systems, that is 512.
{
let requirement = 512;
let requirement = get_io_buffer_alignment();
let remainder = size % requirement;
assert!(
remainder == 0,

View File

@@ -266,6 +266,7 @@ impl BucketConfig {
"container {}, storage account {:?}, region {}",
config.container_name, config.storage_account, config.container_region
),
RemoteStorageKind::GCS(config) => format!("bucket {}", config.bucket_name),
}
}
pub fn bucket_name(&self) -> Option<&str> {
@@ -381,6 +382,9 @@ async fn init_remote(
config.prefix_in_container.get_or_insert(default_prefix);
}
RemoteStorageKind::LocalFs { .. } => (),
RemoteStorageKind::GCS(config) => {
config.prefix_in_bucket.get_or_insert(default_prefix);
}
}
// We already pass the prefix to the remote client above