From 85ce109361be068ff890f9fece786a81f0724136 Mon Sep 17 00:00:00 2001 From: "John G. Crowley" <53502854+johngcrowley@users.noreply.github.com> Date: Tue, 16 Sep 2025 03:18:25 -0500 Subject: [PATCH] Initial implementation of GCS provider. (#11666) ## Problem We are currently using GCS through the AWS API instead of directly to the GCS API. ## Summary of changes Draft implementation of a GCS provider. We run Neon on GCS with the AWS provider via [this patch](https://github.com/neondatabase/neon/pull/10277), but want to use GCS API directly. This implementation attempts to do so without adding a GCS library dependency or new SDK, except for `gcp_auth`. --- Cargo.lock | 98 +- libs/remote_storage/Cargo.toml | 7 +- libs/remote_storage/src/config.rs | 55 + libs/remote_storage/src/gcs_bucket.rs | 1434 +++++++++++++++++ libs/remote_storage/src/lib.rs | 86 +- libs/remote_storage/src/simulate_failures.rs | 1 + libs/remote_storage/tests/test_real_gcs.rs | 255 +++ pageserver/src/feature_resolver.rs | 6 + .../tenant/remote_timeline_client/upload.rs | 1 + storage_scrubber/src/lib.rs | 4 + 10 files changed, 1938 insertions(+), 9 deletions(-) create mode 100644 libs/remote_storage/src/gcs_bucket.rs create mode 100644 libs/remote_storage/tests/test_real_gcs.rs diff --git a/Cargo.lock b/Cargo.lock index 9a0cc9076a..5ea4912c43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -145,9 +145,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.94" +version = "1.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1fd03a028ef38ba2276dce7e33fcd6369c158a1bca17946c4b1b701891c1ff7" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" dependencies = [ "backtrace", ] @@ -2402,9 +2402,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.28" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -2433,9 +2433,9 @@ checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.28" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -2510,6 +2510,33 @@ dependencies = [ "slab", ] +[[package]] +name = "gcp_auth" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf67f30198e045a039264c01fb44659ce82402d7771c50938beb41a5ac87733" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "chrono", + "home", + "http 1.3.1", + "http-body-util", + "hyper 1.4.1", + "hyper-rustls 0.27.5", + "hyper-util", + "ring", + "rustls-pemfile 2.1.1", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tracing", + "tracing-futures", + "url", +] + [[package]] name = "gen_ops" version = "0.4.0" @@ -2840,6 +2867,15 @@ dependencies = [ "digest", ] +[[package]] +name = "home" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf" +dependencies = [ + "windows-sys 0.59.0", +] + [[package]] name = "hostname" version = "0.4.0" @@ -3075,6 +3111,24 @@ 
dependencies = [ "tower-service", ] +[[package]] +name = "hyper-rustls" +version = "0.27.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d191583f3da1305256f22463b9bb0471acad48a4e534a5218b9963e9c1f59b2" +dependencies = [ + "futures-util", + "http 1.3.1", + "hyper 1.4.1", + "hyper-util", + "rustls 0.23.29", + "rustls-native-certs 0.8.0", + "rustls-pki-types", + "tokio", + "tokio-rustls 0.26.2", + "tower-service", +] + [[package]] name = "hyper-timeout" version = "0.5.1" @@ -3852,6 +3906,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -5872,8 +5936,11 @@ dependencies = [ "bytes", "camino", "camino-tempfile", + "chrono", "futures", "futures-util", + "gcp_auth", + "http 1.3.1", "http-body-util", "http-types", "humantime-serde", @@ -5894,7 +5961,9 @@ dependencies = [ "tokio-util", "toml_edit", "tracing", + "url", "utils", + "uuid", ] [[package]] @@ -5924,6 +5993,7 @@ dependencies = [ "js-sys", "log", "mime", + "mime_guess", "once_cell", "percent-encoding", "pin-project-lite", @@ -8033,6 +8103,16 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "tracing-futures" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" +dependencies = [ + "pin-project", + "tracing", +] + [[package]] name = "tracing-log" version = "0.2.0" @@ -8208,6 +8288,12 @@ dependencies = [ "libc", ] +[[package]] +name = "unicase" +version = "2.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" + [[package]] name = "unicode-bidi" version = "0.3.17" diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index ea06725cfd..bc70904958 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -19,7 +19,8 @@ camino = { workspace = true, features = ["serde1"] } humantime-serde.workspace = true hyper = { workspace = true, features = ["client"] } futures.workspace = true -reqwest.workspace = true +reqwest = { workspace = true, features = ["multipart", "stream"] } +chrono = { version = "0.4", default-features = false, features = ["clock"] } serde.workspace = true serde_json.workspace = true tokio = { workspace = true, features = ["sync", "fs", "io-util"] } @@ -41,6 +42,10 @@ http-types.workspace = true http-body-util.workspace = true itertools.workspace = true sync_wrapper = { workspace = true, features = ["futures"] } +gcp_auth = "0.12.3" +url.workspace = true +http.workspace = true +uuid.workspace = true byteorder = "1.4" rand.workspace = true diff --git a/libs/remote_storage/src/config.rs b/libs/remote_storage/src/config.rs index e13e17d544..8c8d5235b8 100644 --- a/libs/remote_storage/src/config.rs +++ b/libs/remote_storage/src/config.rs @@ -41,6 +41,7 @@ impl RemoteStorageKind { RemoteStorageKind::LocalFs { .. 
} => None, RemoteStorageKind::AwsS3(config) => Some(&config.bucket_name), RemoteStorageKind::AzureContainer(config) => Some(&config.container_name), + RemoteStorageKind::GCS(config) => Some(&config.bucket_name), } } } @@ -51,6 +52,7 @@ impl RemoteStorageConfig { match &self.storage { RemoteStorageKind::LocalFs { .. } => DEFAULT_REMOTE_STORAGE_LOCALFS_CONCURRENCY_LIMIT, RemoteStorageKind::AwsS3(c) => c.concurrency_limit.into(), + RemoteStorageKind::GCS(c) => c.concurrency_limit.into(), RemoteStorageKind::AzureContainer(c) => c.concurrency_limit.into(), } } @@ -85,6 +87,9 @@ pub enum RemoteStorageKind { /// Azure Blob based storage, storing all files in the container /// specified by the config AzureContainer(AzureConfig), + /// Google Cloud based storage, storing all files in the GCS bucket + /// specified by the config + GCS(GCSConfig), } #[derive(Deserialize)] @@ -176,6 +181,32 @@ impl Debug for S3Config { } } +#[derive(Clone, PartialEq, Eq, Deserialize, Serialize)] +pub struct GCSConfig { + /// Name of the bucket to connect to. + pub bucket_name: String, + /// A "subfolder" in the bucket, to use the same bucket separately by multiple remote storage users at once. + pub prefix_in_bucket: Option, + #[serde(default = "default_remote_storage_s3_concurrency_limit")] + pub concurrency_limit: NonZeroUsize, + #[serde(default = "default_max_keys_per_list_response")] + pub max_keys_per_list_response: Option, +} + +impl Debug for GCSConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GCSConfig") + .field("bucket_name", &self.bucket_name) + .field("prefix_in_bucket", &self.prefix_in_bucket) + .field("concurrency_limit", &self.concurrency_limit) + .field( + "max_keys_per_list_response", + &self.max_keys_per_list_response, + ) + .finish() + } +} + /// Azure bucket coordinates and access credentials to manage the bucket contents (read and write). 
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct AzureConfig { @@ -304,6 +335,30 @@ timeout = '5s'"; ); } + #[test] + fn test_gcs_parsing() { + let toml = "\ + bucket_name = 'foo-bar' + prefix_in_bucket = '/pageserver' + "; + + let config = parse(toml).unwrap(); + + assert_eq!( + config, + RemoteStorageConfig { + storage: RemoteStorageKind::GCS(GCSConfig { + bucket_name: "foo-bar".into(), + prefix_in_bucket: Some("pageserver/".into()), + max_keys_per_list_response: DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, + concurrency_limit: std::num::NonZero::new(100).unwrap(), + }), + timeout: Duration::from_secs(120), + small_timeout: RemoteStorageConfig::DEFAULT_SMALL_TIMEOUT + } + ); + } + #[test] fn test_s3_parsing() { let toml = "\ diff --git a/libs/remote_storage/src/gcs_bucket.rs b/libs/remote_storage/src/gcs_bucket.rs new file mode 100644 index 0000000000..f716a6b78b --- /dev/null +++ b/libs/remote_storage/src/gcs_bucket.rs @@ -0,0 +1,1434 @@ +use crate::config::GCSConfig; +use crate::error::Cancelled; +pub(super) use crate::metrics::RequestKind; +use crate::metrics::{AttemptOutcome, start_counting_cancelled_wait, start_measuring_requests}; +use crate::{ + ConcurrencyLimiter, Download, DownloadError, DownloadOpts, GCS_SCOPES, Listing, ListingMode, + ListingObject, MAX_KEYS_PER_DELETE_GCS, REMOTE_STORAGE_PREFIX_SEPARATOR, RemotePath, + RemoteStorage, StorageMetadata, TimeTravelError, TimeoutOrCancel, GCSVersion, VersionId, + GCSVersionListing, +}; +use anyhow::Context; +use azure_core::Etag; +use bytes::Bytes; +use bytes::BytesMut; +use chrono::DateTime; +use futures::stream::Stream; +use futures::stream::TryStreamExt; +use futures_util::StreamExt; +use gcp_auth::{Token, TokenProvider}; +use http::Method; +use http::StatusCode; +use reqwest::{Client, header}; +use scopeguard::ScopeGuard; +use serde::{Deserialize, Deserializer, Serialize, de}; +use std::collections::HashMap; +use std::fmt::Debug; +use std::num::{NonZeroU32, ParseIntError}; +use std::pin::{Pin, pin}; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; +use std::time::SystemTime; +use tokio_util::codec::{BytesCodec, FramedRead}; +use tokio_util::sync::CancellationToken; +use tracing; +use url::{ParseError, Url}; +use utils::backoff; +use uuid::Uuid; + +// --------- +fn to_system_time(timestamp: Option) -> Option { + timestamp + .and_then(|s| DateTime::parse_from_rfc3339(&s).ok()) + .map(|s| s.into()) +} + +// --------- +pub struct GCSBucket { + token_provider: Arc, + bucket_name: String, + prefix_in_bucket: Option, + max_keys_per_list_response: Option, + concurrency_limiter: ConcurrencyLimiter, + pub timeout: Duration, +} + +struct GetObjectRequest { + bucket: String, + key: String, + etag: Option, + range: Option, +} + +// --------- + +impl GCSBucket { + pub async fn new(remote_storage_config: &GCSConfig, timeout: Duration) -> anyhow::Result { + tracing::debug!( + "creating remote storage for gcs bucket {}", + remote_storage_config.bucket_name + ); + + // clean up 'prefix_in_bucket' if user provides '/pageserver' or 'pageserver/' + let prefix_in_bucket = remote_storage_config + .prefix_in_bucket + .as_deref() + .map(|prefix| { + let mut prefix = prefix; + while prefix.starts_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { + prefix = &prefix[1..]; + } + + let mut prefix = prefix.to_string(); + if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { + prefix.pop(); + } + + prefix + }); + + // get GOOGLE_APPLICATION_CREDENTIALS + let provider = gcp_auth::provider().await?; + + Ok(GCSBucket { + token_provider: 
Arc::clone(&provider), + bucket_name: remote_storage_config.bucket_name.clone(), + prefix_in_bucket, + timeout, + max_keys_per_list_response: remote_storage_config.max_keys_per_list_response, + concurrency_limiter: ConcurrencyLimiter::new( + remote_storage_config.concurrency_limit.get(), + ), + }) + } + + // convert `RemotePath` -> `String` + pub fn relative_path_to_gcs_object(&self, path: &RemotePath) -> String { + let path_string = path.get_path().as_str(); + match &self.prefix_in_bucket { + Some(prefix) => prefix.clone() + "/" + path_string, + None => path_string.to_string(), + } + } + + // convert `String` -> `RemotePath` + pub fn gcs_object_to_relative_path(&self, key: &str) -> RemotePath { + let relative_path = + match key.strip_prefix(self.prefix_in_bucket.as_deref().unwrap_or_default()) { + Some(stripped) => stripped, + // we rely on GCS to return properly prefixed paths + // for requests with a certain prefix + None => panic!( + "Key {} does not start with bucket prefix {:?}", + key, self.prefix_in_bucket + ), + }; + RemotePath( + relative_path + .split(REMOTE_STORAGE_PREFIX_SEPARATOR) + .collect(), + ) + } + + pub fn bucket_name(&self) -> &str { + &self.bucket_name + } + + fn max_keys_per_delete(&self) -> usize { + MAX_KEYS_PER_DELETE_GCS + } + + async fn permit( + &self, + kind: RequestKind, + cancel: &CancellationToken, + ) -> Result, Cancelled> { + let started_at = start_counting_cancelled_wait(kind); + let acquire = self.concurrency_limiter.acquire(kind); + + let permit = tokio::select! { + permit = acquire => permit.expect("semaphore is never closed"), + _ = cancel.cancelled() => return Err(Cancelled), + }; + + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .wait_seconds + .observe_elapsed(kind, started_at); + + Ok(permit) + } + + async fn owned_permit( + &self, + kind: RequestKind, + cancel: &CancellationToken, + ) -> Result { + let started_at = start_counting_cancelled_wait(kind); + let acquire = self.concurrency_limiter.acquire_owned(kind); + + let permit = tokio::select! { + permit = acquire => permit.expect("semaphore is never closed"), + _ = cancel.cancelled() => return Err(Cancelled), + }; + + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .wait_seconds + .observe_elapsed(kind, started_at); + Ok(permit) + } + + async fn list_versions_with_permit( + &self, + _permit: &tokio::sync::SemaphorePermit<'_>, + prefix: Option<&RemotePath>, + mode: ListingMode, + max_keys: Option, + cancel: &CancellationToken, + ) -> Result { + + let warn_threshold = 3; + let max_retries = 10; + let is_permanent = |e: &_| matches!(e, DownloadError::Cancelled); + + // GCS only has versions, which may contain 'deleted_at'. 
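+    // With Object Versioning enabled, every overwrite or delete of a live object
+    // keeps the previous data as a noncurrent version identified by its
+    // `generation` number, and noncurrent versions carry a `timeDeleted`
+    // timestamp (the moment they stopped being the live copy). There is no
+    // separate delete-marker record as in S3, so a `versions=true` listing
+    // returns live and noncurrent generations in one flat stream.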
+ let mut versions = crate::GCSVersionListing::default(); + let mut continuation_token = None; + let mut uri: String; + + let list_prefix = prefix + .map(|p| self.relative_path_to_gcs_object(p)) + .or_else(|| { + self.prefix_in_bucket.clone().map(|mut s| { + s.push(REMOTE_STORAGE_PREFIX_SEPARATOR); + s + }) + }) + .unwrap(); + + let mut versions_base_uri = format!( + "https://storage.googleapis.com/storage/v1/b/{}/o?prefix={}&versions=true", + self.bucket_name.clone(), + list_prefix, + ); + + if let ListingMode::WithDelimiter = mode { + versions_base_uri.push_str(&format!( + "&delimiter={}", + REMOTE_STORAGE_PREFIX_SEPARATOR.to_string() + )); + } + + loop { + + match &continuation_token { + Some(token) => { + uri = format!("{}&pageToken={}", &versions_base_uri, token); + }, + None => { + uri = versions_base_uri.clone(); + }, + } + + let mut req_uri = versions_base_uri.clone(); + + let response = backoff::retry( + || async { + + // fetch an array of results, keep looping to get them + let op = Client::new() + .get(&uri) + .bearer_auth( + self.token_provider + .token(GCS_SCOPES) + .await + .map_err(|e: gcp_auth::Error| DownloadError::Other(e.into()))? + .as_str() + ) + .send(); + + tokio::select! { + res = op => res.map_err(|e| DownloadError::Other(e.into())), + _ = cancel.cancelled() => Err(DownloadError::Cancelled), + } + + }, + is_permanent, + warn_threshold, + max_retries, + "listing object versions", + cancel, + ) + .await + .ok_or_else(|| DownloadError::Cancelled) + .and_then(|x| x)?; + + let res = response.json::() + .await + .map_err(|e| DownloadError::Other(e.into()))?; + + // fill up our results vec, + continuation_token = res.next_page_token; + + let version_listing = + res.items + .ok_or_else(|| DownloadError::Other(anyhow::anyhow!("no items returned")))? + .into_iter() + .map(| GCSObject { name, updated, time_deleted, generation, .. } | { + // don't `filter_map`, a `None` for `last_modified` ('updated') is bad for + // time travel, so catch it. 
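+    // (`time_travel_recover` sorts and binary-searches versions by
+    // `last_modified`, so a version without a timestamp would silently
+    // shift the computed restore point.)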
+ if updated.is_none() { + return Err( + DownloadError::Other( + anyhow::anyhow!("no 'updated' field") + ) + ) + } + Ok( + GCSVersion { + key: self.gcs_object_to_relative_path(&name), + last_modified: to_system_time(updated).unwrap(), + id: VersionId(generation.expect("no version id")), + time_deleted: to_system_time(time_deleted), + } + ) + }).collect::, _>>(); + + versions.versions.extend(version_listing?); + + if let Some(max_keys) = max_keys { + if versions.versions.len() >= max_keys.get().try_into().unwrap() { + return Err(DownloadError::Other( + anyhow::anyhow!("max keys reached") + )); + } + } + + if continuation_token.is_none() { + break + } + } + + Ok(versions) + + } + + async fn put_object( + &self, + byte_stream: impl Stream> + Send + Sync + 'static, + fs_size: usize, + to: &RemotePath, + cancel: &CancellationToken, + metadata: Option, + ) -> anyhow::Result<()> { + let kind = RequestKind::Put; + let _permit = self.permit(kind, cancel).await?; + let started_at = start_measuring_requests(kind); + + let multipart_uri = format!( + "https://storage.googleapis.com/upload/storage/v1/b/{}/o?uploadType=multipart", + self.bucket_name.clone() + ); + + let mut metadata = metadata.clone(); + let gcs_path = self.relative_path_to_gcs_object(to); + + // Always specify destination via `RemotePath` in multipart uploads + if metadata.is_none() { + metadata = Some(StorageMetadata::from([("name", gcs_path.as_str())])); + } else { + metadata + .as_mut() + .map(|m| m.0.insert("name".to_string(), gcs_path)); + } + + let metadata_body = serde_json::to_string(&metadata.map(|m| m.0))?; + let metadata_part = reqwest::multipart::Part::text(metadata_body) + .mime_str("application/json; charset=UTF-8")?; + + let stream_body = reqwest::Body::wrap_stream(byte_stream); + let data_part = reqwest::multipart::Part::stream_with_length(stream_body, fs_size as u64) + .mime_str("application/octet-stream")?; + + let mut form = reqwest::multipart::Form::new() + .part("metadata", metadata_part) + .part("bodystream", data_part); + + let mut headers = header::HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + header::HeaderValue::from_str(&format!( + "multipart/related; boundary={}", + form.boundary() + ))?, + ); + + let upload = Client::new() + .post(multipart_uri) + .bearer_auth(self.token_provider.token(GCS_SCOPES).await?.as_str()) + .multipart(form) + .headers(headers) + .send(); + + let upload = tokio::time::timeout(self.timeout, upload); + + let res = tokio::select! 
{ + res = upload => res, + _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), + }; + + if let Ok(inner) = &res { + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, inner, started_at); + } + match res { + Ok(Ok(res)) => { + if !res.status().is_success() { + match res.status() { + _ => Err(anyhow::anyhow!("GCS PUT error \n\t {:?}", res)), + } + } else { + let body = res + .text() + .await + .map_err(|e: reqwest::Error| DownloadError::Other(e.into()))?; + + let resp: GCSObject = serde_json::from_str(&body) + .map_err(|e: serde_json::Error| DownloadError::Other(e.into()))?; + + if !resp.size.is_some_and(|s| s == fs_size as i64) { + // very unlikely + return Err(anyhow::anyhow!( + "Boundary string from 'multipart/related' HTTP upload occurred in payload" + )); + }; + + Ok(()) + } + } + Ok(Err(reqw)) => Err(reqw.into()), + Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()), + } + } + + async fn delete_oids( + &self, + delete_objects: &[String], + cancel: &CancellationToken, + _permit: &tokio::sync::SemaphorePermit<'_>, + ) -> anyhow::Result<()> { + let kind = RequestKind::Delete; + let mut cancel = std::pin::pin!(cancel.cancelled()); + + for chunk in delete_objects.chunks(MAX_KEYS_PER_DELETE_GCS) { + let started_at = start_measuring_requests(kind); + + // Use this to report keys that didn't delete based on 'content_id' + let mut delete_objects_status = HashMap::new(); + + let mut form = reqwest::multipart::Form::new(); + let bulk_uri = "https://storage.googleapis.com/batch/storage/v1"; + + for (index, path) in delete_objects.iter().enumerate() { + delete_objects_status.insert(index + 1, path.clone()); + + let path_to_delete: String = + url::form_urlencoded::byte_serialize(path.trim_start_matches("/").as_bytes()) + .collect(); + + let delete_req = format!( + " + DELETE /storage/v1/b/{}/o/{} HTTP/1.1\r\n\ + Content-Type: application/json\r\n\ + accept: application/json\r\n\ + content-length: 0\r\n + ", + self.bucket_name.clone(), + path_to_delete + ) + .trim() + .to_string(); + + let content_id = format!("<{}+{}>", Uuid::new_v4(), index + 1); + + let mut part_headers = header::HeaderMap::new(); + part_headers.insert( + header::CONTENT_TYPE, + header::HeaderValue::from_static("application/http"), + ); + part_headers.insert( + header::TRANSFER_ENCODING, + header::HeaderValue::from_static("binary"), + ); + part_headers.insert( + header::HeaderName::from_static("content-id"), + header::HeaderValue::from_str(&content_id)?, + ); + let part = reqwest::multipart::Part::text(delete_req).headers(part_headers); + + form = form.part(format!("request-{}", index), part); + } + + let mut headers = header::HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + header::HeaderValue::from_str(&format!( + "multipart/mixed; boundary={}", + form.boundary() + ))?, + ); + + let req = Client::new() + .post(bulk_uri) + .bearer_auth(self.token_provider.token(GCS_SCOPES).await?.as_str()) + .multipart(form) + .headers(headers) + .send(); + + let resp = tokio::select! 
{ + resp = req => resp, + _ = tokio::time::sleep(self.timeout) => return Err(TimeoutOrCancel::Timeout.into()), + _ = &mut cancel => return Err(TimeoutOrCancel::Cancel.into()), + }; + + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &resp, started_at); + + let resp = resp.context("request deletion")?; + + crate::metrics::BUCKET_METRICS + .deleted_objects_total + .inc_by(chunk.len() as u64); + + let res_headers = resp.headers().to_owned(); + + let boundary = res_headers + .get(header::CONTENT_TYPE) + .unwrap() + .to_str()? + .split("=") + .last() + .unwrap(); + + let res_body = resp.text().await?; + + let parsed: HashMap = res_body + .split(&format!("--{}", boundary)) + .filter_map(|c| { + let mut lines = c.lines(); + + let id = lines.find_map(|line| { + line.strip_prefix("Content-ID:") + .and_then(|suf| suf.split('+').last()) + .and_then(|suf| suf.split('>').next()) + .map(|x| x.trim().to_string()) + }); + + let status_code = lines.find_map(|line| { + // Not sure if this protocol version shouldn't be so specific + line.strip_prefix("HTTP/1.1") + .and_then(|x| x.split_whitespace().next()) + .map(|x| x.trim().to_string()) + }); + + id.zip(status_code) + }) + .collect(); + + // Gather failures + let errors: HashMap = parsed + .iter() + .filter_map(|(x, y)| { + let id = x.parse::().ok(); + if y == "404" { + // GCS returns Error on 404, S3 doesn't. Warn and omit from failed count. + // https://cloud.google.com/storage/docs/xml-api/delete-object + tracing::warn!( + "DeleteObjects key {} {} NotFound. Already deleted.", + delete_objects_status.get(&id?).unwrap(), + y + ); + None + } else if y.chars().next() != Some('2') { + id.map(|v| (v, y)) + } else { + None + } + }) + .collect(); + + if !errors.is_empty() { + // Report 10 of them like S3 + const LOG_UP_TO_N_ERRORS: usize = 10; + for (id, code) in errors.iter().take(LOG_UP_TO_N_ERRORS) { + tracing::warn!( + "DeleteObjects key {} failed with code: {}", + delete_objects_status.get(id).unwrap(), + code + ); + } + + return Err(anyhow::anyhow!( + "Failed to delete {}/{} objects", + errors.len(), + chunk.len(), + )); + } + } + + Ok(()) + } + + async fn head_object( + &self, + key: String, + cancel: &CancellationToken, + ) -> Result { + let kind = RequestKind::Head; + let _permit = self.permit(kind, cancel).await?; + + let encoded_path: String = url::form_urlencoded::byte_serialize(key.as_bytes()).collect(); + + let metadata_uri_mod = "alt=json"; + let download_uri = format!( + "https://storage.googleapis.com/storage/v1/b/{}/o/{}?{}", + self.bucket_name.clone(), + encoded_path, + metadata_uri_mod + ); + + let head_future = Client::new() + .get(download_uri) + .bearer_auth( + self.token_provider + .token(GCS_SCOPES) + .await + .map_err(|e: gcp_auth::Error| DownloadError::Other(e.into()))? + .as_str(), + ) + .send(); + + let started_at = start_measuring_requests(kind); + + let head_future = tokio::time::timeout(self.timeout, head_future); + + let res = tokio::select! { + res = head_future => res, + _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), + }; + + let res = res.map_err(|_e| DownloadError::Timeout)?; + + // do not incl. 
timeouts as errors in metrics but cancellations + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &res, started_at); + + let data = match res { + Ok(data) => { + if !data.status().is_success() { + match data.status() { + StatusCode::NOT_FOUND => return Err(DownloadError::NotFound), + _ => { + return Err(DownloadError::Other(anyhow::anyhow!( + "GCS head response contained no response body" + ))); + } + } + } else { + data + } + } + Err(e) => { + crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed( + kind, + AttemptOutcome::Err, + started_at, + ); + + return Err(DownloadError::Other( + anyhow::Error::new(e).context("error in HEAD of GCS object"), + )); + } + }; + + let body = data + .text() + .await + .map_err(|e: reqwest::Error| DownloadError::Other(e.into()))?; + + let resp: GCSObject = serde_json::from_str(&body) + .map_err(|e: serde_json::Error| DownloadError::Other(e.into()))?; + + Ok(resp) + } + + async fn list_objects_v2(&self, list_uri: String) -> anyhow::Result { + let res = Client::new() + .get(list_uri) + .bearer_auth(self.token_provider.token(GCS_SCOPES).await?.as_str()); + Ok(res) + } + + // need a 'bucket', a 'key', and a bytes 'range'. + async fn get_object( + &self, + request: GetObjectRequest, + cancel: &CancellationToken, + ) -> anyhow::Result { + let kind = RequestKind::Get; + + let permit = self.owned_permit(kind, cancel).await?; + + let started_at = start_measuring_requests(kind); + + let encoded_path: String = + url::form_urlencoded::byte_serialize(request.key.as_bytes()).collect(); + + /// We do this in two parts: + /// 1. Serialize the metadata of the first request to get Etag, last modified, etc + /// 2. We do not .await the second request pass on the pinned stream to the 'get_object' + /// caller + let metadata_uri_mod = "alt=json"; + let download_uri = format!( + "https://storage.googleapis.com/storage/v1/b/{}/o/{}?{}", + self.bucket_name.clone(), + encoded_path, + metadata_uri_mod + ); + + let res = Client::new() + .get(download_uri) + .bearer_auth( + self.token_provider + .token(GCS_SCOPES) + .await + .map_err(|e: gcp_auth::Error| DownloadError::Other(e.into()))? + .as_str(), + ) + .send(); + + let obj_metadata = tokio::select! { + res = res => res, + _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout), + _ = cancel.cancelled() => return Err(DownloadError::Cancelled), + }; + + let resp = match obj_metadata { + Ok(resp) => { + if !resp.status().is_success() { + match resp.status() { + StatusCode::NOT_FOUND => return Err(DownloadError::NotFound), + _ => { + return Err(DownloadError::Other(anyhow::anyhow!( + "GCS GET resposne contained no response body" + ))); + } + } + } else { + resp + } + } + _ => { + return Err(DownloadError::Other(anyhow::anyhow!("download gcs object"))); + } + }; + + let body = resp + .text() + .await + .map_err(|e: reqwest::Error| DownloadError::Other(e.into()))?; + + let resp: GCSObject = serde_json::from_str(&body) + .map_err(|e: serde_json::Error| DownloadError::Other(e.into()))?; + + // 2. 
Byte Stream request + let mut headers = header::HeaderMap::new(); + headers.insert(header::RANGE, header::HeaderValue::from_static("bytes=0-")); + + let encoded_path: String = + url::form_urlencoded::byte_serialize(request.key.as_bytes()).collect(); + + let stream_uri_mod = "alt=media"; + // See: https://cloud.google.com/storage/docs/streaming-downloads#stream_a_download + // REST APIs > JSON API > 1st bullet point + let generation = resp + .generation + .expect("object did not contain generation number"); + let generation_mod = format!("generation={generation}"); + let stream_uri = format!( + "https://storage.googleapis.com/storage/v1/b/{}/o/{}?{}&{}", + self.bucket_name.clone(), + encoded_path, + stream_uri_mod, + generation_mod, + ); + + let mut req = Client::new() + .get(stream_uri) + .headers(headers) + .bearer_auth( + self.token_provider + .token(GCS_SCOPES) + .await + .map_err(|e: gcp_auth::Error| DownloadError::Other(e.into()))? + .as_str(), + ) + .send(); + + let get_object = tokio::select! { + res = req => res, + _ = tokio::time::sleep(self.timeout) => return Err(DownloadError::Timeout), + _ = cancel.cancelled() => return Err(DownloadError::Cancelled), + }; + + let started_at = ScopeGuard::into_inner(started_at); + + let object_output = match get_object { + Ok(object_output) => { + if !object_output.status().is_success() { + match object_output.status() { + StatusCode::NOT_FOUND => return Err(DownloadError::NotFound), + _ => { + return Err(DownloadError::Other(anyhow::anyhow!( + "GCS GET response contained no response body" + ))); + } + } + } else { + object_output + } + } + Err(e) => { + crate::metrics::BUCKET_METRICS.req_seconds.observe_elapsed( + kind, + AttemptOutcome::Err, + started_at, + ); + + return Err(DownloadError::Other( + anyhow::Error::new(e).context("download s3 object"), + )); + } + }; + + let remaining = self.timeout.saturating_sub(started_at.elapsed()); + + let metadata = resp.metadata.map(StorageMetadata); + + let etag = resp + .etag + .ok_or(DownloadError::Other(anyhow::anyhow!("Missing ETag header")))? 
+ .into(); + + let last_modified: SystemTime = to_system_time(resp.updated).unwrap_or(SystemTime::now()); + + // But let data stream pass through + Ok(Download { + download_stream: Box::pin(object_output.bytes_stream().map(|item| { + item.map_err(|e: reqwest::Error| std::io::Error::new(std::io::ErrorKind::Other, e)) + })), + etag, + last_modified, + metadata, + }) + } + + async fn copy_object( + &self, + from: &RemotePath, + to: &RemotePath, + cancel: &CancellationToken, + generation: Option<&String>, + ) -> anyhow::Result { + + let copy_from_path: String = + url::form_urlencoded::byte_serialize( + self.relative_path_to_gcs_object(to) + .trim_start_matches("/") + .as_bytes() + ) + .collect(); + + let copy_to_path: String = + url::form_urlencoded::byte_serialize( + self.relative_path_to_gcs_object(to) + .trim_start_matches("/") + .as_bytes() + ) + .collect(); + + let mut copy_uri = format!( + "https://storage.googleapis.com/storage/v1/b/{}/o/{}/rewriteTo/b/{}/o/{}", + self.bucket_name.clone(), + copy_from_path, + self.bucket_name.clone(), + copy_to_path, + ); + + if let Some(gen_id) = generation { + copy_uri += gen_id; + } + + Ok( + Client::new() + .post(copy_uri) + .bearer_auth(self.token_provider.token(GCS_SCOPES).await?.as_str()) + .header(header::CONTENT_TYPE, "application/json") + .header(header::CONTENT_LENGTH, "0") + ) + } + + +} + +impl RemoteStorage for GCSBucket { + // --------------------------------------- + // Neon wrappers for GCS client functions + // --------------------------------------- + + fn list_streaming( + &self, + prefix: Option<&RemotePath>, + mode: ListingMode, + max_keys: Option, + cancel: &CancellationToken, + ) -> impl Stream> { + let kind = RequestKind::List; + + let mut max_keys = max_keys.map(|mk| mk.get() as i32); + + let list_prefix = prefix + .map(|p| self.relative_path_to_gcs_object(p)) + .or_else(|| { + self.prefix_in_bucket.clone().map(|mut s| { + s.push(REMOTE_STORAGE_PREFIX_SEPARATOR); + s + }) + }) + .unwrap(); + + let request_max_keys = self + .max_keys_per_list_response + .into_iter() + .chain(max_keys.into_iter()) + .min() + // https://cloud.google.com/storage/docs/json_api/v1/objects/list?hl=en#parameters + .unwrap_or(1000); + + // We pass URI in to `list_objects_v2` as we'll modify it with `NextPageToken`, hence + // `mut` + let mut list_uri = format!( + "https://storage.googleapis.com/storage/v1/b/{}/o?prefix={}&maxResults={}", + self.bucket_name.clone(), + list_prefix, + request_max_keys, + ); + + // on ListingMode: + // https://github.com/neondatabase/neon/blob/edc11253b65e12a10843711bd88ad277511396d7/libs/remote_storage/src/lib.rs#L158C1-L164C2 + if let ListingMode::WithDelimiter = mode { + list_uri.push_str(&format!( + "&delimiter={}", + REMOTE_STORAGE_PREFIX_SEPARATOR.to_string() + )); + } + + async_stream::stream! { + + let mut continuation_token = None; + + 'outer: loop { + let started_at = start_measuring_requests(kind); + + let request = self.list_objects_v2(list_uri.clone()) + .await + .map_err(DownloadError::Other)? + .send(); + + // this is like `await` + let response = tokio::select! { + res = request => Ok(res), + _ = tokio::time::sleep(self.timeout) => Err(DownloadError::Timeout), + _ = cancel.cancelled() => Err(DownloadError::Cancelled), + }?; + + // just mapping our `Result' error variant's type. 
+ let response = response + .context("Failed to list GCS prefixes") + .map_err(DownloadError::Other); + + let started_at = ScopeGuard::into_inner(started_at); + + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &response, started_at); + + let response = match response { + Ok(response) => response, + Err(e) => { + // The error is potentially retryable, so we must rewind the loop after yielding. + yield Err(e); + continue 'outer; + }, + }; + + let body = response.text() + .await + .map_err(|e: reqwest::Error| DownloadError::Other(e.into()))?; + + let resp: GCSListResponse = serde_json::from_str(&body).map_err(|e: serde_json::Error| DownloadError::Other(e.into()))?; + + let prefixes = resp.common_prefixes(); + let keys = resp.contents(); + + tracing::debug!("list: {} prefixes, {} keys", prefixes.len(), keys.len()); + + let mut result = Listing::default(); + + for res in keys.iter() { + + let last_modified: SystemTime = to_system_time(res.updated.clone()).unwrap_or(SystemTime::now()); + + let size = res.size.unwrap_or(0) as u64; + + let key = res.name.clone(); + + result.keys.push( + ListingObject{ + key: self.gcs_object_to_relative_path(&key), + last_modified, + size + } + ); + + if let Some(mut mk) = max_keys { + assert!(mk > 0); + mk -= 1; + if mk == 0 { + tracing::debug!("reached limit set by max_keys"); + yield Ok(result); + break 'outer; + } + max_keys = Some(mk); + }; + } + + result.prefixes.extend(prefixes.iter().filter_map(|p| { + Some( + self.gcs_object_to_relative_path( + p.trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR) + ), + ) + })); + + yield Ok(result); + + continuation_token = match resp.next_page_token { + Some(token) => { + list_uri = list_uri + "&pageToken=" + &token; + Some(token) + }, + None => break + } + } + } + } + + async fn copy( + &self, + from: &RemotePath, + to: &RemotePath, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + let kind = RequestKind::Copy; + + let _permit = self.permit(kind, cancel).await?; + + let timeout = tokio::time::sleep(self.timeout); + + let started_at = start_measuring_requests(kind); + + let op = self.copy_object( + from, + to, + cancel, + None + ).await?.send(); + + let res = tokio::select! { + res = op => res, + _ = timeout => return Err(TimeoutOrCancel::Timeout.into()), + _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), + }; + + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, &res, started_at); + + res?; + + Ok(()) + } + + + + async fn upload( + &self, + from: impl Stream> + Send + Sync + 'static, + from_size_bytes: usize, + to: &RemotePath, + metadata: Option, + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + let kind = RequestKind::Put; + let _permit = self.permit(kind, cancel).await?; + + let started_at = start_measuring_requests(kind); + + let upload = self.put_object(from, from_size_bytes, to, cancel, metadata); + + let upload = tokio::time::timeout(self.timeout, upload); + + let res = tokio::select! { + res = upload => res, + _ = cancel.cancelled() => return Err(TimeoutOrCancel::Cancel.into()), + }; + + if let Ok(inner) = &res { + // do not incl. 
timeouts as errors in metrics but cancellations + let started_at = ScopeGuard::into_inner(started_at); + crate::metrics::BUCKET_METRICS + .req_seconds + .observe_elapsed(kind, inner, started_at); + } + + match res { + Ok(Ok(_put)) => Ok(()), + Ok(Err(sdk)) => { + Err(sdk.into()) + } + Err(_timeout) => Err(TimeoutOrCancel::Timeout.into()), + } + } + + async fn download( + &self, + from: &RemotePath, + opts: &DownloadOpts, + cancel: &CancellationToken, + ) -> Result { + // if prefix is not none then download file `prefix/from` + // if prefix is none then download file `from` + + self.get_object( + GetObjectRequest { + bucket: self.bucket_name.clone(), + key: self + .relative_path_to_gcs_object(from) + .trim_start_matches("/") + .to_string(), + etag: opts.etag.as_ref().map(|e| e.to_string()), + range: opts.byte_range_header(), + }, + cancel, + ) + .await + } + + async fn delete_objects( + &self, + paths: &[RemotePath], + cancel: &CancellationToken, + ) -> anyhow::Result<()> { + let kind = RequestKind::Delete; + let permit = self.permit(kind, cancel).await?; + + let mut delete_objects: Vec = Vec::with_capacity(paths.len()); + + let delete_objects: Vec = paths + .iter() + .map(|i| self.relative_path_to_gcs_object(i)) + .collect(); + + self.delete_oids(&delete_objects, cancel, &permit).await + } + + fn max_keys_per_delete(&self) -> usize { + MAX_KEYS_PER_DELETE_GCS + } + + async fn delete(&self, path: &RemotePath, cancel: &CancellationToken) -> anyhow::Result<()> { + let paths = std::array::from_ref(path); + self.delete_objects(paths, cancel).await + } + + async fn time_travel_recover( + &self, + prefix: Option<&RemotePath>, + timestamp: SystemTime, + done_if_after: SystemTime, + cancel: &CancellationToken, + complexity_limit: Option, + ) -> Result<(), TimeTravelError> { + + let kind = RequestKind::TimeTravel; + let permit = self.permit(kind, cancel).await?; + + tracing::trace!("Target time: {timestamp:?}, done_if_after {done_if_after:?}"); + + let mode = ListingMode::NoDelimiter; + let version_listing = self + .list_versions_with_permit(&permit, prefix, mode, complexity_limit, cancel) + .await + .map_err(|err| match err { + DownloadError::Other(e) => TimeTravelError::Other(e), + DownloadError::Cancelled => TimeTravelError::Cancelled, + other => TimeTravelError::Other(other.into()), + })?; + let versions_and_deletes = version_listing.versions; + + tracing::info!( + "Built list for time travel with {} versions and deletions", + versions_and_deletes.len() + ); + + // Work on the list of references instead of the objects directly, + // otherwise we get lifetime errors in the sort_by_key call below. + let mut versions_and_deletes = versions_and_deletes.iter().collect::>(); + + versions_and_deletes.sort_by_key(|vd| (&vd.key, &vd.last_modified)); + + let mut vds_for_key = HashMap::<_, Vec<_>>::new(); + + for vd in &versions_and_deletes { + let GCSVersion { key, .. 
} = &vd; + if Some(vd.id.0.as_str()) == Some("null") { + // TODO: check the behavior of using the SDK on a non-versioned container + return Err(TimeTravelError::Other(anyhow::anyhow!( + "Received ListVersions response for key={key} with version_id='null', \ + indicating either disabled versioning, or legacy objects with null version id values" + ))); + } + tracing::trace!("Parsing version key={key} id={:?}", vd.id); + vds_for_key.entry(key).or_default().push(vd); + } + + let warn_threshold = 3; + let max_retries = 10; + let is_permanent = |e: &_| matches!(e, TimeTravelError::Cancelled); + + for (key, versions) in vds_for_key { + let last_vd = versions.last().unwrap(); + let key = self.relative_path_to_gcs_object(key); + if last_vd.last_modified > done_if_after { + /// Case 1: we have a recent object outside of our restore window. + tracing::trace!("Key {key} has version later than done_if_after, skipping"); + continue; + } + /// we get index in the array that we want whether its `v` or `e` + let version_to_restore_to = + match versions.binary_search_by_key(×tamp, |tpl| tpl.last_modified) { + Ok(v) => v, + Err(e) => e, + }; + + let mut do_delete = false; + if version_to_restore_to == 0 { + // All versions more recent, so the key didn't exist at the specified time point. + tracing::trace!( + "All {} versions more recent for {key}, deleting", + versions.len() + ); + do_delete = true; + + } else { + + let GCSVersion { + id: VersionId(version_id), + time_deleted: deletion_timestamp, + .. + } = &versions[version_to_restore_to - 1]; + + // GCS only has 'timeDeleted', not a version object per delete + version. + // A version is either replaced by an object or removed -- stomped or dropped. + // If `timeDeleted` < `time_travel_timestamp`, obj was removed and ought to be deleted. + // If its `None`, that means we have the most current object, no-op. + // Else, it was the same as the `updated` / `timeCreated` of the subsequent version, and ought to be restored. + match &deletion_timestamp { + + Some(time) => { + + if time < ×tamp { + // Case 2: version was last marked deleted before `timestamp` + do_delete = true; + + } else { + + // Case 3: restore state to this version via `copy_object` + tracing::trace!("Copying old version {version_id} for {key}..."); + + let source_id = + format!("?sourceGeneration={version_id}"); + + backoff::retry( + || async { + + let key_path = self.gcs_object_to_relative_path(&key); + + let op = self.copy_object( + &key_path, + &key_path, + cancel, + Some(&source_id), + ).await.map_err(|e| TimeTravelError::Other(e.into()))? + .send(); + + tokio::select! 
{ + res = op => res.map_err(|e| TimeTravelError::Other(e.into())), + _ = cancel.cancelled() => Err(TimeTravelError::Cancelled), + } + }, + is_permanent, + warn_threshold, + max_retries, + "copying object version for time_travel_recover", + cancel, + ) + .await + .ok_or_else(|| TimeTravelError::Cancelled) + .and_then(|x| {x})?; + tracing::info!(%version_id, %key, "Copied old version in GCS"); + } + }, + _ => { + tracing::info!("most current object version, skipping"); + } + } + }; + if do_delete { + tracing::trace!("Deleting {key}..."); + self.delete_oids(&[key], cancel, &permit) + .await + .map_err(|e| { + // delete_oid0 will use TimeoutOrCancel + if TimeoutOrCancel::caused_by_cancel(&e) { + TimeTravelError::Cancelled + } else { + TimeTravelError::Other(e) + } + })?; + } + } + Ok(()) + } + async fn head_object( + &self, + key: &RemotePath, + cancel: &CancellationToken, + ) -> Result { + let path = self + .relative_path_to_gcs_object(key) + .trim_start_matches("/") + .to_string(); + + let resp = self.head_object(path.clone(), cancel).await?; + + let last_modified: SystemTime = to_system_time(resp.updated).unwrap_or(SystemTime::now()); + + let Some(size) = resp.size else { + return Err(DownloadError::Other(anyhow::anyhow!( + "Missing size (content length) header" + ))); + }; + + Ok(ListingObject { + key: self.gcs_object_to_relative_path(&path), + last_modified, + size: size as u64, + }) + } + + async fn list_versions( + &self, + prefix: Option<&RemotePath>, + mode: ListingMode, + max_keys: Option, + cancel: &CancellationToken, + ) -> Result { + let kind = RequestKind::ListVersions; + let permit = self.permit(kind, cancel).await?; + Ok( + self.list_versions_with_permit(&permit, prefix, mode, max_keys, cancel) + .await?.into() + ) + } +} + +// --------- + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "snake_case")] +pub struct GCSListResponse { + #[serde(rename = "nextPageToken")] + pub next_page_token: Option, + pub items: Option>, + pub prefixes: Option>, +} + +fn de_from_str<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let s = Option::::deserialize(deserializer)?; + match s { + Some(s) => i64::from_str(&s).map(Some).map_err(de::Error::custom), + None => Ok(None), + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "snake_case")] +pub struct GCSObject { + pub name: String, + pub bucket: String, + pub generation: Option, + pub metageneration: String, + #[serde(rename = "contentType")] + pub content_type: Option, + #[serde(rename = "storageClass")] + pub storage_class: String, + #[serde(deserialize_with = "de_from_str")] + pub size: Option, + #[serde(rename = "md5Hash")] + pub md5_hash: Option, + pub crc32c: String, + pub etag: Option, + #[serde(rename = "timeCreated")] + pub time_created: String, + pub updated: Option, + #[serde(rename = "timeStorageClassUpdated")] + pub time_storage_class_updated: String, + #[serde(rename = "timeDeleted")] + pub time_deleted: Option, + #[serde(rename = "timeFinalized")] + pub time_finalized: String, + pub metadata: Option>, +} + +impl GCSListResponse { + pub fn contents(&self) -> &[GCSObject] { + self.items.as_deref().unwrap_or_default() + } + pub fn common_prefixes(&self) -> &[String] { + self.prefixes.as_deref().unwrap_or_default() + } +} diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 5885c3e791..14245522ed 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -12,6 +12,7 @@ mod azure_blob; mod config; mod error; 
+mod gcs_bucket; mod local_fs; mod metrics; mod s3_bucket; @@ -43,10 +44,11 @@ use tokio_util::sync::CancellationToken; use tracing::info; pub use self::azure_blob::AzureBlobStorage; +pub use self::gcs_bucket::GCSBucket; pub use self::local_fs::LocalFs; pub use self::s3_bucket::S3Bucket; pub use self::simulate_failures::UnreliableWrapper; -pub use crate::config::{AzureConfig, RemoteStorageConfig, RemoteStorageKind, S3Config}; +pub use crate::config::{AzureConfig, GCSConfig, RemoteStorageConfig, RemoteStorageKind, S3Config}; /// Default concurrency limit for S3 operations /// @@ -81,8 +83,12 @@ pub const MAX_KEYS_PER_DELETE_S3: usize = 1000; /// pub const MAX_KEYS_PER_DELETE_AZURE: usize = 256; +pub const MAX_KEYS_PER_DELETE_GCS: usize = 1000; + const REMOTE_STORAGE_PREFIX_SEPARATOR: char = '/'; +const GCS_SCOPES: &[&str] = &["https://www.googleapis.com/auth/cloud-platform"]; + /// Path on the remote storage, relative to some inner prefix. /// The prefix is an implementation detail, that allows representing local paths /// as the remote ones, stripping the local storage prefix away. @@ -182,6 +188,7 @@ pub struct VersionListing { pub versions: Vec, } +#[derive(Debug)] pub struct Version { pub key: RemotePath, pub last_modified: SystemTime, @@ -203,6 +210,47 @@ pub enum VersionKind { Version(VersionId), } +// I was going to do an `enum GenericVersion` but this feels cleaner. +#[derive(Default)] +pub struct GCSVersionListing { + pub versions: Vec, +} + +#[derive(Debug)] +pub struct GCSVersion { + pub key: RemotePath, + pub last_modified: SystemTime, + pub id: VersionId, + pub time_deleted: Option, +} + +impl From for VersionListing { + fn from(gcs_listing: GCSVersionListing) -> Self { + let version_listing = gcs_listing + .versions + .into_iter() + .map( + |GCSVersion { + key, + last_modified, + id, + .. + }| { + Version { + key, + last_modified, + kind: VersionKind::Version(VersionId(id.0)), + } + }, + ) + .collect::>(); + + VersionListing { + versions: version_listing, + } + } +} + /// Options for downloads. The default value is a plain GET. 
pub struct DownloadOpts { /// If given, returns [`DownloadError::Unmodified`] if the object still has @@ -481,6 +529,7 @@ pub enum GenericRemoteStorage> { AwsS3(Arc), AzureBlob(Arc), Unreliable(Other), + GCS(Arc), } impl GenericRemoteStorage> { @@ -497,6 +546,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => s.list(prefix, mode, max_keys, cancel).await, Self::AzureBlob(s) => s.list(prefix, mode, max_keys, cancel).await, Self::Unreliable(s) => s.list(prefix, mode, max_keys, cancel).await, + Self::GCS(s) => s.list(prefix, mode, max_keys, cancel).await, } } @@ -514,6 +564,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)), Self::AzureBlob(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)), Self::Unreliable(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)), + Self::GCS(s) => Box::pin(s.list_streaming(prefix, mode, max_keys, cancel)), } } @@ -530,6 +581,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => s.list_versions(prefix, mode, max_keys, cancel).await, Self::AzureBlob(s) => s.list_versions(prefix, mode, max_keys, cancel).await, Self::Unreliable(s) => s.list_versions(prefix, mode, max_keys, cancel).await, + Self::GCS(s) => s.list_versions(prefix, mode, max_keys, cancel).await, } } @@ -544,6 +596,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => s.head_object(key, cancel).await, Self::AzureBlob(s) => s.head_object(key, cancel).await, Self::Unreliable(s) => s.head_object(key, cancel).await, + Self::GCS(s) => s.head_object(key, cancel).await, } } @@ -561,6 +614,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await, Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await, Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await, + Self::GCS(s) => s.upload(from, data_size_bytes, to, metadata, cancel).await, } } @@ -576,6 +630,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => s.download(from, opts, cancel).await, Self::AzureBlob(s) => s.download(from, opts, cancel).await, Self::Unreliable(s) => s.download(from, opts, cancel).await, + Self::GCS(s) => s.download(from, opts, cancel).await, } } @@ -590,6 +645,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => s.delete(path, cancel).await, Self::AzureBlob(s) => s.delete(path, cancel).await, Self::Unreliable(s) => s.delete(path, cancel).await, + Self::GCS(s) => s.delete(path, cancel).await, } } @@ -604,6 +660,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => s.delete_objects(paths, cancel).await, Self::AzureBlob(s) => s.delete_objects(paths, cancel).await, Self::Unreliable(s) => s.delete_objects(paths, cancel).await, + Self::GCS(s) => s.delete_objects(paths, cancel).await, } } @@ -614,6 +671,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => s.max_keys_per_delete(), Self::AzureBlob(s) => s.max_keys_per_delete(), Self::Unreliable(s) => s.max_keys_per_delete(), + Self::GCS(s) => s.max_keys_per_delete(), } } @@ -628,6 +686,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => s.delete_prefix(prefix, cancel).await, Self::AzureBlob(s) => s.delete_prefix(prefix, cancel).await, Self::Unreliable(s) => s.delete_prefix(prefix, cancel).await, + Self::GCS(s) => s.delete_prefix(prefix, cancel).await, } } @@ -643,6 +702,7 @@ impl GenericRemoteStorage> { Self::AwsS3(s) => s.copy(from, to, cancel).await, Self::AzureBlob(s) => s.copy(from, to, cancel).await, Self::Unreliable(s) => s.copy(from, to, cancel).await, + Self::GCS(s) => s.copy(from, to, cancel).await, } } 
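As a quick orientation, here is a minimal, hypothetical caller-side sketch of how the new variant is reached (the bucket name, prefix, timeouts, and the function name are placeholders; it mirrors `create_gcs_client` in the integration test added later in this patch):

    use std::num::NonZeroUsize;
    use std::time::Duration;

    use remote_storage::{GCSConfig, GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind};

    async fn gcs_storage_sketch() -> anyhow::Result<GenericRemoteStorage> {
        // Placeholder coordinates; credentials are discovered by `gcp_auth`
        // (e.g. via GOOGLE_APPLICATION_CREDENTIALS), not by this config.
        let config = RemoteStorageConfig {
            storage: RemoteStorageKind::GCS(GCSConfig {
                bucket_name: "my-bucket".to_string(),
                prefix_in_bucket: Some("pageserver".to_string()),
                concurrency_limit: NonZeroUsize::new(100).unwrap(),
                max_keys_per_list_response: None,
            }),
            timeout: Duration::from_secs(120),
            small_timeout: Duration::from_secs(30),
        };
        // Dispatches to GCSBucket::new() and yields GenericRemoteStorage::GCS(..).
        GenericRemoteStorage::from_config(&config).await
    }

Credential discovery is left entirely to `gcp_auth`, so the config carries only bucket coordinates, matching the S3 and Azure variants.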
@@ -672,6 +732,10 @@ impl GenericRemoteStorage> { s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit) .await } + Self::GCS(s) => { + s.time_travel_recover(prefix, timestamp, done_if_after, cancel, complexity_limit) + .await + } } } } @@ -687,11 +751,18 @@ impl GenericRemoteStorage { } pub async fn from_config(storage_config: &RemoteStorageConfig) -> anyhow::Result { + info!("RemoteStorageConfig: {:?}", storage_config); + let timeout = storage_config.timeout; - // If somkeone overrides timeout to be small without adjusting small_timeout, then adjust it automatically + // If someone overrides timeout to be small without adjusting small_timeout, then adjust it automatically let small_timeout = std::cmp::min(storage_config.small_timeout, timeout); + info!( + "RemoteStorageConfig's storage attribute: {:?}", + storage_config.storage + ); + Ok(match &storage_config.storage { RemoteStorageKind::LocalFs { local_path: path } => { info!("Using fs root '{path}' as a remote storage"); @@ -729,6 +800,16 @@ impl GenericRemoteStorage { small_timeout, )?)) } + RemoteStorageKind::GCS(gcs_config) => { + let google_application_credentials = + std::env::var("GOOGLE_APPLICATION_CREDENTIALS") + .unwrap_or_else(|_| "".into()); + info!( + "Using gcs bucket '{}' as a remote storage, prefix in bucket: '{:?}', GOOGLE_APPLICATION_CREDENTIALS: {google_application_credentials }", + gcs_config.bucket_name, gcs_config.prefix_in_bucket + ); + Self::GCS(Arc::new(GCSBucket::new(gcs_config, timeout).await?)) + } }) } @@ -764,6 +845,7 @@ impl GenericRemoteStorage { Self::AwsS3(s) => Some(s.bucket_name()), Self::AzureBlob(s) => Some(s.container_name()), Self::Unreliable(_s) => None, + Self::GCS(s) => Some(s.bucket_name()), } } } diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index f35d2a3081..58e491b56b 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -63,6 +63,7 @@ impl UnreliableWrapper { GenericRemoteStorage::Unreliable(_s) => { panic!("Can't wrap unreliable wrapper unreliably") } + GenericRemoteStorage::GCS(s) => GenericRemoteStorage::GCS(s), }; let actual_attempt_failure_probability = cmp::min(attempt_failure_probability, 100); UnreliableWrapper { diff --git a/libs/remote_storage/tests/test_real_gcs.rs b/libs/remote_storage/tests/test_real_gcs.rs new file mode 100644 index 0000000000..72937c6207 --- /dev/null +++ b/libs/remote_storage/tests/test_real_gcs.rs @@ -0,0 +1,255 @@ +#![allow(dead_code)] +#![allow(unused)] + +mod common; + +use crate::common::{download_to_vec, upload_stream}; +use anyhow::Context; +use camino::Utf8Path; +use futures::StreamExt; +use futures::stream::Stream; +use remote_storage::{ + DownloadKind, DownloadOpts, GCSConfig, GenericRemoteStorage, ListingMode, RemotePath, + RemoteStorageConfig, RemoteStorageKind, StorageMetadata, +}; +use std::collections::HashMap; +#[path = "common/tests.rs"] +use std::collections::HashSet; +use std::fmt::{Debug, Display}; +use std::io::Cursor; +use std::ops::Bound; +use std::pin::pin; +use std::sync::Arc; +use std::time::Duration; +use std::time::SystemTime; +use test_context::{AsyncTestContext, test_context}; +use tokio_util::sync::CancellationToken; +use utils::backoff; + +// A minimal working GCS client I can pass around in async context + +const BASE_PREFIX: &str = "test"; + +async fn create_gcs_client() -> anyhow::Result> { + let bucket_name = std::env::var("GCS_TEST_BUCKET").expect("GCS_TEST_BUCKET must be set"); 
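+    // Prerequisites for running this suite against a real bucket: GCS_TEST_BUCKET
+    // must name an existing bucket, credentials must be discoverable by `gcp_auth`
+    // (typically via GOOGLE_APPLICATION_CREDENTIALS or ambient application-default
+    // credentials), and Object Versioning should be enabled so that
+    // `time_travel_recover` has noncurrent generations to restore from.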
+ let gcs_config = GCSConfig { + bucket_name, + prefix_in_bucket: Some("testing-path/".into()), + max_keys_per_list_response: Some(100), + concurrency_limit: std::num::NonZero::new(100).unwrap(), + }; + + let remote_storage_config = RemoteStorageConfig { + storage: RemoteStorageKind::GCS(gcs_config), + timeout: Duration::from_secs(120), + small_timeout: std::time::Duration::from_secs(120), + }; + Ok(Arc::new( + GenericRemoteStorage::from_config(&remote_storage_config) + .await + .context("remote storage init")?, + )) +} + +struct EnabledGCS { + client: Arc, + base_prefix: &'static str, +} + +impl EnabledGCS { + async fn setup() -> Self { + let client = create_gcs_client() + .await + .context("gcs client creation") + .expect("gcs client creation failed"); + EnabledGCS { + client, + base_prefix: BASE_PREFIX, + } + } +} + +impl AsyncTestContext for EnabledGCS { + async fn setup() -> Self { + Self::setup().await + } +} + +#[test_context(EnabledGCS)] +#[tokio::test] +async fn gcs_test_suite(ctx: &mut EnabledGCS) -> anyhow::Result<()> { + // ------------------------------------------------ + // --- `time_travel_recover`, showcasing `upload`, `delete_objects`, `copy` + // ------------------------------------------------ + + // Our test depends on discrepancies in the clock between S3 and the environment the tests + // run in. Therefore, wait a little bit before and after. The alternative would be + // to take the time from S3 response headers. + const WAIT_TIME: Duration = Duration::from_millis(3_000); + + async fn retry(op: O) -> Result + where + E: Display + Debug + 'static, + O: FnMut() -> F, + F: Future>, + { + let warn_threshold = 3; + let max_retries = 10; + backoff::retry( + op, + |_e| false, + warn_threshold, + max_retries, + "test retry", + &CancellationToken::new(), + ) + .await + .expect("never cancelled") + } + + async fn time_point() -> SystemTime { + tokio::time::sleep(WAIT_TIME).await; + let ret = SystemTime::now(); + tokio::time::sleep(WAIT_TIME).await; + ret + } + + async fn list_files( + client: &Arc, + cancel: &CancellationToken, + ) -> anyhow::Result> { + Ok( + retry(|| client.list(None, ListingMode::NoDelimiter, None, cancel)) + .await + .context("list root files failure")? 
+                .keys
+                .into_iter()
+                .map(|o| o.key)
+                .collect::<HashSet<_>>(),
+        )
+    }
+
+    let cancel = CancellationToken::new();
+
+    let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str()))
+        .with_context(|| "RemotePath conversion")?;
+
+    // ---------------- t0 ---------------
+    // Upload 'path1'
+    retry(|| {
+        let (data, len) = upload_stream("remote blob data1".as_bytes().into());
+        ctx.client.upload(data, len, &path1, None, &cancel)
+    })
+    .await?;
+    let t0_files = list_files(&ctx.client, &cancel).await?;
+    let t0 = time_point().await;
+
+    // Show 'path1'
+    println!("at t0: {t0_files:?}");
+
+    // Upload 'path2'
+    let old_data = "remote blob data2";
+    retry(|| {
+        let (data, len) = upload_stream(old_data.as_bytes().into());
+        ctx.client.upload(data, len, &path2, None, &cancel)
+    })
+    .await?;
+
+    // ---------------- t1 ---------------
+    // Show 'path1' and 'path2'
+    let t1_files = list_files(&ctx.client, &cancel).await?;
+    let t1 = time_point().await;
+    println!("at t1: {t1_files:?}");
+
+    {
+        let opts = DownloadOpts::default();
+        let dl = retry(|| ctx.client.download(&path2, &opts, &cancel)).await?;
+        let last_modified = dl.last_modified;
+        let half_wt = WAIT_TIME.mul_f32(0.5);
+        let t0_hwt = t0 + half_wt;
+        let t1_hwt = t1 - half_wt;
+        if !(t0_hwt..=t1_hwt).contains(&last_modified) {
+            panic!(
+                "last_modified={last_modified:?} is not between t0_hwt={t0_hwt:?} and t1_hwt={t1_hwt:?}. \
+                 This likely means a large clock discrepancy between GCS and the local clock."
+            );
+        }
+    }
+
+    // Upload 'path3'
+    retry(|| {
+        let (data, len) = upload_stream("remote blob data3".as_bytes().into());
+        ctx.client.upload(data, len, &path3, None, &cancel)
+    })
+    .await?;
+
+    // Overwrite 'path2'
+    let new_data = "new remote blob data2";
+    retry(|| {
+        let (data, len) = upload_stream(new_data.as_bytes().into());
+        ctx.client.upload(data, len, &path2, None, &cancel)
+    })
+    .await?;
+
+    // Delete 'path1'
+    retry(|| ctx.client.delete(&path1, &cancel)).await?;
+
+    // Show 'path2' and 'path3'
+    let t2_files = list_files(&ctx.client, &cancel).await?;
+    let t2 = time_point().await;
+    println!("at t2: {t2_files:?}");
+
+    // No changes after recovery to t2 (no-op)
+    let t_final = time_point().await;
+    ctx.client
+        .time_travel_recover(None, t2, t_final, &cancel, None)
+        .await?;
+    let t2_files_recovered = list_files(&ctx.client, &cancel).await?;
+    println!("after recovery to t2: {t2_files_recovered:?}");
+
+    assert_eq!(t2_files, t2_files_recovered);
+    let path2_recovered_t2 = download_to_vec(
+        ctx.client
+            .download(&path2, &DownloadOpts::default(), &cancel)
+            .await?,
+    )
+    .await?;
+    assert_eq!(path2_recovered_t2, new_data.as_bytes());
+
+    // after recovery to t1: path1 is back, path2 has the old content
+    let t_final = time_point().await;
+    ctx.client
+        .time_travel_recover(None, t1, t_final, &cancel, None)
+        .await?;
+    let t1_files_recovered = list_files(&ctx.client, &cancel).await?;
+    println!("after recovery to t1: {t1_files_recovered:?}");
+    assert_eq!(t1_files, t1_files_recovered);
+    let path2_recovered_t1 = download_to_vec(
+        ctx.client
+            .download(&path2, &DownloadOpts::default(), &cancel)
+            .await?,
+    )
+    .await?;
+    assert_eq!(path2_recovered_t1, old_data.as_bytes());
+
+    // after recovery to t0: everything is gone except for path1
+    let t_final = time_point().await;
+    ctx.client
+        .time_travel_recover(None, t0, t_final, &cancel, None)
+        .await?;
+    let t0_files_recovered = list_files(&ctx.client, &cancel).await?;
+    println!("after recovery to t0: {t0_files_recovered:?}");
+    assert_eq!(t0_files, t0_files_recovered);
+
+    // cleanup
+    let paths = &[path1, path2, path3];
+    retry(|| ctx.client.delete_objects(paths, &cancel)).await?;
+
+    Ok(())
+}
diff --git a/pageserver/src/feature_resolver.rs b/pageserver/src/feature_resolver.rs
index 678d7e052b..d2844fff12 100644
--- a/pageserver/src/feature_resolver.rs
+++ b/pageserver/src/feature_resolver.rs
@@ -110,6 +110,12 @@ impl FeatureResolver {
                         PostHogFlagFilterPropertyValue::String("local".to_string()),
                     );
                 }
+                RemoteStorageKind::GCS { .. } => {
+                    properties.insert(
+                        "region".to_string(),
+                        PostHogFlagFilterPropertyValue::String("local".to_string()),
+                    );
+                }
             }
         }
         // TODO: move this to a background task so that we don't block startup in case of slow disk
diff --git a/pageserver/src/tenant/remote_timeline_client/upload.rs b/pageserver/src/tenant/remote_timeline_client/upload.rs
index f2fbf656a6..c29711b5d6 100644
--- a/pageserver/src/tenant/remote_timeline_client/upload.rs
+++ b/pageserver/src/tenant/remote_timeline_client/upload.rs
@@ -158,6 +158,7 @@ pub(super) async fn upload_timeline_layer<'a>(
         GenericRemoteStorage::LocalFs(_) => {}
         GenericRemoteStorage::AwsS3(_) => {}
         GenericRemoteStorage::Unreliable(_) => {}
+        GenericRemoteStorage::GCS(_) => {}
     };
     /* END_HADRON */
     let reader = tokio_util::io::ReaderStream::with_capacity(source_file, super::BUFFER_SIZE);
diff --git a/storage_scrubber/src/lib.rs b/storage_scrubber/src/lib.rs
index d3ed5a8357..60916846be 100644
--- a/storage_scrubber/src/lib.rs
+++ b/storage_scrubber/src/lib.rs
@@ -266,6 +266,7 @@ impl BucketConfig {
                 "container {}, storage account {:?}, region {}",
                 config.container_name, config.storage_account, config.container_region
             ),
+            RemoteStorageKind::GCS(config) => format!("bucket {}", config.bucket_name),
         }
     }
     pub fn bucket_name(&self) -> Option<&str> {
@@ -381,6 +382,9 @@ async fn init_remote(
             config.prefix_in_container.get_or_insert(default_prefix);
         }
         RemoteStorageKind::LocalFs { .. } => (),
+        RemoteStorageKind::GCS(config) => {
+            config.prefix_in_bucket.get_or_insert(default_prefix);
+        }
     }
 
     // We already pass the prefix to the remote client above
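
For reference, a minimal caller-side sketch of how the new provider is wired up, adapted from create_gcs_client() in the test above. The bucket name, prefix, and limits are placeholders, and gcp_auth is assumed to resolve credentials from GOOGLE_APPLICATION_CREDENTIALS (or another ambient credential source); this is not part of the patch itself.

    use std::sync::Arc;
    use std::time::Duration;

    use remote_storage::{GCSConfig, GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind};

    // Hypothetical helper: builds a GCS-backed GenericRemoteStorage the same way the
    // test's create_gcs_client() does; all concrete values here are placeholders.
    async fn gcs_storage_sketch() -> anyhow::Result<Arc<GenericRemoteStorage>> {
        let config = RemoteStorageConfig {
            storage: RemoteStorageKind::GCS(GCSConfig {
                bucket_name: "my-neon-bucket".to_string(),
                prefix_in_bucket: Some("pageserver/".to_string()),
                max_keys_per_list_response: Some(100),
                concurrency_limit: std::num::NonZero::new(100).unwrap(),
            }),
            timeout: Duration::from_secs(120),
            small_timeout: Duration::from_secs(120),
        };
        // from_config() dispatches on RemoteStorageKind::GCS and constructs the
        // GCSBucket internally, as shown in the lib.rs hunk above.
        Ok(Arc::new(GenericRemoteStorage::from_config(&config).await?))
    }

Callers only need from_config(); the GCS-specific client (GCSBucket) stays an implementation detail of the remote_storage crate.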