From 214ecacfc40bf893bbd0dc981927f02db4c07ecf Mon Sep 17 00:00:00 2001 From: Alek Westover Date: Thu, 15 Jun 2023 10:20:01 -0400 Subject: [PATCH] clippy --- Cargo.lock | 2 + compute_tools/Cargo.toml | 2 + compute_tools/src/extension_server.rs | 64 ++++++++++++++----- compute_tools/src/http/api.rs | 5 +- compute_tools/src/lib.rs | 1 + libs/remote_storage/src/lib.rs | 4 +- libs/remote_storage/src/s3_bucket.rs | 7 -- .../regress/test_download_extensions.py | 2 + 8 files changed, 60 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c078510129..0f8115d7ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -924,12 +924,14 @@ dependencies = [ "opentelemetry", "postgres", "regex", + "remote_storage", "reqwest", "serde", "serde_json", "tar", "tokio", "tokio-postgres", + "toml_edit", "tracing", "tracing-opentelemetry", "tracing-subscriber", diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 21226249cf..43d122c90d 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -30,3 +30,5 @@ url.workspace = true compute_api.workspace = true utils.workspace = true workspace_hack.workspace = true +toml_edit.workspace = true +remote_storage = { version = "0.1", path = "../libs/remote_storage/" } diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs index 371a043d1f..b01da48b1e 100644 --- a/compute_tools/src/extension_server.rs +++ b/compute_tools/src/extension_server.rs @@ -1,10 +1,11 @@ +use remote_storage::*; use std::io::{Read, Write}; use std::net::{TcpListener, TcpStream}; use std::path::Path; use std::str; use std::{fs, thread}; use toml_edit; -use remote_storage::*; +use BuffWriter; use anyhow::Context; use tracing::info; @@ -12,6 +13,9 @@ use tracing::info; pub fn download_file(mut stream: TcpStream) -> anyhow::Result<()> { let mut buf = [0; 512]; + println!("ALEK: calling download file"); + // fs.write("world.txt", "hello")?; + stream.read(&mut buf).expect("Error reading from stream"); let filename = str::from_utf8(&buf) @@ -20,7 +24,7 @@ pub fn download_file(mut stream: TcpStream) -> anyhow::Result<()> { println!("requested file {}", filename); - download_extension(get_s3_config(), ExtensionType::Shared); + // download_extension(get_s3_config(), ExtensionType::Shared); let from_prefix = "/tmp/from_prefix"; let to_prefix = "/tmp/to_prefix"; @@ -48,24 +52,41 @@ fn get_pg_config(argument: &str) -> String { stdout.trim().to_string() } -fn download_helper(remote_storage: &GenericRemoteStorage, remote_from_path: &RemotePath, to_path: &str) -> anyhow::Result<()> { +async fn download_helper( + remote_storage: &GenericRemoteStorage, + remote_from_path: &RemotePath, + to_path: &str, +) -> anyhow::Result<()> { let file_name = remote_from_path.object_name().expect("it must exist"); - info!("Downloading {:?}",file_name); + info!("Downloading {:?}", file_name); let mut download = remote_storage.download(&remote_from_path).await?; - let mut write_data_buffer = Vec::new(); - download.download_stream.read_to_end(&mut write_data_buffer).await?; + let mut write_data_buffer = Vec::new(); + download + .download_stream + .read_to_end(&mut write_data_buffer) + .await?; let mut output_file = BufWriter::new(File::create(file_name)?); output_file.write_all(&mut write_data_buffer)?; Ok(()) } pub enum ExtensionType { - Shared, // we just use the public folder here - Tenant(String), // String is tenant_id - Library(String) // String is name of the extension + Shared, // we just use the public folder here + Tenant(String), // String is tenant_id + Library(String), // String is name of the extension } -pub async fn download_extension(config: &RemoteStorageConfig, ext_type: ExtensionType) -> anyhow::Result<()>{ +/* +separate stroage and compute +storage: pageserver stores pages, accepts WAL. communicates with S3. +compute: postgres, runs in kubernetes/ VM, started by compute_ctl. rust service. accepts some spec. + +pass config to compute_ctl + */ +pub async fn download_extension( + config: &RemoteStorageConfig, + ext_type: ExtensionType, +) -> anyhow::Result<()> { let sharedir = get_pg_config("--sharedir"); let sharedir = format!("{}/extension", sharedir); let libdir = get_pg_config("--libdir"); @@ -80,8 +101,8 @@ pub async fn download_extension(config: &RemoteStorageConfig, ext_type: Extensio let from_paths = remote_storage.list_files(Some(&folder)).await?; for remote_from_path in from_paths { if remote_from_path.extension() == Some("control") { - // FIXME: CAUTION: if you run this, it will actually write stuff to your postgress directory - // only run if you are ok with that + // NOTE: if you run this, it will actually write stuff to your postgress directory + // only run if you are ok with that. TODO: delete this comment download_helper(&remote_storage, &remote_from_path, &sharedir)?; } } @@ -112,15 +133,24 @@ pub fn get_s3_config() -> anyhow::Result { // TODO: Right now we are using the same config parameters as pageserver; but should we have our own configs? // TODO: Should we read the s3_config from CLI arguments? let cfg_file_path = Path::new("./../.neon/pageserver.toml"); - let cfg_file_contents = std::fs::read_to_string(cfg_file_path) - .with_context(|| format!( "Failed to read pageserver config at '{}'", cfg_file_path.display()))?; + let cfg_file_contents = std::fs::read_to_string(cfg_file_path).with_context(|| { + format!( + "Failed to read pageserver config at '{}'", + cfg_file_path.display() + ) + })?; let toml = cfg_file_contents .parse::() - .with_context(|| format!( "Failed to parse '{}' as pageserver config", cfg_file_path.display()))?; - let remote_storage_data = toml.get("remote_storage") + .with_context(|| { + format!( + "Failed to parse '{}' as pageserver config", + cfg_file_path.display() + ) + })?; + let remote_storage_data = toml + .get("remote_storage") .context("field should be present")?; let remote_storage_config = RemoteStorageConfig::from_toml(remote_storage_data)? .context("error configuring remote storage")?; Ok(remote_storage_config) } - diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs index b5a4f90756..c00c5a0088 100644 --- a/compute_tools/src/http/api.rs +++ b/compute_tools/src/http/api.rs @@ -17,6 +17,8 @@ use tokio::task; use tracing::{error, info}; use tracing_utils::http::OtelName; +use crate::extension_server; + fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse { ComputeStatusResponse { start_time: state.start_time, @@ -133,7 +135,8 @@ async fn routes(req: Request, compute: &Arc) -> Response Response::new(Body::from("OK")), Err(e) => { error!("download_file failed: {}", e); diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 24811f75ee..c061ab2da3 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -9,6 +9,7 @@ pub mod http; #[macro_use] pub mod logger; pub mod compute; +pub mod extension_server; pub mod monitor; pub mod params; pub mod pg_helpers; diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index a02343cda8..b893602f41 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -90,10 +90,10 @@ pub trait RemoteStorage: Send + Sync + 'static { prefix: Option<&RemotePath>, ) -> Result, DownloadError>; - /// Lists all files in a subdirectories + /// Lists all files in a folder async fn list_files( &self, - prefix: Option<&RemotePath>, + folder: Option<&RemotePath>, ) -> anyhow::Result>; /// Streams the local file contents into remote into the remote storage entry. diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs index 14786e7820..ee3a6cb8f0 100644 --- a/libs/remote_storage/src/s3_bucket.rs +++ b/libs/remote_storage/src/s3_bucket.rs @@ -336,7 +336,6 @@ impl RemoteStorage for S3Bucket { &self, folder: Option<&RemotePath> ) -> anyhow::Result>{ - // TODO: should we use DownloadError error type instead of anyhow::Error? let folder_name = folder.map(|x| String::from(x.object_name().expect("invalid folder name")) ); @@ -348,7 +347,6 @@ impl RemoteStorage for S3Bucket { .acquire() .await .context("Concurrency limiter semaphore got closed during S3 list_files")?; - metrics::inc_list_objects(); let response = self .client @@ -359,15 +357,10 @@ impl RemoteStorage for S3Bucket { .set_max_keys(self.max_keys_per_list_response) .send() .await - .map_err(|e| { - metrics::inc_list_objects_fail(); - e - }) .context("Failed to list files in S3 bucket")?; for object in response.contents().unwrap_or_default() { let object_path = object.key().unwrap(); - println!("{:?}", object_path); let remote_path = self.s3_object_to_relative_path(object_path); all_files.push(remote_path); } diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py index 64953f49ec..8c1dd55fc1 100644 --- a/test_runner/regress/test_download_extensions.py +++ b/test_runner/regress/test_download_extensions.py @@ -44,6 +44,8 @@ def test_file_download( env.neon_cli.create_timeline("test_file_download", tenant_id=tenant) endpoint = env.endpoints.create_start("test_file_download", tenant_id=tenant) + # download the control file from MockS3 + with closing(endpoint.connect()) as conn: with conn.cursor() as cur: cur.execute("CREATE EXTENSION test_load");