From bb931f2ce037a50816ca400e5844a387a783d8bf Mon Sep 17 00:00:00 2001 From: Alek Westover Date: Wed, 14 Jun 2023 10:50:25 -0400 Subject: [PATCH] alek: add download files changes --- compute_tools/src/extension_server.rs | 91 ++++++++++++++++++++ libs/remote_storage/src/lib.rs | 27 ++++++ libs/remote_storage/src/local_fs.rs | 28 ++++++ libs/remote_storage/src/s3_bucket.rs | 47 ++++++++++ libs/remote_storage/src/simulate_failures.rs | 8 ++ 5 files changed, 201 insertions(+) diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index 6d910874f9..371a043d1f 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -3,6 +3,8 @@ use std::net::{TcpListener, TcpStream}; use std::path::Path; use std::str; use std::{fs, thread}; +use toml_edit; +use remote_storage::*; use anyhow::Context; use tracing::info; @@ -18,6 +20,7 @@ pub fn download_file(mut stream: TcpStream) -> anyhow::Result<()> { println!("requested file {}", filename); + download_extension(get_s3_config(), ExtensionType::Shared); let from_prefix = "/tmp/from_prefix"; let to_prefix = "/tmp/to_prefix"; @@ -33,3 +36,91 @@ pub fn download_file(mut stream: TcpStream) -> anyhow::Result<()> { Ok(()) } + +fn get_pg_config(argument: &str) -> String { + // FIXME: this function panics if it runs into any issues + let config_output = std::process::Command::new("pg_config") + .arg(argument) + .output() + .expect("pg_config should be installed"); + assert!(config_output.status.success()); + let stdout = std::str::from_utf8(&config_output.stdout).unwrap(); + stdout.trim().to_string() +} + +fn download_helper(remote_storage: &GenericRemoteStorage, remote_from_path: &RemotePath, to_path: &str) -> anyhow::Result<()> { + let file_name = remote_from_path.object_name().expect("it must exist"); + info!("Downloading {:?}",file_name); + let mut download = remote_storage.download(&remote_from_path).await?; + let mut write_data_buffer = Vec::new(); + 
download.download_stream.read_to_end(&mut write_data_buffer).await?; + let mut output_file = BufWriter::new(File::create(file_name)?); + output_file.write_all(&mut write_data_buffer)?; + Ok(()) +} + +pub enum ExtensionType { + Shared, // we just use the public folder here + Tenant(String), // String is tenant_id + Library(String) // String is name of the extension +} + +pub async fn download_extension(config: &RemoteStorageConfig, ext_type: ExtensionType) -> anyhow::Result<()>{ + let sharedir = get_pg_config("--sharedir"); + let sharedir = format!("{}/extension", sharedir); + let libdir = get_pg_config("--libdir"); + let remote_storage = GenericRemoteStorage::from_config(config)?; + + match ext_type { + ExtensionType::Shared => { + // 1. Download control files from s3-bucket/public/*.control to SHAREDIR/extension + // We can do this step even before we have spec, + // because public extensions are common for all projects. + let folder = RemotePath::new(Path::new("public_extensions"))?; + let from_paths = remote_storage.list_files(Some(&folder)).await?; + for remote_from_path in from_paths { + if remote_from_path.extension() == Some("control") { + // FIXME: CAUTION: if you run this, it will actually write stuff to your postgress directory + // only run if you are ok with that + download_helper(&remote_storage, &remote_from_path, &sharedir)?; + } + } + } + ExtensionType::Tenant(tenant_id) => { + // 2. After we have spec, before project start + // Download control files from s3-bucket/[tenant-id]/*.control to SHAREDIR/extension + let folder = RemotePath::new(Path::new(format!("{tenant_id}")))?; + let from_paths = remote_storage.list_files(Some(&folder)).await?; + for remote_from_path in from_paths { + if remote_from_path.extension() == Some("control") { + download_helper(&remote_storage, &remote_from_path, &sharedir)?; + } + } + } + ExtensionType::Library(library_name) => { + // 3. 
After we have spec, before postgres start + // Download preload_shared_libraries from s3-bucket/public/[library-name].control into LIBDIR/ + let from_path = format!("neon-dev-extensions/public/{library_name}.control"); + let remote_from_path = RemotePath::new(Path::new(&from_path))?; + download_helper(&remote_storage, &remote_from_path, &libdir)?; + } + } + Ok(()) +} + +pub fn get_s3_config() -> anyhow::Result<RemoteStorageConfig> { + // TODO: Right now we are using the same config parameters as pageserver; but should we have our own configs? + // TODO: Should we read the s3_config from CLI arguments? + let cfg_file_path = Path::new("./../.neon/pageserver.toml"); + let cfg_file_contents = std::fs::read_to_string(cfg_file_path) + .with_context(|| format!( "Failed to read pageserver config at '{}'", cfg_file_path.display()))?; + let toml = cfg_file_contents + .parse::<toml_edit::Document>() + .with_context(|| format!( "Failed to parse '{}' as pageserver config", cfg_file_path.display()))?; + let remote_storage_data = toml.get("remote_storage") + .context("field should be present")?; + let remote_storage_config = RemoteStorageConfig::from_toml(remote_storage_data)? + .context("error configuring remote storage")?; + Ok(remote_storage_config) +} + diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index e0cc3ca543..a02343cda8 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -70,6 +70,10 @@ impl RemotePath { pub fn join(&self, segment: &Path) -> Self { Self(self.0.join(segment)) } + + pub fn extension(&self) -> Option<&str> { + self.0.extension()?.to_str() + } } /// Storage (potentially remote) API to manage its state. @@ -86,6 +90,12 @@ pub trait RemoteStorage: Send + Sync + 'static { prefix: Option<&RemotePath>, ) -> Result<Vec<RemotePath>, DownloadError>; + /// Lists all files in a subdirectories + async fn list_files( + &self, + prefix: Option<&RemotePath>, + ) -> anyhow::Result<Vec<RemotePath>>; + /// Streams the local file contents into remote into the remote storage entry. 
async fn upload( &self, data: impl io::AsyncRead + Unpin + Send + Sync + 'static, @@ -161,6 +171,23 @@ pub enum GenericRemoteStorage { } impl GenericRemoteStorage { + // A function for listing all the files in a "directory" + // Example: + // list_files("foo/bar") = ["foo/bar/a.txt", "foo/bar/b.txt"] + pub async fn list_files( + &self, + folder: Option<&RemotePath> + ) -> anyhow::Result<Vec<RemotePath>>{ + match self { + Self::LocalFs(s) => s.list_files(folder).await, + Self::AwsS3(s) => s.list_files(folder).await, + Self::Unreliable(s) => s.list_files(folder).await, + } + } + + // lists common *prefixes*, if any of files + // Example: + // list_prefixes("foo123","foo567","bar123","bar432") = ["foo", "bar"] pub async fn list_prefixes( &self, prefix: Option<&RemotePath>, diff --git a/libs/remote_storage/src/local_fs.rs b/libs/remote_storage/src/local_fs.rs index c081a6d361..d07a415e5d 100644 --- a/libs/remote_storage/src/local_fs.rs +++ b/libs/remote_storage/src/local_fs.rs @@ -116,6 +116,34 @@ impl RemoteStorage for LocalFs { .collect()) } + async fn list_files( + &self, + folder: Option<&RemotePath> + ) -> anyhow::Result<Vec<RemotePath>> { + /* Note: if you want, you can return a DownloadError instead of an anyhow::Error + as follows: replace all ?'s with: + .map_err(|e| DownloadError::Other(anyhow::Error::from(e)))?; + */ + let full_path = match folder.clone() { + Some(folder) => folder.with_base(&self.storage_root), + None => self.storage_root.clone(), + }; + let mut entries = fs::read_dir(full_path).await?; + let mut files = vec![]; + while let Some(entry) = entries.next_entry().await? 
{ + let file_name: PathBuf = entry.file_name().into(); + let file_type = entry.file_type().await?; + if file_type.is_file() { + let mut file_remote_path = RemotePath::new(&file_name)?; + if let Some(folder) = folder { + file_remote_path = folder.join(&file_name); + } + files.push(file_remote_path); + } + } + Ok(files) + } + async fn upload( &self, data: impl io::AsyncRead + Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 0be8c72fe0..14786e7820 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -332,6 +332,53 @@ impl RemoteStorage for S3Bucket { Ok(document_keys) } + async fn list_files( + &self, + folder: Option<&RemotePath> + ) -> anyhow::Result<Vec<RemotePath>>{ + // TODO: should we use DownloadError error type instead of anyhow::Error? + let folder_name = folder.map(|x| + String::from(x.object_name().expect("invalid folder name")) + ); + let mut continuation_token = None; + let mut all_files = vec![]; + loop { + let _guard = self + .concurrency_limiter + .acquire() + .await + .context("Concurrency limiter semaphore got closed during S3 list_files")?; + metrics::inc_list_objects(); + + let response = self + .client + .list_objects_v2() + .bucket(self.bucket_name.clone()) + .set_prefix(folder_name.clone()) + .set_continuation_token(continuation_token) + .set_max_keys(self.max_keys_per_list_response) + .send() + .await + .map_err(|e| { + metrics::inc_list_objects_fail(); + e + }) + .context("Failed to list files in S3 bucket")?; + + for object in response.contents().unwrap_or_default() { + let object_path = object.key().unwrap(); + println!("{:?}", object_path); + let remote_path = self.s3_object_to_relative_path(object_path); + all_files.push(remote_path); + } + match response.next_continuation_token { + Some(new_token) => continuation_token = Some(new_token), + None => break, + } + } + Ok(all_files) + } + async fn upload( &self, from: impl io::AsyncRead + 
Unpin + Send + Sync + 'static, diff --git a/libs/remote_storage/src/simulate_failures.rs b/libs/remote_storage/src/simulate_failures.rs index cb40859831..bd1841ae21 100644 --- a/libs/remote_storage/src/simulate_failures.rs +++ b/libs/remote_storage/src/simulate_failures.rs @@ -82,6 +82,14 @@ impl RemoteStorage for UnreliableWrapper { self.inner.list_prefixes(prefix).await } + async fn list_files( + &self, + folder: Option<&RemotePath> + ) -> anyhow::Result<Vec<RemotePath>>{ + self.attempt(RemoteOp::ListPrefixes(folder.cloned()))?; + self.inner.list_files(folder).await + } + async fn upload( &self, data: impl tokio::io::AsyncRead + Unpin + Send + Sync + 'static,