WIP: half-done refactoring of download_extension logic.

this breaks tests
This commit is contained in:
Anastasia Lubennikova
2023-06-23 16:11:43 +03:00
parent fd3dfe9d52
commit a4ddad92c1
4 changed files with 164 additions and 68 deletions

View File

@@ -49,7 +49,9 @@ use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::{download_extension, init_remote_storage, ExtensionType};
use compute_tools::extension_server::{
download_extension, get_availiable_extensions, init_remote_storage,
};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
use compute_tools::monitor::launch_monitor;
@@ -78,11 +80,12 @@ fn main() -> Result<()> {
let rt = Runtime::new().unwrap();
let copy_remote_storage = ext_remote_storage.clone();
rt.block_on(async move {
download_extension(&copy_remote_storage, ExtensionType::Shared, pgbin)
.await
.expect("download extension should work");
});
// rt.block_on(async move {
// download_extension(&copy_remote_storage, ExtensionType::Shared, pgbin)
// .await
// .expect("download extension should work");
// });
let http_port = *matches
.get_one::<u16>("http-port")
@@ -199,6 +202,8 @@ fn main() -> Result<()> {
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage,
availiable_extensions: Vec::new(),
availiable_libraries: Vec::new(),
};
let compute = Arc::new(compute_node);
@@ -209,6 +214,19 @@ fn main() -> Result<()> {
let extension_server_port: u16 = http_port;
// exen before we have spec, we can get public availiable extensions
// TODO turn get_availiable_extensions() & other functions into ComputeNode method,
// we pass to many params from it anyways..
compute_node.availiable_extensions = get_availiable_extensions(
ext_remote_storage,
pg_version, //TODO
pgbin,
None,
);
// TODO same for libraries
if !spec_set {
// No spec provided, hang waiting for it.
info!("no compute spec provided, waiting");
@@ -246,6 +264,17 @@ fn main() -> Result<()> {
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
// download private tenant extensions before postgres start
// TODO
// compute_node.availiable_extensions = get_availiable_extensions(ext_remote_storage,
// pg_version, //TODO
// pgbin,
// tenant_id); //TODO get tenant_id from spec
// download preload shared libraries before postgres start (if any)
// TODO
// download_library_file();
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;

View File

@@ -16,7 +16,7 @@ use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use remote_storage::GenericRemoteStorage;
use remote_storage::{GenericRemoteStorage, RemotePath};
use crate::config;
use crate::pg_helpers::*;
@@ -49,6 +49,8 @@ pub struct ComputeNode {
pub state_changed: Condvar,
/// S3 extensions configuration variables
pub ext_remote_storage: Option<GenericRemoteStorage>,
pub availiable_extensions: Vec<RemotePath>,
pub availiable_libraries: Vec<RemotePath>,
}
#[derive(Clone, Debug)]

View File

@@ -1,4 +1,4 @@
use anyhow::{self, bail};
use anyhow::{self, bail, Result};
use remote_storage::*;
use serde_json::{self, Value};
use std::fs::File;
@@ -8,6 +8,7 @@ use std::path::{Path, PathBuf};
use std::str;
use tokio::io::AsyncReadExt;
use tracing::info;
use utils::id::TenantId;
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
@@ -55,60 +56,92 @@ async fn download_helper(
Ok(())
}
pub enum ExtensionType {
Shared, // we just use the public folder here
Tenant(String), // String is tenant_id
Library(String), // String is name of the extension
}
pub async fn download_extension(
remote_storage: &Option<GenericRemoteStorage>,
ext_type: ExtensionType,
// download extension control files
//
// return list of all extension files to use it in the future searches
//
// if tenant_id is provided - search in a private per-tenant extension path,
// otherwise - in public extension path
//
pub async fn get_availiable_extensions(
remote_storage: &GenericRemoteStorage,
pg_version: &str,
pgbin: &str,
) -> anyhow::Result<()> {
let remote_storage = match remote_storage {
Some(remote_storage) => remote_storage,
None => return Ok(()),
tenant_id: Option<TenantId>,
) -> anyhow::Result<Vec<RemotePath>> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let remote_sharedir = match tenant_id {
None => RemotePath::new(&Path::new(&pg_version).join("share/postgresql/extension"))?,
Some(tenant_id) => RemotePath::new(
&Path::new(&pg_version)
.join(&tenant_id.to_string())
.join("share/postgresql/extension"),
)?,
};
let pg_version = get_pg_version(pgbin);
match ext_type {
ExtensionType::Shared => {
// 1. Download control files from s3-bucket/public/*.control to SHAREDIR/extension
// We can do this step even before we have spec,
// because public extensions are common for all projects.
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let remote_sharedir = Path::new(&pg_version).join("share/postgresql/extension");
let remote_sharedir = RemotePath::new(Path::new(&remote_sharedir))?;
let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
for remote_from_path in from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
}
}
ExtensionType::Tenant(tenant_id) => {
// 2. After we have spec, before project start
// Download control files from s3-bucket/[tenant-id]/*.control to SHAREDIR/extension
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let remote_path = RemotePath::new(Path::new(&tenant_id.to_string()))?;
let from_paths = remote_storage.list_files(Some(&remote_path)).await?;
for remote_from_path in from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
}
}
ExtensionType::Library(library_name) => {
// 3. After we have spec, before postgres start
// Download preload_shared_libraries from s3-bucket/public/[library-name].control into LIBDIR/
let from_paths = remote_storage.list_files(Some(&remote_sharedir)).await?;
let local_libdir: PathBuf = Path::new(&get_pg_config("--libdir", pgbin)).into();
let remote_path = format!("{library_name}.control");
let remote_from_path = RemotePath::new(Path::new(&remote_path))?;
download_helper(remote_storage, &remote_from_path, &local_libdir).await?;
// download all found control files
for remote_from_path in &from_paths {
if remote_from_path.extension() == Some("control") {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
}
Ok(from_paths)
}
// download all sql files for a given extension name
//
pub async fn download_extension_sql_files(
ext_name: &str,
availiable_extensions: Vec<RemotePath>,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
) -> Result<()> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
// check if extension files exist
let files_to_download: Vec<&RemotePath> = availiable_extensions
.iter()
.filter(|ext| {
ext.extension() == Some("sql") && ext.object_name().unwrap().starts_with(ext_name)
})
.collect();
if files_to_download.is_empty() {
bail!("Files for extension {ext_name} are not found in the extension store");
}
for remote_from_path in files_to_download {
download_helper(remote_storage, &remote_from_path, &local_sharedir).await?;
}
Ok(())
}
// download shared library file
pub async fn download_library_file(
lib_name: &str,
availiable_libraries: Vec<RemotePath>,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
) -> Result<()> {
let local_libdir: PathBuf = Path::new(&get_pg_config("--libdir", pgbin)).into();
// check if the library file exists
let lib = availiable_libraries
.iter()
.find(|lib: &&RemotePath| lib.object_name().unwrap() == lib_name);
match lib {
None => bail!("Shared library file {lib_name} is not found in the extension store"),
Some(lib) => {
download_helper(remote_storage, &lib, &local_libdir).await?;
}
}
Ok(())
}

View File

@@ -16,7 +16,7 @@ use tokio::task;
use tracing::{error, info};
use tracing_utils::http::OtelName;
use crate::extension_server::{self, ExtensionType};
use crate::extension_server::{download_extension_sql_files, download_library_file};
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
@@ -127,6 +127,8 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
(&Method::POST, route) if route.starts_with("/extension_server/") => {
info!("serving {:?} POST request", route);
let is_library = false;
let filename = route.split('/').last().unwrap();
info!(
@@ -134,17 +136,47 @@ async fn routes(req: Request<Body>, compute: &Arc<ComputeNode>) -> Response<Body
filename
);
match extension_server::download_extension(
&compute.ext_remote_storage,
ExtensionType::Shared,
&compute.pgbin,
)
.await
{
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("download_extension failed: {}", e);
Response::new(Body::from(e.to_string()))
if compute.ext_remote_storage.is_none() {
error!("Remote extension storage is not set up");
let mut resp = Response::new(Body::from("Remote extension storage is not set up"));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
return resp;
}
let ext_storage = &compute.ext_remote_storage.unwrap();
if !is_library {
match download_extension_sql_files(
filename,
&compute.availiable_extensions,
&ext_storage,
&compute.pgbin,
)
.await
{
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("extension download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
} else {
match download_library_file(
filename,
&compute.availiable_libraries,
&ext_storage,
&compute.pgbin,
)
.await
{
Ok(_) => Response::new(Body::from("OK")),
Err(e) => {
error!("library download failed: {}", e);
let mut resp = Response::new(Body::from(e.to_string()));
*resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
resp
}
}
}
}