Initial implementation of GCS provider. (#11666)

## Problem
We currently use GCS through the AWS API rather than talking to the GCS API directly.

## Summary of changes
Draft implementation of a GCS provider. We run Neon on GCS with the AWS
provider via [this
patch](https://github.com/neondatabase/neon/pull/10277), but want to use
the GCS API directly. This implementation attempts to do so without adding a
GCS library dependency or a new SDK, except for `gcp_auth` (used only for
authentication).
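
For context, a GCS backend is configured like the other remote-storage kinds. The keys below are the ones exercised by `test_gcs_parsing` further down; the bucket name is a placeholder, and the omitted limits fall back to the same defaults as the S3 config:

```toml
# Hypothetical remote_storage TOML selecting the new GCS provider.
# concurrency_limit and max_keys_per_list_response are optional and
# default to the S3 provider's values.
bucket_name = 'my-bucket'
prefix_in_bucket = '/pageserver'
```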
Author: John G. Crowley
Date: 2025-09-16 03:18:25 -05:00
Committer: GitHub
Parent: 77e22e4bf0
Commit: 85ce109361
10 changed files with 1938 additions and 9 deletions

### `Cargo.lock` (generated)

@@ -145,9 +145,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.94"
version = "1.0.98"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7"
checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487"
dependencies = [
"backtrace",
]
@@ -2402,9 +2402,9 @@ dependencies = [
[[package]]
name = "futures"
version = "0.3.28"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876"
dependencies = [
"futures-channel",
"futures-core",
@@ -2433,9 +2433,9 @@ checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e"
[[package]]
name = "futures-executor"
version = "0.3.28"
version = "0.3.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f"
dependencies = [
"futures-core",
"futures-task",
@@ -2510,6 +2510,33 @@ dependencies = [
"slab",
]
[[package]]
name = "gcp_auth"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf67f30198e045a039264c01fb44659ce82402d7771c50938beb41a5ac87733"
dependencies = [
"async-trait",
"base64 0.22.1",
"bytes",
"chrono",
"home",
"http 1.3.1",
"http-body-util",
"hyper 1.4.1",
"hyper-rustls 0.27.5",
"hyper-util",
"ring",
"rustls-pemfile 2.1.1",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tracing",
"tracing-futures",
"url",
]
[[package]]
name = "gen_ops"
version = "0.4.0"
@@ -2840,6 +2867,15 @@ dependencies = [
"digest",
]
[[package]]
name = "home"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "hostname"
version = "0.4.0"
@@ -3075,6 +3111,24 @@ dependencies = [
"tower-service",
]
[[package]]
name = "hyper-rustls"
version = "0.27.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2"
dependencies = [
"futures-util",
"http 1.3.1",
"hyper 1.4.1",
"hyper-util",
"rustls 0.23.29",
"rustls-native-certs 0.8.0",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.26.2",
"tower-service",
]
[[package]]
name = "hyper-timeout"
version = "0.5.1"
@@ -3852,6 +3906,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -5872,8 +5936,11 @@ dependencies = [
"bytes",
"camino",
"camino-tempfile",
"chrono",
"futures",
"futures-util",
"gcp_auth",
"http 1.3.1",
"http-body-util",
"http-types",
"humantime-serde",
@@ -5894,7 +5961,9 @@ dependencies = [
"tokio-util",
"toml_edit",
"tracing",
"url",
"utils",
"uuid",
]
[[package]]
@@ -5924,6 +5993,7 @@ dependencies = [
"js-sys",
"log",
"mime",
"mime_guess",
"once_cell",
"percent-encoding",
"pin-project-lite",
@@ -8033,6 +8103,16 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
@@ -8208,6 +8288,12 @@ dependencies = [
"libc",
]
[[package]]
name = "unicase"
version = "2.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539"
[[package]]
name = "unicode-bidi"
version = "0.3.17"

### remote_storage: `Cargo.toml`

@@ -19,7 +19,8 @@ camino = { workspace = true, features = ["serde1"] }
humantime-serde.workspace = true
hyper = { workspace = true, features = ["client"] }
futures.workspace = true
reqwest.workspace = true
reqwest = { workspace = true, features = ["multipart", "stream"] }
chrono = { version = "0.4", default-features = false, features = ["clock"] }
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["sync", "fs", "io-util"] }
@@ -41,6 +42,10 @@ http-types.workspace = true
http-body-util.workspace = true
itertools.workspace = true
sync_wrapper = { workspace = true, features = ["futures"] }
gcp_auth = "0.12.3"
url.workspace = true
http.workspace = true
uuid.workspace = true
byteorder = "1.4"
rand.workspace = true
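
These dependencies are what let the provider speak the GCS JSON API without an SDK: `gcp_auth` resolves credentials and tokens, `reqwest` (with `multipart` and `stream`) does the HTTP, and `url` handles escaping. A rough, hypothetical sketch of such a direct call (not the actual code in `gcs_bucket.rs`, whose diff is suppressed below) could look like:

```rust
use anyhow::Context;

/// Hypothetical sketch: list objects under a prefix via the GCS JSON API,
/// using only gcp_auth and reqwest. It illustrates the approach, not the
/// provider's real implementation.
async fn list_objects(bucket: &str, prefix: &str) -> anyhow::Result<String> {
    // gcp_auth discovers credentials (GOOGLE_APPLICATION_CREDENTIALS, the
    // metadata server, gcloud config, ...) and caches/refreshes tokens
    // behind this provider handle.
    let provider = gcp_auth::provider().await.context("resolve GCP credentials")?;
    let token = provider
        .token(&["https://www.googleapis.com/auth/cloud-platform"])
        .await
        .context("fetch access token")?;

    // Listing endpoint of the GCS JSON API; reqwest percent-encodes the
    // prefix when it is passed as a query parameter.
    let url = format!("https://storage.googleapis.com/storage/v1/b/{bucket}/o");
    let body = reqwest::Client::new()
        .get(&url)
        .bearer_auth(token.as_str())
        .query(&[("prefix", prefix)])
        .send()
        .await?
        .error_for_status()?
        .text()
        .await?;

    Ok(body) // JSON with an "items" array and an optional "nextPageToken"
}
```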

### remote_storage: `config.rs`

@@ -41,6 +41,7 @@ impl RemoteStorageKind {
RemoteStorageKind::LocalFs { .. } => None,
RemoteStorageKind::AwsS3(config) => Some(&config.bucket_name),
RemoteStorageKind::AzureContainer(config) => Some(&config.container_name),
RemoteStorageKind::GCS(config) => Some(&config.bucket_name),
}
}
}
@@ -51,6 +52,7 @@ impl RemoteStorageConfig {
match &self.storage {
RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT,
RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(),
RemoteStorageKind::GCS(c) => c.concurrency_limit.into(),
RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(),
}
}
@@ -85,6 +87,9 @@ pub enum RemoteStorageKind {
/// Azure Blob based storage, storing all files in the container
/// specified by the config
AzureContainer(AzureConfig),
/// Google Cloud based storage, storing all files in the GCS bucket
/// specified by the config
GCS(GCSConfig),
}
#[derive(Deserialize)]
@@ -176,6 +181,32 @@ impl Debug for S3Config {
}
}
#[derive(Clone, PartialEq, Eq, Deserialize, Serialize)]
pub struct GCSConfig {
/// Name of the bucket to connect to.
pub bucket_name: String,
/// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once.
pub prefix_in_bucket: Option<String>,
#[serde(default = "default_remote_storage_s3_concurrency_limit")]
pub concurrency_limit: NonZeroUsize,
#[serde(default = "default_max_keys_per_list_response")]
pub max_keys_per_list_response: Option<i32>,
}
impl Debug for GCSConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GCSConfig")
.field("bucket_name", &self.bucket_name)
.field("prefix_in_bucket", &self.prefix_in_bucket)
.field("concurrency_limit", &self.concurrency_limit)
.field(
"max_keys_per_list_response",
&self.max_keys_per_list_response,
)
.finish()
}
}
/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write).
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AzureConfig {
@@ -304,6 +335,30 @@ timeout = '5s'";
);
}
#[test]
fn test_gcs_parsing() {
let toml = "\
bucket_name = 'foo-bar'
prefix_in_bucket = '/pageserver'
";
let config = parse(toml).unwrap();
assert_eq!(
config,
RemoteStorageConfig {
storage: RemoteStorageKind::GCS(GCSConfig {
bucket_name: "foo-bar".into(),
prefix_in_bucket: Some("pageserver/".into()),
max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE,
concurrency_limit: std::num::NonZero::new(100).unwrap(),
}),
timeout: Duration::from_secs(120),
small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT
}
);
}
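
Note the normalization the parser applies to the prefix: the configured `'/pageserver'` round-trips as `"pageserver/"`, with the leading slash stripped and a trailing separator added.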
#[test]
fn test_s3_parsing() {
let toml = "\

### remote_storage: `gcs_bucket.rs` (new file; diff suppressed because it is too large)

### remote_storage: `lib.rs`

@@ -12,6 +12,7 @@
mod azure_blob;
mod config;
mod error;
mod gcs_bucket;
mod local_fs;
mod metrics;
mod s3_bucket;
@@ -43,10 +44,11 @@ use tokio_util::sync::CancellationToken;
use tracing::info;
pub use self::azure_blob::AzureBlobStorage;
pub use self::gcs_bucket::GCSBucket;
pub use self::local_fs::LocalFs;
pub use self::s3_bucket::S3Bucket;
pub use self::simulate_failures::UnreliableWrapper;
pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
pub use crate::config::{AzureConfig, GCSConfig, RemoteStorageConfig, RemoteStorageKind, S3Config};
/// Default concurrency limit for S3 operations
///
@@ -81,8 +83,12 @@ pub const MAX_KEYS_PER_DELETE_S3: usize = 1000;
/// <https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch>
pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256;
pub const MAX_KEYS_PER_DELETE_GCS: usize = 1000;
const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/';
const GCS_SCOPES: &[&str] = &["https://www.googleapis.com/auth/cloud-platform"];
/// Path on the remote storage, relative to some inner prefix.
/// The prefix is an implementation detail, that allows representing local paths
/// as the remote ones, stripping the local storage prefix away.
@@ -182,6 +188,7 @@ pub struct VersionListing {
pub versions: Vec<Version>,
}
#[derive(Debug)]
pub struct Version {
pub key: RemotePath,
pub last_modified: SystemTime,
@@ -203,6 +210,47 @@ pub enum VersionKind {
Version(VersionId),
}
// I was going to do an `enum GenericVersion` but this feels cleaner.
#[derive(Default)]
pub struct GCSVersionListing {
pub versions: Vec<GCSVersion>,
}
#[derive(Debug)]
pub struct GCSVersion {
pub key: RemotePath,
pub last_modified: SystemTime,
pub id: VersionId,
pub time_deleted: Option<SystemTime>,
}
impl From<GCSVersionListing> for VersionListing {
fn from(gcs_listing: GCSVersionListing) -> Self {
let version_listing = gcs_listing
.versions
.into_iter()
.map(
|GCSVersion {
key,
last_modified,
id,
..
}| {
Version {
key,
last_modified,
kind: VersionKind::Version(VersionId(id.0)),
}
},
)
.collect::<Vec<Version>>();
VersionListing {
versions: version_listing,
}
}
}
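
(`GCSVersion` additionally carries `time_deleted`, which the GCS JSON API reports for noncurrent object generations; the generic `Version` has no equivalent field, so the conversion above drops it.)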
/// Options for downloads. The default value is a plain GET.
pub struct DownloadOpts {
/// If given, returns [`DownloadError::Unmodified`] if the object still has
@@ -481,6 +529,7 @@ pub enum GenericRemoteStorage<Other: Clone = Arc<UnreliableWrapper>> {
AwsS3(Arc<S3Bucket>),
AzureBlob(Arc<AzureBlobStorage>),
Unreliable(Other),
GCS(Arc<GCSBucket>),
}
impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
@@ -497,6 +546,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await,
Self::GCS(s) => s.list(prefix, mode, max_keys, cancel).await,
}
}
@@ -514,6 +564,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
Self::GCS(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)),
}
}
@@ -530,6 +581,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
Self::GCS(s) => s.list_versions(prefix, mode, max_keys, cancel).await,
}
}
@@ -544,6 +596,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.head_object(key, cancel).await,
Self::AzureBlob(s) => s.head_object(key, cancel).await,
Self::Unreliable(s) => s.head_object(key, cancel).await,
Self::GCS(s) => s.head_object(key, cancel).await,
}
}
@@ -561,6 +614,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
Self::GCS(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await,
}
}
@@ -576,6 +630,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.download(from, opts, cancel).await,
Self::AzureBlob(s) => s.download(from, opts, cancel).await,
Self::Unreliable(s) => s.download(from, opts, cancel).await,
Self::GCS(s) => s.download(from, opts, cancel).await,
}
}
@@ -590,6 +645,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.delete(path, cancel).await,
Self::AzureBlob(s) => s.delete(path, cancel).await,
Self::Unreliable(s) => s.delete(path, cancel).await,
Self::GCS(s) => s.delete(path, cancel).await,
}
}
@@ -604,6 +660,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.delete_objects(paths, cancel).await,
Self::AzureBlob(s) => s.delete_objects(paths, cancel).await,
Self::Unreliable(s) => s.delete_objects(paths, cancel).await,
Self::GCS(s) => s.delete_objects(paths, cancel).await,
}
}
@@ -614,6 +671,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.max_keys_per_delete(),
Self::AzureBlob(s) => s.max_keys_per_delete(),
Self::Unreliable(s) => s.max_keys_per_delete(),
Self::GCS(s) => s.max_keys_per_delete(),
}
}
@@ -628,6 +686,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await,
Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await,
Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await,
Self::GCS(s) => s.delete_prefix(prefix, cancel).await,
}
}
@@ -643,6 +702,7 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
Self::AwsS3(s) => s.copy(from, to, cancel).await,
Self::AzureBlob(s) => s.copy(from, to, cancel).await,
Self::Unreliable(s) => s.copy(from, to, cancel).await,
Self::GCS(s) => s.copy(from, to, cancel).await,
}
}
@@ -672,6 +732,10 @@ impl<Other: RemoteStorage> GenericRemoteStorage<Arc<Other>> {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
Self::GCS(s) => {
s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit)
.await
}
}
}
}
@@ -687,11 +751,18 @@ impl GenericRemoteStorage {
}
pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result<Self> {
info!("RemoteStorageConfig: {:?}", storage_config);
let timeout = storage_config.timeout;
// If somkeone overrides timeout to be small without adjusting small_timeout, then adjust it automatically
// If someone overrides timeout to be small without adjusting small_timeout, then adjust it automatically
let small_timeout = std::cmp::min(storage_config.small_timeout, timeout);
info!(
"RemoteStorageConfig's storage attribute: {:?}",
storage_config.storage
);
Ok(match &storage_config.storage {
RemoteStorageKind::LocalFs { local_path: path } => {
info!("Using fs root '{path}' as a remote storage");
@@ -729,6 +800,16 @@ impl GenericRemoteStorage {
small_timeout,
)?))
}
RemoteStorageKind::GCS(gcs_config) => {
// Read only for logging; gcp_auth performs the actual credential discovery.
let google_application_credentials =
std::env::var("GOOGLE_APPLICATION_CREDENTIALS")
.unwrap_or_else(|_| "<none>".into());
info!(
"Using gcs bucket '{}' as a remote storage, prefix in bucket: '{:?}', GOOGLE_APPLICATION_CREDENTIALS: {google_application_credentials}",
gcs_config.bucket_name, gcs_config.prefix_in_bucket
);
Self::GCS(Arc::new(GCSBucket::new(gcs_config, timeout).await?))
}
})
}
@@ -764,6 +845,7 @@ impl GenericRemoteStorage {
Self::AwsS3(s) => Some(s.bucket_name()),
Self::AzureBlob(s) => Some(s.container_name()),
Self::Unreliable(_s) => None,
Self::GCS(s) => Some(s.bucket_name()),
}
}
}
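
With the variant wired through `GenericRemoteStorage`, a GCS client is built through the same entry point as the other backends. A condensed sketch, mirroring what `create_gcs_client` in the integration test below does (the bucket name is a placeholder):

```rust
use std::num::NonZeroUsize;
use std::time::Duration;

use remote_storage::{GCSConfig, GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind};

async fn make_gcs_storage() -> anyhow::Result<GenericRemoteStorage> {
    let config = RemoteStorageConfig {
        storage: RemoteStorageKind::GCS(GCSConfig {
            bucket_name: "my-bucket".into(), // placeholder
            prefix_in_bucket: Some("pageserver/".into()),
            max_keys_per_list_response: Some(100),
            concurrency_limit: NonZeroUsize::new(100).unwrap(),
        }),
        timeout: Duration::from_secs(120),
        small_timeout: Duration::from_secs(120),
    };
    // Logs the chosen backend (and whether GOOGLE_APPLICATION_CREDENTIALS is
    // set), then constructs the GCSBucket.
    GenericRemoteStorage::from_config(&config).await
}
```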

### remote_storage: `simulate_failures.rs`

@@ -63,6 +63,7 @@ impl UnreliableWrapper {
GenericRemoteStorage::Unreliable(_s) => {
panic!("Can't wrap unreliable wrapper unreliably")
}
GenericRemoteStorage::GCS(s) => GenericRemoteStorage::GCS(s),
};
let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100);
UnreliableWrapper {

### remote_storage: GCS integration test (new file)

@@ -0,0 +1,255 @@
#![allow(dead_code)]
#![allow(unused)]
mod common;
use crate::common::{download_to_vec, upload_stream};
use anyhow::Context;
use camino::Utf8Path;
use futures::StreamExt;
use futures::stream::Stream;
use remote_storage::{
DownloadKind, DownloadOpts, GCSConfig, GenericRemoteStorage, ListingMode, RemotePath,
RemoteStorageConfig, RemoteStorageKind, StorageMetadata,
};
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt::{Debug, Display};
use std::io::Cursor;
use std::ops::Bound;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::SystemTime;
use test_context::{AsyncTestContext, test_context};
use tokio_util::sync::CancellationToken;
use utils::backoff;
// A minimal working GCS client I can pass around in an async context
const BASE_PREFIX: &str = "test";
async fn create_gcs_client() -> anyhow::Result<Arc<GenericRemoteStorage>> {
let bucket_name = std::env::var("GCS_TEST_BUCKET").expect("GCS_TEST_BUCKET must be set");
let gcs_config = GCSConfig {
bucket_name,
prefix_in_bucket: Some("testing-path/".into()),
max_keys_per_list_response: Some(100),
concurrency_limit: std::num::NonZero::new(100).unwrap(),
};
let remote_storage_config = RemoteStorageConfig {
storage: RemoteStorageKind::GCS(gcs_config),
timeout: Duration::from_secs(120),
small_timeout: std::time::Duration::from_secs(120),
};
Ok(Arc::new(
GenericRemoteStorage::from_config(&remote_storage_config)
.await
.context("remote storage init")?,
))
}
struct EnabledGCS {
client: Arc<GenericRemoteStorage>,
base_prefix: &'static str,
}
impl EnabledGCS {
async fn setup() -> Self {
let client = create_gcs_client()
.await
.context("gcs client creation")
.expect("gcs client creation failed");
EnabledGCS {
client,
base_prefix: BASE_PREFIX,
}
}
}
impl AsyncTestContext for EnabledGCS {
async fn setup() -> Self {
Self::setup().await
}
}
#[test_context(EnabledGCS)]
#[tokio::test]
async fn gcs_test_suite(ctx: &mut EnabledGCS) -> anyhow::Result<()> {
// ------------------------------------------------
// --- `time_travel_recover`, showcasing `upload`, `delete_objects`, `copy`
// ------------------------------------------------
// Our test depends on discrepancies in the clock between GCS and the environment the tests
// run in. Therefore, wait a little bit before and after. The alternative would be
// to take the time from GCS response headers.
const WAIT_TIME: Duration = Duration::from_millis(3_000);
async fn retry<T, O, F, E>(op: O) -> Result<T, E>
where
E: Display + Debug + 'static,
O: FnMut() -> F,
F: Future<Output = Result<T, E>>,
{
let warn_threshold = 3;
let max_retries = 10;
backoff::retry(
op,
|_e| false,
warn_threshold,
max_retries,
"test retry",
&CancellationToken::new(),
)
.await
.expect("never cancelled")
}
async fn time_point() -> SystemTime {
tokio::time::sleep(WAIT_TIME).await;
let ret = SystemTime::now();
tokio::time::sleep(WAIT_TIME).await;
ret
}
async fn list_files(
client: &Arc<GenericRemoteStorage>,
cancel: &CancellationToken,
) -> anyhow::Result<HashSet<RemotePath>> {
Ok(
retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel))
.await
.context("list root files failure")?
.keys
.into_iter()
.map(|o| o.key)
.collect::<HashSet<_>>(),
)
}
let cancel = CancellationToken::new();
let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
.with_context(|| "RemotePath conversion")?;
// ---------------- t0 ---------------
// Upload 'path1'
retry(|| {
let (data, len) = upload_stream("remote blob data1".as_bytes().into());
ctx.client.upload(data, len, &path1, None, &cancel)
})
.await?;
let t0_files = list_files(&ctx.client, &cancel).await?;
let t0 = time_point().await;
// Show 'path1'
println!("at t0: {t0_files:?}");
// Upload 'path2'
let old_data = "remote blob data2";
retry(|| {
let (data, len) = upload_stream(old_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None, &cancel)
})
.await?;
// ---------------- t1 ---------------
// Show 'path1' and 'path2'
let t1_files = list_files(&ctx.client, &cancel).await?;
let t1 = time_point().await;
println!("at t1: {t1_files:?}");
{
let opts = DownloadOpts::default();
let dl = retry(|| ctx.client.download(&path2, &opts, &cancel)).await?;
let last_modified = dl.last_modified;
let half_wt = WAIT_TIME.mul_f32(0.5);
let t0_hwt = t0 + half_wt;
let t1_hwt = t1 - half_wt;
if !(t0_hwt..=t1_hwt).contains(&last_modified) {
panic!(
"last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
This likely means a large clock discrepancy between GCS and the local clock."
);
}
}
// Upload 'path3'
retry(|| {
let (data, len) = upload_stream("remote blob data3".as_bytes().into());
ctx.client.upload(data, len, &path3, None, &cancel)
})
.await?;
// Overwrite 'path2'
let new_data = "new remote blob data2";
retry(|| {
let (data, len) = upload_stream(new_data.as_bytes().into());
ctx.client.upload(data, len, &path2, None, &cancel)
})
.await?;
// Delete 'path1'
retry(|| ctx.client.delete(&path1, &cancel)).await?;
// Show 'path2' and `path3`
let t2_files = list_files(&ctx.client, &cancel).await?;
let t2 = time_point().await;
println!("at t2: {t2_files:?}");
// No changes after recovery to t2 (no-op)
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t2, t_final, &cancel, None)
.await?;
let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t2: {t2_files_recovered:?}");
assert_eq!(t2_files, t2_files_recovered);
let path2_recovered_t2 = download_to_vec(
ctx.client
.download(&path2, &DownloadOpts::default(), &cancel)
.await?,
)
.await?;
assert_eq!(path2_recovered_t2, new_data.as_bytes());
// after recovery to t1: path1 is back, path2 has the old content
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t1, t_final, &cancel, None)
.await?;
let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t1: {t1_files_recovered:?}");
assert_eq!(t1_files, t1_files_recovered);
let path2_recovered_t1 = download_to_vec(
ctx.client
.download(&path2, &DownloadOpts::default(), &cancel)
.await?,
)
.await?;
assert_eq!(path2_recovered_t1, old_data.as_bytes());
// after recovery to t0: everything is gone except for path1
let t_final = time_point().await;
ctx.client
.time_travel_recover(None, t0, t_final, &cancel, None)
.await?;
let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
println!("after recovery to t0: {t0_files_recovered:?}");
assert_eq!(t0_files, t0_files_recovered);
// cleanup
let paths = &[path1, path2, path3];
retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
Ok(())
}
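
Running this suite requires real infrastructure: `GCS_TEST_BUCKET` must name an existing bucket, and credentials are resolved by `gcp_auth`, for example via `GOOGLE_APPLICATION_CREDENTIALS`; e.g. `GCS_TEST_BUCKET=my-bucket cargo test -p remote_storage`.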

### pageserver: `feature_resolver.rs`

@@ -110,6 +110,12 @@ impl FeatureResolver {
PostHogFlagFilterPropertyValue::String("local".to_string()),
);
}
RemoteStorageKind::GCS { .. } => {
properties.insert(
"region".to_string(),
PostHogFlagFilterPropertyValue::String("local".to_string()),
);
}
}
}
// TODO: move this to a background task so that we don't block startup in case of slow disk
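
(The GCS arm reports the same placeholder `"local"` region as the preceding arm.)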

### pageserver: remote_timeline_client `upload.rs`

@@ -158,6 +158,7 @@ pub(super) async fn upload_timeline_layer<'a>(
GenericRemoteStorage::LocalFs(_) => {}
GenericRemoteStorage::AwsS3(_) => {}
GenericRemoteStorage::Unreliable(_) => {}
GenericRemoteStorage::GCS(_) => {}
};
/* END_HADRON */
let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);

### storage_scrubber: `lib.rs`

@@ -266,6 +266,7 @@ impl BucketConfig {
"container {}, storage account {:?}, region {}",
config.container_name, config.storage_account, config.container_region
),
RemoteStorageKind::GCS(config) => format!("bucket {}", config.bucket_name),
}
}
pub fn bucket_name(&self) -> Option<&str> {
@@ -381,6 +382,9 @@ async fn init_remote(
config.prefix_in_container.get_or_insert(default_prefix);
}
RemoteStorageKind::LocalFs { .. } => (),
RemoteStorageKind::GCS(config) => {
config.prefix_in_bucket.get_or_insert(default_prefix);
}
}
// We already pass the prefix to the remote client above