Add Versioning + move Access to control plane for remote ext (#4760)

This commit is contained in:
Alek Westover
2023-07-20 12:55:07 -04:00
parent 285f687e1b
commit fcc57f49d1
30 changed files with 373 additions and 276 deletions

View File

@@ -30,11 +30,10 @@
//! -C 'postgresql://cloud_admin@localhost/postgres' \
//! -S /var/db/postgres/specs/current.json \
//! -b /usr/local/bin/postgres \
//! -r {"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http://localhost:9000",
//! (optionally) "key": "AWS_SECRET_ACCESS_KEY", "id": "AWS_ACCESS_KEY_ID"}
//! -r {"bucket": "my-bucket", "region": "eu-central-1", "endpoint": "http://localhost:9000"}
//! ```
//!
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::panic;
use std::path::Path;
@@ -52,7 +51,6 @@ use compute_api::responses::ComputeStatus;
use compute_tools::compute::{ComputeNode, ComputeState, ParsedSpec};
use compute_tools::configurator::launch_configurator;
use compute_tools::extension_server::launch_download_extensions;
use compute_tools::extension_server::{get_pg_version, init_remote_storage};
use compute_tools::http::api::launch_http_server;
use compute_tools::logger::*;
@@ -60,12 +58,14 @@ use compute_tools::monitor::launch_monitor;
use compute_tools::params::*;
use compute_tools::spec::*;
const BUILD_TAG_DEFAULT: &str = "local";
const BUILD_TAG_DEFAULT: &str = "111"; // TODO: change back to local; I need 111 for my test
fn main() -> Result<()> {
init_tracing_and_logging(DEFAULT_LOG_LEVEL)?;
let build_tag = option_env!("BUILD_TAG").unwrap_or(BUILD_TAG_DEFAULT);
let build_tag = option_env!("BUILD_TAG")
.unwrap_or(BUILD_TAG_DEFAULT)
.to_string();
info!("build_tag: {build_tag}");
let matches = cli().get_matches();
@@ -74,8 +74,7 @@ fn main() -> Result<()> {
let remote_ext_config = matches.get_one::<String>("remote-ext-config");
let ext_remote_storage = remote_ext_config.map(|x| {
init_remote_storage(x, build_tag)
.expect("cannot initialize remote extension storage from config")
init_remote_storage(x).expect("cannot initialize remote extension storage from config")
});
let http_port = *matches
@@ -195,7 +194,9 @@ fn main() -> Result<()> {
state: Mutex::new(new_state),
state_changed: Condvar::new(),
ext_remote_storage,
available_extensions: OnceLock::new(),
ext_remote_paths: OnceLock::new(),
already_downloaded_extensions: Mutex::new(HashSet::new()),
build_tag,
};
let compute = Arc::new(compute_node);
@@ -243,9 +244,6 @@ fn main() -> Result<()> {
let _configurator_handle =
launch_configurator(&compute).expect("cannot launch configurator thread");
let _download_extensions_handle =
launch_download_extensions(&compute).expect("cannot launch download extensions thread");
// Start Postgres
let mut delay_exit = false;
let mut exit_code = None;

View File

@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::fs;
use std::io::BufRead;
@@ -21,7 +22,7 @@ use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::{ComputeMode, ComputeSpec};
use utils::measured_stream::MeasuredReader;
use remote_storage::GenericRemoteStorage;
use remote_storage::{GenericRemoteStorage, RemotePath};
use crate::pg_helpers::*;
use crate::spec::*;
@@ -55,8 +56,10 @@ pub struct ComputeNode {
pub state_changed: Condvar,
/// the S3 bucket that we search for extensions in
pub ext_remote_storage: Option<GenericRemoteStorage>,
// cached lists of available extensions and libraries
pub available_extensions: OnceLock<HashSet<String>>,
// (key: extension name, value: path to extension archive in remote storage)
pub ext_remote_paths: OnceLock<HashMap<String, RemotePath>>,
pub already_downloaded_extensions: Mutex<HashSet<String>>,
pub build_tag: String,
}
#[derive(Clone, Debug)]
@@ -735,23 +738,23 @@ LIMIT 100",
// If remote extension storage is configured,
// download extension control files
#[tokio::main]
pub async fn prepare_external_extensions(&self, compute_state: &ComputeState) -> Result<()> {
if let Some(ref ext_remote_storage) = self.ext_remote_storage {
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let custom_ext_prefixes = spec.custom_extensions.clone().unwrap_or(Vec::new());
info!("custom_ext_prefixes: {:?}", &custom_ext_prefixes);
let available_extensions = extension_server::get_available_extensions(
let custom_ext = spec.custom_extensions.clone().unwrap_or(Vec::new());
info!("custom extensions: {:?}", &custom_ext);
let ext_remote_paths = extension_server::get_available_extensions(
ext_remote_storage,
&self.pgbin,
&self.pgversion,
&custom_ext_prefixes,
&custom_ext,
&self.build_tag,
)
.await?;
self.available_extensions
.set(available_extensions)
.expect("available_extensions.set error");
self.ext_remote_paths
.set(ext_remote_paths)
.expect("ext_remote_paths.set error");
}
Ok(())
}
@@ -760,11 +763,31 @@ LIMIT 100",
match &self.ext_remote_storage {
None => anyhow::bail!("No remote extension storage"),
Some(remote_storage) => {
// TODO: eliminate useless LOAD Library calls to this function (if possible)
// not clear that we can distinguish between useful and useless
// library calls better than the below code
let ext_name = ext_name.replace(".so", "");
{
let mut already_downloaded_extensions =
self.already_downloaded_extensions.lock().expect("bad lock");
if already_downloaded_extensions.contains(&ext_name) {
info!(
"extension {:?} already exists, skipping download",
&ext_name
);
return Ok(());
} else {
already_downloaded_extensions.insert(ext_name.clone());
}
}
extension_server::download_extension(
ext_name,
&ext_name,
&self
.ext_remote_paths
.get()
.expect("error accessing ext_remote_paths")[&ext_name],
remote_storage,
&self.pgbin,
&self.pgversion,
)
.await
}
@@ -809,6 +832,9 @@ LIMIT 100",
libs_vec.extend(preload_libs_vec);
}
info!("Download ext_index.json, find the extension paths");
self.prepare_external_extensions(compute_state).await?;
info!("Downloading to shared preload libraries: {:?}", &libs_vec);
let mut download_tasks = Vec::new();
for library in &libs_vec {

View File

@@ -1,35 +1,75 @@
// AT LEAST download_extension / get_available_extensions need large changes
// refactor custom_extensions to be clear that we have moved the access logic to control plane
// Download extension files from the extension store
// and put them in the right place in the postgres directory
/*
The layout of the S3 bucket is as follows:
├── 111
│   ├── v14
│   │   ├── extensions
│   │   │   ├── anon.tar.zst
│   │   │   └── embedding.tar.zst
│   │   └── ext_index.json
│   └── v15
│       ├── extensions
│       │   ├── anon.tar.zst
│       │   └── embedding.tar.zst
│       └── ext_index.json
├── 112
│   ├── v14
│   │   ├── extensions
│   │   │   └── anon.tar.zst
│   │   └── ext_index.json
│   └── v15
│       ├── extensions
│       │   └── anon.tar.zst
│       └── ext_index.json
└── 113
    ├── v14
    │   ├── extensions
    │   │   └── embedding.tar.zst
    │   └── ext_index.json
    └── v15
        ├── extensions
        │   └── embedding.tar.zst
        └── ext_index.json
v14/ext_index.json
-- this contains information necessary to create control files
v14/extensions/test_ext1.tar.gz
-- this contains the library files and sql files necessary to create this extension
v14/extensions/custom_ext1.tar.gz
Note that build number cannot be part of prefix because we might need extensions
from other build numbers.
The difference between private and public extensions is determined by who can
load the extension; this is specified in ext_index.json
ext_index.json stores the control files and location of extension archives
Specifically, ext_index.json has a list of public extensions, and a list of
extensions enabled for specific tenant-ids.
We do not duplicate extension.tar.zst files.
We only upload a new one if it is updated.
*access* is controlled by spec
More specifically, here is an example ext_index.json
{
"embedding": {
"control_file_content": "comment = 'hnsw index' \ndefault_version = '0.1.0' \nmodule_pathname = '$libdir/embedding' \nrelocatable = true \ntrusted = true",
"extension_archive": "111/v14/extensions/embedding.tar.zst"
},
"anon": {
"control_file_content": "# PostgreSQL Anonymizer (anon) extension \ncomment = 'Data anonymization tools' \ndefault_version = '1.1.0' \ndirectory='extension/anon' \nrelocatable = false \nrequires = 'pgcrypto' \nsuperuser = false \nmodule_pathname = '$libdir/anon' \ntrusted = true \n",
"extension_archive": "111/v14/extensions/anon.tar.zst"
}
}
*/
use crate::compute::ComputeNode;
use anyhow::Context;
use anyhow::{self, Result};
use flate2::read::GzDecoder;
use futures::future::join_all;
use remote_storage::*;
use serde_json::{self, Value};
use std::collections::HashSet;
use std::collections::HashMap;
use std::io::Read;
use std::num::{NonZeroU32, NonZeroUsize};
use std::path::Path;
use std::str;
use std::sync::Arc;
use std::thread;
use tar::Archive;
use tokio::io::AsyncReadExt;
use tracing::info;
use zstd::stream::read::Decoder;
fn get_pg_config(argument: &str, pgbin: &str) -> String {
// gives the result of `pg_config [argument]`
@@ -61,133 +101,111 @@ pub fn get_pg_version(pgbin: &str) -> String {
panic!("Unsuported postgres version {human_version}");
}
// download extension control files
// if custom_ext_prefixes is provided - search also in custom extension paths
// download control files for enabled_extensions
// return the paths in s3 to the archives containing the actual extension files
// for use in creating the extension
pub async fn get_available_extensions(
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
custom_ext_prefixes: &[String],
) -> Result<HashSet<String>> {
enabled_extensions: &[String],
build_tag: &str,
) -> Result<HashMap<String, RemotePath>> {
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let index_path = pg_version.to_owned() + "/ext_index.json";
let index_path = format!("{build_tag}/{pg_version}/ext_index.json");
let index_path = RemotePath::new(Path::new(&index_path)).context("error forming path")?;
info!("download ext_index.json: {:?}", &index_path);
info!("download ext_index.json from: {:?}", &index_path);
// TODO: potential optimization: cache ext_index.json
let mut download = remote_storage.download(&index_path).await?;
let mut write_data_buffer = Vec::new();
let mut ext_idx_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut write_data_buffer)
.read_to_end(&mut ext_idx_buffer)
.await?;
let ext_index_str = match str::from_utf8(&write_data_buffer) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
let ext_index_str = str::from_utf8(&ext_idx_buffer).expect("error parsing json");
let ext_index_full: Value = serde_json::from_str(ext_index_str)?;
let ext_index_full = ext_index_full.as_object().context("error parsing json")?;
let control_data = ext_index_full["control_data"]
.as_object()
.context("json parse error")?;
let enabled_extensions = ext_index_full["enabled_extensions"]
.as_object()
.context("json parse error")?;
info!("{:?}", control_data.clone());
info!("{:?}", enabled_extensions.clone());
info!("ext_index: {:?}", &ext_index_full);
let mut prefixes = vec!["public".to_string()];
prefixes.extend(custom_ext_prefixes.to_owned());
info!("{:?}", &prefixes);
let mut all_extensions = HashSet::new();
for prefix in prefixes {
let prefix_extensions = match enabled_extensions.get(&prefix) {
Some(Value::Array(ext_name)) => ext_name,
_ => {
info!("prefix {} has no extensions", prefix);
continue;
}
};
info!("{:?}", prefix_extensions);
for ext_name in prefix_extensions {
all_extensions.insert(ext_name.as_str().context("json parse error")?.to_string());
}
info!("enabled_extensions: {:?}", enabled_extensions);
let mut ext_remote_paths = HashMap::new();
let mut file_create_tasks = Vec::new();
for extension in enabled_extensions {
let ext_data = ext_index_full[extension]
.as_object()
.context("error parsing json")?;
let control_contents = ext_data["control_file_content"]
.as_str()
.context("error parsing json")?;
let control_path = local_sharedir.join(extension.to_owned() + ".control");
info!("writing file {:?}{:?}", control_path, control_contents);
file_create_tasks.push(tokio::fs::write(control_path, control_contents));
let ext_archive_path = ext_data["extension_archive"]
.as_str()
.context("error parsing json")?;
ext_remote_paths.insert(
extension.to_string(),
RemotePath::from_string(ext_archive_path)?,
);
}
for prefix in &all_extensions {
let control_contents = control_data[prefix].as_str().context("json parse error")?;
let control_path = local_sharedir.join(prefix.to_owned() + ".control");
info!("WRITING FILE {:?}{:?}", control_path, control_contents);
std::fs::write(control_path, control_contents)?;
let results = join_all(file_create_tasks).await;
for result in results {
result?;
}
Ok(all_extensions.into_iter().collect())
Ok(ext_remote_paths)
}
// download all sqlfiles (and possibly data files) for a given extension name
pub async fn download_extension(
ext_name: &str,
ext_path: &RemotePath,
remote_storage: &GenericRemoteStorage,
pgbin: &str,
pg_version: &str,
) -> Result<()> {
// TODO: potential optimization: only download the extension if it doesn't exist
// problem: how would we tell if it exists?
let ext_name = ext_name.replace(".so", "");
let ext_name_targz = ext_name.to_owned() + ".tar.gz";
if Path::new(&ext_name_targz).exists() {
info!("extension {:?} already exists", ext_name_targz);
return Ok(());
}
let ext_path = RemotePath::new(
&Path::new(pg_version)
.join("extensions")
.join(ext_name_targz.clone()),
)?;
info!(
"Start downloading extension {:?} from {:?}",
ext_name, ext_path
);
let mut download = remote_storage.download(&ext_path).await?;
let mut write_data_buffer = Vec::new();
info!("Download extension {:?} from {:?}", ext_name, ext_path);
let mut download = remote_storage.download(ext_path).await?;
let mut download_buffer = Vec::new();
download
.download_stream
.read_to_end(&mut write_data_buffer)
.read_to_end(&mut download_buffer)
.await?;
let unzip_dest = pgbin.strip_suffix("/bin/postgres").expect("bad pgbin");
let tar = GzDecoder::new(write_data_buffer.as_slice());
let mut archive = Archive::new(tar);
archive.unpack(unzip_dest)?;
let mut decoder = Decoder::new(download_buffer.as_slice())?;
let mut decompress_buffer = Vec::new();
decoder.read_to_end(&mut decompress_buffer)?;
let mut archive = Archive::new(decompress_buffer.as_slice());
let unzip_dest = pgbin
.strip_suffix("/bin/postgres")
.expect("bad pgbin")
.to_string()
+ "/download_extensions";
archive.unpack(&unzip_dest)?;
info!("Download + unzip {:?} completed successfully", &ext_path);
let local_sharedir = Path::new(&get_pg_config("--sharedir", pgbin)).join("extension");
let zip_sharedir = format!("{unzip_dest}/extensions/{ext_name}/share/extension");
info!("mv {zip_sharedir:?}/* {local_sharedir:?}");
for file in std::fs::read_dir(zip_sharedir)? {
let old_file = file?.path();
let new_file =
Path::new(&local_sharedir).join(old_file.file_name().context("error parsing file")?);
std::fs::rename(old_file, new_file)?;
}
let local_libdir = Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql");
let zip_libdir = format!("{unzip_dest}/extensions/{ext_name}/lib");
info!("mv {zip_libdir:?}/* {local_libdir:?}");
for file in std::fs::read_dir(zip_libdir)? {
let old_file = file?.path();
let new_file =
Path::new(&local_libdir).join(old_file.file_name().context("error parsing file")?);
std::fs::rename(old_file, new_file)?;
let sharedir_paths = (
format!("{unzip_dest}/{ext_name}/share/extension"),
Path::new(&get_pg_config("--sharedir", pgbin)).join("extension"),
);
let libdir_paths = (
format!("{unzip_dest}/{ext_name}/lib"),
Path::new(&get_pg_config("--libdir", pgbin)).join("postgresql"),
);
// move contents of the libdir / sharedir in unzipped archive to the correct local paths
for paths in [sharedir_paths, libdir_paths] {
let (zip_dir, real_dir) = paths;
info!("mv {zip_dir:?}/* {real_dir:?}");
for file in std::fs::read_dir(zip_dir)? {
let old_file = file?.path();
let new_file =
Path::new(&real_dir).join(old_file.file_name().context("error parsing file")?);
std::fs::rename(old_file, new_file)?;
}
}
Ok(())
}
// This function initializes the necessary structs to use remote storage (should be fairly cheap)
pub fn init_remote_storage(
remote_ext_config: &str,
default_prefix: &str,
) -> anyhow::Result<GenericRemoteStorage> {
pub fn init_remote_storage(remote_ext_config: &str) -> anyhow::Result<GenericRemoteStorage> {
let remote_ext_config: serde_json::Value = serde_json::from_str(remote_ext_config)?;
let remote_ext_bucket = remote_ext_config["bucket"]
@@ -199,19 +217,9 @@ pub fn init_remote_storage(
let remote_ext_endpoint = remote_ext_config["endpoint"].as_str();
let remote_ext_prefix = remote_ext_config["prefix"]
.as_str()
.unwrap_or(default_prefix)
.unwrap_or_default()
.to_string();
// control plane passes the aws creds via CLI ARGS to compute_ctl
let aws_key = remote_ext_config["key"].as_str();
let aws_id = remote_ext_config["id"].as_str();
if let Some(aws_key) = aws_key {
if let Some(aws_id) = aws_id {
std::env::set_var("AWS_SECRET_ACCESS_KEY", aws_key);
std::env::set_var("AWS_ACCESS_KEY_ID", aws_id);
}
}
// If needed, it is easy to allow modification of other parameters
// however, default values should be fine for now
let config = S3Config {
@@ -229,19 +237,3 @@ pub fn init_remote_storage(
};
GenericRemoteStorage::from_config(&config)
}
/// Spawns a background thread named "download-extensions" that calls
/// `prepare_external_extensions` on the compute node once and then exits.
///
/// Returns the thread's `JoinHandle`, or the `std::io::Error` reported by
/// `thread::Builder::spawn` if the thread could not be started.
///
/// # Panics
/// The spawned thread (not the caller) panics if `compute.state` cannot be
/// locked or if `prepare_external_extensions` returns an error.
pub fn launch_download_extensions(
compute: &Arc<ComputeNode>,
) -> Result<thread::JoinHandle<()>, std::io::Error> {
// Clone the Arc so the spawned closure owns its own handle to the node.
let compute = Arc::clone(compute);
thread::Builder::new()
.name("download-extensions".into())
.spawn(move || {
info!("start download_extension_files");
// The guard lives until the end of the closure, so the state lock is
// held for the whole duration of the call below.
let compute_state = compute.state.lock().expect("error unlocking compute.state");
compute
.prepare_external_extensions(&compute_state)
.expect("error preparing extensions");
info!("download_extension_files done, exiting thread");
})
}