From 7ed120a98ef2abc6a3150338add477d61e124795 Mon Sep 17 00:00:00 2001
From: Alek Westover
Date: Tue, 11 Jul 2023 19:50:37 -0400
Subject: [PATCH] zip file downloading is successful

---
 compute_tools/src/compute.rs                  |  9 ++-
 compute_tools/src/extension_server.rs         | 54 ++++++++-------
 .../regress/test_download_extensions.py       | 68 ++++++++++---------
 3 files changed, 75 insertions(+), 56 deletions(-)

diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index f9efb155b4..ba01731687 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -723,7 +723,13 @@ LIMIT 100",
         match &self.ext_remote_storage {
             None => anyhow::bail!("No remote extension storage"),
             Some(remote_storage) => {
-                extension_server::download_extension(ext_name, remote_storage, &self.pgbin).await
+                extension_server::download_extension(
+                    ext_name,
+                    remote_storage,
+                    &self.pgbin,
+                    &self.pgversion,
+                )
+                .await
             }
         }
     }
@@ -732,6 +738,7 @@ LIMIT 100",
     pub async fn prepare_preload_libraries(&self, compute_state: &ComputeState) -> Result<()> {
         // TODO: revive some of the old logic for downloading shared preload libraries
        info!("I haven't implemented downloading shared preload libraries yet");
+        dbg!(compute_state);
         Ok(())
     }
 }
diff --git a/compute_tools/src/extension_server.rs b/compute_tools/src/extension_server.rs
index 4284f21a88..43ad21249f 100644
--- a/compute_tools/src/extension_server.rs
+++ b/compute_tools/src/extension_server.rs
@@ -17,12 +17,10 @@ extensions enabled for specific tenant-ids.
 */
 use crate::compute::ComputeNode;
 use anyhow::{self, bail, Result};
-use futures::future::ok;
 use remote_storage::*;
-use serde_json::{self, Map, Value};
+use serde_json::{self, Value};
 use std::collections::HashSet;
 use std::fs::File;
-use std::hash::Hash;
 use std::io::BufWriter;
 use std::io::Write;
 use std::num::{NonZeroU32, NonZeroUsize};
@@ -78,6 +76,7 @@ pub async fn get_available_extensions(
     dbg!(all_files);
 
+    // TODO: if index_path already exists, don't re-download it, just read it.
     let mut download = remote_storage.download(&index_path).await?;
     let mut write_data_buffer = Vec::new();
     download
         .download_stream
         .read_to_end(&mut write_data_buffer)
         .await?;
@@ -137,33 +136,40 @@ pub async fn get_available_extensions(
 }
 
 // download all sqlfiles (and possibly data files) for a given extension name
-//
 pub async fn download_extension(
     ext_name: &str,
     remote_storage: &GenericRemoteStorage,
     pgbin: &str,
+    pg_version: &str,
 ) -> Result<()> {
-    todo!();
-    // let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
-    // let local_libdir = Path::new(&get_pg_config("--libdir", pgbin)).to_owned();
-    // info!("Start downloading extension {:?}", ext_name);
-    // let mut download = remote_storage.download(&ext_path).await?;
-    // let mut write_data_buffer = Vec::new();
-    // download
-    //     .download_stream
-    //     .read_to_end(&mut write_data_buffer)
-    //     .await?;
-    // let zip_name = ext_path.object_name().expect("invalid extension path");
-    // let mut output_file = BufWriter::new(File::create(zip_name)?);
-    // output_file.write_all(&write_data_buffer)?;
-    // info!("Download {:?} completed successfully", &ext_path);
-    // info!("Unzipping extension {:?}", zip_name);
+    let ext_path = RemotePath::new(
+        &Path::new(pg_version)
+            .join("extensions")
+            .join(ext_name.to_owned() + ".tar.gz"),
+    )?;
+    let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
+    let local_libdir = Path::new(&get_pg_config("--libdir", pgbin)).to_owned();
+    info!(
+        "Start downloading extension {:?} from {:?}",
+        ext_name, ext_path
+    );
+    let mut download = remote_storage.download(&ext_path).await?;
+    let mut write_data_buffer = Vec::new();
+    download
+        .download_stream
+        .read_to_end(&mut write_data_buffer)
+        .await?;
+    let zip_name = ext_path.object_name().expect("invalid extension path");
+    let mut output_file = BufWriter::new(File::create(zip_name)?);
+    output_file.write_all(&write_data_buffer)?;
+    info!("Download {:?} completed successfully", &ext_path);
+    info!("Unzipping extension {:?}", zip_name);
 
-    // // TODO unzip and place files in appropriate locations
-    // info!("unzip {zip_name:?}");
-    // info!("place extension files in {local_sharedir:?}");
-    // info!("place library files in {local_libdir:?}");
-    // Ok(())
+    // TODO: unzip and place files in appropriate locations
+    info!("unzip {zip_name:?}");
+    info!("place extension files in {local_sharedir:?}");
+    info!("place library files in {local_libdir:?}");
+    Ok(())
 }
 
 // This function initializes the necessary structs to use remote storage (should be fairly cheap)
diff --git a/test_runner/regress/test_download_extensions.py b/test_runner/regress/test_download_extensions.py
index 0fe7775bd4..ff27c5eb9e 100644
--- a/test_runner/regress/test_download_extensions.py
+++ b/test_runner/regress/test_download_extensions.py
@@ -1,3 +1,4 @@
+import os
 from contextlib import closing
 
 import pytest
@@ -54,45 +55,50 @@ def test_remote_extensions(
         env.remote_storage_client.upload_fileobj(
             f,
             env.ext_remote_storage.bucket_name,
-            f"ext/v{pg_version}/anon.tar.gz",
+            f"ext/v{pg_version}/extensions/anon.tar.gz",
         )
     with open("test_runner/regress/data/extension_test/embedding.tar.gz", "rb") as f:
         env.remote_storage_client.upload_fileobj(
             f,
             env.ext_remote_storage.bucket_name,
-            f"ext/v{pg_version}/embedding.tar.gz",
+            f"ext/v{pg_version}/extensions/embedding.tar.gz",
         )
 
-    # Start a compute node and check that it can download the extensions
-    # and use them to CREATE EXTENSION and LOAD
-    endpoint = env.endpoints.create_start(
-        "test_remote_extensions",
-        tenant_id=tenant_id,
-        remote_ext_config=env.ext_remote_storage.to_string(),
-        # config_lines=["log_min_messages=debug3"],
-    )
-    with closing(endpoint.connect()) as conn:
-        with conn.cursor() as cur:
-            # Check that appropriate control files were downloaded
-            cur.execute("SELECT * FROM pg_available_extensions")
-            all_extensions = [x[0] for x in cur.fetchall()]
-            log.info(all_extensions)
-            assert "anon" in all_extensions
-            assert "embedding" in all_extensions
-            # TODO: check that we don't have download custom extensions for other tenant ids
-            # TODO: not sure how private extension will work with REAL_S3 test. can we rig the tenant id?
+    try:
+        # Start a compute node and check that it can download the extensions
+        # and use them to CREATE EXTENSION and LOAD
+        endpoint = env.endpoints.create_start(
+            "test_remote_extensions",
+            tenant_id=tenant_id,
+            remote_ext_config=env.ext_remote_storage.to_string(),
+            # config_lines=["log_min_messages=debug3"],
+        )
+        with closing(endpoint.connect()) as conn:
+            with conn.cursor() as cur:
+                # Check that appropriate control files were downloaded
+                cur.execute("SELECT * FROM pg_available_extensions")
+                all_extensions = [x[0] for x in cur.fetchall()]
+                log.info(all_extensions)
+                assert "anon" in all_extensions
+                assert "embedding" in all_extensions
+                # TODO: check that we don't download custom extensions for other tenant ids
+                # TODO: not sure how private extensions will work with the REAL_S3 test. can we rig the tenant id?
 
-            # check that we can download public extension
-            cur.execute("CREATE EXTENSION embedding")
-            cur.execute("SELECT extname FROM pg_extension")
-            assert "embedding" in [x[0] for x in cur.fetchall()]
+                # check that we can download a public extension
+                cur.execute("CREATE EXTENSION embedding")
+                cur.execute("SELECT extname FROM pg_extension")
+                assert "embedding" in [x[0] for x in cur.fetchall()]
 
-            # check that we can download private extension
-            # TODO: this will fail locally because we don't have the required dependencies
-            cur.execute("CREATE EXTENSION anon")
-            cur.execute("SELECT extname FROM pg_extension")
-            assert "embedding" in [x[0] for x in cur.fetchall()]
+                # check that we can download a private extension
+                # TODO: this will fail locally because we don't have the required dependencies
+                cur.execute("CREATE EXTENSION anon")
+                cur.execute("SELECT extname FROM pg_extension")
+                assert "anon" in [x[0] for x in cur.fetchall()]
 
-            # TODO: should we try libraries too?
+                # TODO: should we try libraries too?
 
-    # TODO: cleanup downloaded files in mock tests.
+    finally:
+        cleanup_files = ["embedding.tar.gz", "anon.tar.gz"]
+        # for file in cleanup_files:
+        #     os.remove(file)
+        log.info("For now, please manually clean up %s", cleanup_files)
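
Note: the unpack step is still a TODO in download_extension; the commit stops after
writing the .tar.gz to disk. A minimal sketch of what that step might look like,
assuming the `flate2` and `tar` crates (not dependencies this patch adds) and a
guessed archive layout in which `*.so` files belong in local_libdir and control/SQL
files in local_sharedir:

    use anyhow::Result;
    use flate2::read::GzDecoder;
    use std::fs::File;
    use std::path::Path;
    use tar::Archive;

    // Hypothetical helper: stream the downloaded `zip_name` file (a gzipped
    // tarball despite the name) and route each entry to the Postgres share or
    // lib directory based on its file extension.
    fn unpack_extension(zip_name: &str, local_sharedir: &Path, local_libdir: &Path) -> Result<()> {
        let tarball = File::open(zip_name)?;
        let mut archive = Archive::new(GzDecoder::new(tarball));
        for entry in archive.entries()? {
            let mut entry = entry?;
            let path = entry.path()?.into_owned();
            let file_name = path.file_name().expect("archive entry has no file name");
            // Assumed layout: shared libraries go to the lib dir, everything
            // else (control files, SQL scripts) to sharedir's extension dir.
            let dest = match path.extension().and_then(|e| e.to_str()) {
                Some("so") => local_libdir.join(file_name),
                _ => local_sharedir.join(file_name),
            };
            entry.unpack(&dest)?;
        }
        Ok(())
    }

Something like this would slot in after the "Unzipping extension" log line; a real
version would also remove the temporary tarball, which the test currently works
around in its finally block.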