mirror of https://github.com/neondatabase/neon.git
synced 2026-01-21 20:32:56 +00:00

Compare commits: extension_… → skip-sync (18 commits)
| SHA1 |
|---|
| f6798d503a |
| d748615c1f |
| 681c6910c2 |
| 148f0f9b21 |
| a7f3f5f356 |
| 00d1cfa503 |
| 1faf69a698 |
| 44a441080d |
| c215389f1c |
| b1477b4448 |
| a500bb06fb |
| 15456625c2 |
| a3f0dd2d30 |
| 76718472be |
| c07b6ffbdc |
| 6c3605fc24 |
| d96d51a3b7 |
| a010b2108a |
.github/workflows/benchmarking.yml (vendored, 9 changed lines)

```diff
@@ -180,7 +180,8 @@ jobs:
           image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
           options: --init

-      timeout-minutes: 360 # 6h
+      # Increase timeout to 8h, default timeout is 6h
+      timeout-minutes: 480

       steps:
         - uses: actions/checkout@v3
@@ -321,8 +322,6 @@ jobs:
           image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
           options: --init

-      timeout-minutes: 360 # 6h
-
       steps:
         - uses: actions/checkout@v3

@@ -414,8 +413,6 @@ jobs:
           image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
           options: --init

-      timeout-minutes: 360 # 6h
-
       steps:
         - uses: actions/checkout@v3

@@ -501,8 +498,6 @@ jobs:
           image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
           options: --init

-      timeout-minutes: 360 # 6h
-
       steps:
         - uses: actions/checkout@v3
```
.github/workflows/build_and_test.yml (vendored, 6 changed lines)

```diff
@@ -738,7 +738,7 @@ jobs:
     run:
       shell: sh -eu {0}
     env:
-      VM_BUILDER_VERSION: v0.8.0
+      VM_BUILDER_VERSION: v0.11.0

     steps:
       - name: Checkout
@@ -916,7 +916,7 @@ jobs:
           exit 1
         fi

-      - name: Create tag "release-${{ needs.tag.outputs.build-tag }}"
+      - name: Create git tag
        if: github.ref_name == 'release'
        uses: actions/github-script@v6
        with:
@@ -926,7 +926,7 @@ jobs:
            github.rest.git.createRef({
              owner: context.repo.owner,
              repo: context.repo.repo,
-             ref: "refs/tags/release-${{ needs.tag.outputs.build-tag }}",
+             ref: "refs/tags/${{ needs.tag.outputs.build-tag }}",
              sha: context.sha,
            })
```
```diff
@@ -481,6 +481,40 @@ RUN wget https://github.com/rdkit/rdkit/archive/refs/tags/Release_2023_03_1.tar.
     make -j $(getconf _NPROCESSORS_ONLN) install && \
     echo 'trusted = true' >> /usr/local/pgsql/share/extension/rdkit.control

+#########################################################################################
+#
+# Layer "pg-uuidv7-pg-build"
+# compile pg_uuidv7 extension
+#
+#########################################################################################
+FROM build-deps AS pg-uuidv7-pg-build
+COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
+
+ENV PATH "/usr/local/pgsql/bin/:$PATH"
+RUN wget https://github.com/fboulnois/pg_uuidv7/archive/refs/tags/v1.0.1.tar.gz -O pg_uuidv7.tar.gz && \
+    echo "0d0759ab01b7fb23851ecffb0bce27822e1868a4a5819bfd276101c716637a7a pg_uuidv7.tar.gz" | sha256sum --check && \
+    mkdir pg_uuidv7-src && cd pg_uuidv7-src && tar xvzf ../pg_uuidv7.tar.gz --strip-components=1 -C . && \
+    make -j $(getconf _NPROCESSORS_ONLN) && \
+    make -j $(getconf _NPROCESSORS_ONLN) install && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_uuidv7.control
+
+#########################################################################################
+#
+# Layer "pg-roaringbitmap-pg-build"
+# compile pg_roaringbitmap extension
+#
+#########################################################################################
+FROM build-deps AS pg-roaringbitmap-pg-build
+COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
+
+ENV PATH "/usr/local/pgsql/bin/:$PATH"
+RUN wget https://github.com/ChenHuajun/pg_roaringbitmap/archive/refs/tags/v0.5.4.tar.gz -O pg_roaringbitmap.tar.gz && \
+    echo "b75201efcb1c2d1b014ec4ae6a22769cc7a224e6e406a587f5784a37b6b5a2aa pg_roaringbitmap.tar.gz" | sha256sum --check && \
+    mkdir pg_roaringbitmap-src && cd pg_roaringbitmap-src && tar xvzf ../pg_roaringbitmap.tar.gz --strip-components=1 -C . && \
+    make -j $(getconf _NPROCESSORS_ONLN) && \
+    make -j $(getconf _NPROCESSORS_ONLN) install && \
+    echo 'trusted = true' >> /usr/local/pgsql/share/extension/roaringbitmap.control
+
 #########################################################################################
 #
 # Layer "rust extensions"
@@ -614,6 +648,8 @@ COPY --from=kq-imcx-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-cron-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
+COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
+COPY --from=pg-roaringbitmap-pg-build /usr/local/pgsql/ /usr/local/pgsql/
 COPY pgxn/ pgxn/

 RUN make -j $(getconf _NPROCESSORS_ONLN) \
```
```diff
@@ -133,6 +133,84 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
     }
 }

+/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser
+/// that we give to customers
+fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
+    let roles = spec
+        .cluster
+        .roles
+        .iter()
+        .map(|r| format!("'{}'", escape_literal(&r.name)))
+        .collect::<Vec<_>>();
+
+    let dbs = spec
+        .cluster
+        .databases
+        .iter()
+        .map(|db| format!("'{}'", escape_literal(&db.name)))
+        .collect::<Vec<_>>();
+
+    let roles_decl = if roles.is_empty() {
+        String::from("roles text[] := NULL;")
+    } else {
+        format!(
+            r#"
+            roles text[] := ARRAY(SELECT rolname
+                                  FROM pg_catalog.pg_roles
+                                  WHERE rolname IN ({}));"#,
+            roles.join(", ")
+        )
+    };
+
+    let database_decl = if dbs.is_empty() {
+        String::from("dbs text[] := NULL;")
+    } else {
+        format!(
+            r#"
+            dbs text[] := ARRAY(SELECT datname
+                                FROM pg_catalog.pg_database
+                                WHERE datname IN ({}));"#,
+            dbs.join(", ")
+        )
+    };
+
+    // ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
+    // (see https://www.postgresql.org/docs/current/ddl-priv.html)
+    let query = format!(
+        r#"
+        DO $$
+        DECLARE
+            r text;
+            {}
+            {}
+        BEGIN
+            IF NOT EXISTS (
+                SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
+            THEN
+                CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN IN ROLE pg_read_all_data, pg_write_all_data;
+                IF array_length(roles, 1) IS NOT NULL THEN
+                    EXECUTE format('GRANT neon_superuser TO %s',
+                        array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
+                    FOREACH r IN ARRAY roles LOOP
+                        EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
+                    END LOOP;
+                END IF;
+                IF array_length(dbs, 1) IS NOT NULL THEN
+                    EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
+                        array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
+                END IF;
+            END IF;
+        END
+        $$;"#,
+        roles_decl, database_decl,
+    );
+    info!("Neon superuser created:\n{}", &query);
+    client
+        .simple_query(&query)
+        .map_err(|e| anyhow::anyhow!(e).context(query))?;
+    Ok(())
+}
+
 impl ComputeNode {
     pub fn set_status(&self, status: ComputeStatus) {
         let mut state = self.state.lock().unwrap();
```
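The DO-block above is assembled from two independently quoted layers, which is easy to get wrong. A minimal, self-contained sketch of that assembly (the sample role names are illustrative, and the inlined `escape_literal` mirrors the helper made `pub` in the spec.rs hunk further down — this is not part of the diff itself):

```rust
// Sketch: build the roles declaration the way create_neon_superuser does.
fn escape_literal(s: &str) -> String {
    s.replace('\'', "''").replace('\\', "\\\\")
}

fn main() {
    let role_names = ["alice", "bob"]; // hypothetical spec.cluster.roles names
    // Each name becomes a quoted SQL literal for the IN (...) filter.
    let roles: Vec<String> = role_names
        .iter()
        .map(|r| format!("'{}'", escape_literal(r)))
        .collect();

    let roles_decl = format!(
        "roles text[] := ARRAY(SELECT rolname
                      FROM pg_catalog.pg_roles
                      WHERE rolname IN ({}));",
        roles.join(", ")
    );

    // This declaration is spliced into the DECLARE section of the DO $$ ... $$ block;
    // identifier quoting is then done at run time with quote_ident() before any
    // GRANT or ALTER is executed.
    println!("{roles_decl}");
}
```

String literals are escaped at query-build time, while `quote_ident()` handles identifier quoting inside the DO block at run time — two separate quoting layers for two separate SQL contexts.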
```diff
@@ -260,9 +338,13 @@ impl ComputeNode {
         let lsn = match spec.mode {
             ComputeMode::Primary => {
                 info!("starting safekeepers syncing");
-                let lsn = self
-                    .sync_safekeepers(pspec.storage_auth_token.clone())
-                    .with_context(|| "failed to sync safekeepers")?;
+                let lsn = if let Some(synced_lsn) = spec.skip_sync_safekeepers {
+                    info!("no need to sync");
+                    synced_lsn
+                } else {
+                    self.sync_safekeepers(pspec.storage_auth_token.clone())
+                        .with_context(|| "failed to sync safekeepers")?
+                };
                 info!("safekeepers synced at LSN {}", lsn);
                 lsn
             }
```
```diff
@@ -347,6 +429,8 @@ impl ComputeNode {
             .map_err(|_| anyhow::anyhow!("invalid connstr"))?;

         let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
+        // Disable forwarding so that users don't get a cloud_admin role
+        client.simple_query("SET neon.forward_ddl = false")?;
         client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
         client.simple_query("GRANT zenith_admin TO cloud_admin")?;
         drop(client);
```
```diff
@@ -357,14 +441,16 @@ impl ComputeNode {
             Ok(client) => client,
         };

+        // Disable DDL forwarding because control plane already knows about these roles/databases.
+        client.simple_query("SET neon.forward_ddl = false")?;
+
         // Proceed with post-startup configuration. Note, that order of operations is important.
         let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
+        create_neon_superuser(spec, &mut client)?;
         handle_roles(spec, &mut client)?;
         handle_databases(spec, &mut client)?;
         handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
-        handle_grants(spec, self.connstr.as_str(), &mut client)?;
+        handle_grants(spec, self.connstr.as_str())?;
         handle_extensions(spec, &mut client)?;

         // 'Close' connection
```
```diff
@@ -402,7 +488,7 @@ impl ComputeNode {
         handle_roles(&spec, &mut client)?;
         handle_databases(&spec, &mut client)?;
         handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
-        handle_grants(&spec, self.connstr.as_str(), &mut client)?;
+        handle_grants(&spec, self.connstr.as_str())?;
         handle_extensions(&spec, &mut client)?;
     }
```
```diff
@@ -17,7 +17,7 @@ use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
 const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds

 /// Escape a string for including it in a SQL literal
-fn escape_literal(s: &str) -> String {
+pub fn escape_literal(s: &str) -> String {
     s.replace('\'', "''").replace('\\', "\\\\")
 }
```
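Making the helper `pub` lets `create_neon_superuser` above reuse it. A quick sketch of its behavior; the inlined body mirrors the function and the test values are illustrative:

```rust
fn escape_literal(s: &str) -> String {
    // Same body as above: double single quotes first, then backslashes.
    s.replace('\'', "''").replace('\\', "\\\\")
}

fn main() {
    // A single quote inside a name is doubled, so it stays inside the SQL literal.
    assert_eq!(escape_literal("O'Brien"), "O''Brien");
    // Backslashes are doubled as well; note this pass runs after the quote pass.
    assert_eq!(escape_literal(r"a\b"), r"a\\b");
    println!("escape_literal behaves as expected");
}
```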
```diff
@@ -269,17 +269,13 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
                     xact.execute(query.as_str(), &[])?;
                 }
                 RoleAction::Create => {
-                    let mut query: String = format!("CREATE ROLE {} ", name.pg_quote());
+                    let mut query: String = format!(
+                        "CREATE ROLE {} CREATEROLE CREATEDB IN ROLE neon_superuser",
+                        name.pg_quote()
+                    );
+                    info!("role create query: '{}'", &query);
                     query.push_str(&role.to_pg_options());
                     xact.execute(query.as_str(), &[])?;
-
-                    let grant_query = format!(
-                        "GRANT pg_read_all_data, pg_write_all_data TO {}",
-                        name.pg_quote()
-                    );
-                    xact.execute(grant_query.as_str(), &[])?;
-                    info!("role grant query: '{}'", &grant_query);
                 }
             }
```
```diff
@@ -476,6 +472,11 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
             query.push_str(&db.to_pg_options());
             let _guard = info_span!("executing", query).entered();
             client.execute(query.as_str(), &[])?;
+            let grant_query: String = format!(
+                "GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
+                name.pg_quote()
+            );
+            client.execute(grant_query.as_str(), &[])?;
         }
     };
```
```diff
@@ -495,35 +496,9 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
 /// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
 /// to allow users creating trusted extensions and re-creating `public` schema, for example.
 #[instrument(skip_all)]
-pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
+pub fn handle_grants(spec: &ComputeSpec, connstr: &str) -> Result<()> {
     info!("cluster spec grants:");

-    // We now have a separate `web_access` role to connect to the database
-    // via the web interface and proxy link auth. And also we grant a
-    // read / write all data privilege to every role. So also grant
-    // create to everyone.
-    // XXX: later we should stop messing with Postgres ACL in such horrible
-    // ways.
-    let roles = spec
-        .cluster
-        .roles
-        .iter()
-        .map(|r| r.name.pg_quote())
-        .collect::<Vec<_>>();
-
-    for db in &spec.cluster.databases {
-        let dbname = &db.name;
-
-        let query: String = format!(
-            "GRANT CREATE ON DATABASE {} TO {}",
-            dbname.pg_quote(),
-            roles.join(", ")
-        );
-        info!("grant query {}", &query);
-
-        client.execute(query.as_str(), &[])?;
-    }
-
     // Do some per-database access adjustments. We'd better do this at db creation time,
     // but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
     // atomically.
```
```diff
@@ -68,6 +68,7 @@ pub struct EndpointConf {
     http_port: u16,
     pg_version: u32,
     skip_pg_catalog_updates: bool,
+    skip_sync_safekeepers: Option<utils::lsn::Lsn>,
 }

 //
@@ -137,6 +138,7 @@ impl ComputeControlPlane {
             tenant_id,
             pg_version,
             skip_pg_catalog_updates: false,
+            skip_sync_safekeepers: None,
         });

         ep.create_endpoint_dir()?;
@@ -151,6 +153,7 @@ impl ComputeControlPlane {
                 pg_port,
                 pg_version,
                 skip_pg_catalog_updates: false,
+                skip_sync_safekeepers: None,
             })?,
         )?;
         std::fs::write(
@@ -189,6 +192,7 @@ pub struct Endpoint {

     // Optimizations
     skip_pg_catalog_updates: bool,
+    skip_sync_safekeepers: Option<utils::lsn::Lsn>,
 }

 impl Endpoint {
@@ -223,6 +227,7 @@ impl Endpoint {
             tenant_id: conf.tenant_id,
             pg_version: conf.pg_version,
             skip_pg_catalog_updates: conf.skip_pg_catalog_updates,
+            skip_sync_safekeepers: conf.skip_sync_safekeepers,
         })
     }

@@ -457,6 +462,7 @@ impl Endpoint {

         // Create spec file
         let spec = ComputeSpec {
+            skip_sync_safekeepers: self.skip_sync_safekeepers,
             skip_pg_catalog_updates: self.skip_pg_catalog_updates,
             format_version: 1.0,
             operation_uuid: None,
```
```diff
@@ -33,6 +33,15 @@ pub struct ComputeSpec {
     #[serde(default)] // Default false
     pub skip_pg_catalog_updates: bool,

+    /// An optional hint that can be passed to speed up startup time if we know
+    /// that safekeepers have already been synced at the given LSN.
+    ///
+    /// NOTE: If there's any possibility that the safekeepers could have advanced
+    /// (e.g. if we started compute, and it crashed) we should stay on the
+    /// safe side and provide None.
+    #[serde(default)]
+    pub skip_sync_safekeepers: Option<Lsn>,
+
     // Information needed to connect to the storage layer.
     //
     // `tenant_id`, `timeline_id` and `pageserver_connstring` are always needed.
```
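For illustration, a cut-down stand-in shows the serde behavior this relies on: specs produced before this change simply omit the field and parse to `None`, the safe default. `MiniSpec` and the `u64` in place of `Lsn` are hypothetical simplifications:

```rust
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct MiniSpec {
    #[serde(default)] // Default false
    skip_pg_catalog_updates: bool,
    #[serde(default)] // Default None: the safe choice when in doubt
    skip_sync_safekeepers: Option<u64>,
}

fn main() -> Result<(), serde_json::Error> {
    // Old control planes that don't know about the field produce specs like this.
    let old: MiniSpec = serde_json::from_str(r#"{}"#)?;
    assert_eq!(old.skip_sync_safekeepers, None);

    // A newer caller can pass the already-synced LSN to skip the sync step.
    let new: MiniSpec = serde_json::from_str(
        r#"{"skip_pg_catalog_updates": true, "skip_sync_safekeepers": 123456}"#,
    )?;
    assert_eq!(new.skip_sync_safekeepers, Some(123456));
    Ok(())
}
```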
```diff
@@ -23,6 +23,7 @@ use prometheus::{Registry, Result};
 pub mod launch_timestamp;
 mod wrappers;
 pub use wrappers::{CountedReader, CountedWriter};
+pub mod metric_vec_duration;

 pub type UIntGauge = GenericGauge<AtomicU64>;
 pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;
```
libs/metrics/src/metric_vec_duration.rs (new file, 23 lines)

```diff
@@ -0,0 +1,23 @@
+//! Helpers for observing duration on HistogramVec / CounterVec / GaugeVec / MetricVec<T>.
+
+use std::{future::Future, time::Instant};
+
+pub trait DurationResultObserver {
+    fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration);
+}
+
+pub async fn observe_async_block_duration_by_result<
+    T,
+    E,
+    F: Future<Output = Result<T, E>>,
+    O: DurationResultObserver,
+>(
+    observer: &O,
+    block: F,
+) -> Result<T, E> {
+    let start = Instant::now();
+    let result = block.await;
+    let duration = start.elapsed();
+    observer.observe_result(&result, duration);
+    result
+}
```
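A usage sketch for the new helper, pairing it with a toy observer to show the call shape. `PrintObserver` is illustrative; the real observer is the basebackup histogram added to the pageserver metrics below:

```rust
use std::{future::Future, time::Instant};

pub trait DurationResultObserver {
    fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration);
}

// Toy observer: prints instead of recording into a HistogramVec.
struct PrintObserver;

impl DurationResultObserver for PrintObserver {
    fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration) {
        let label = if res.is_ok() { "ok" } else { "error" };
        println!("result={label} duration={duration:?}");
    }
}

// Same body as the new file above, repeated here so the sketch is self-contained.
pub async fn observe_async_block_duration_by_result<
    T,
    E,
    F: Future<Output = Result<T, E>>,
    O: DurationResultObserver,
>(
    observer: &O,
    block: F,
) -> Result<T, E> {
    let start = Instant::now();
    let result = block.await;
    observer.observe_result(&result, start.elapsed());
    result
}

#[tokio::main]
async fn main() {
    // Wrap any async block; the duration is recorded with an ok/error label.
    let res: Result<u32, ()> =
        observe_async_block_duration_by_result(&PrintObserver, async { Ok(42) }).await;
    assert_eq!(res, Ok(42));
}
```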
```diff
@@ -173,10 +173,15 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
     let path2 = RemotePath::new(&PathBuf::from(format!("{}/path2", ctx.base_prefix,)))
         .with_context(|| "RemotePath conversion")?;

+    let path3 = RemotePath::new(&PathBuf::from(format!("{}/path3", ctx.base_prefix,)))
+        .with_context(|| "RemotePath conversion")?;
+
     let data1 = "remote blob data1".as_bytes();
     let data1_len = data1.len();
     let data2 = "remote blob data2".as_bytes();
     let data2_len = data2.len();
+    let data3 = "remote blob data3".as_bytes();
+    let data3_len = data3.len();
     ctx.client
         .upload(std::io::Cursor::new(data1), data1_len, &path1, None)
         .await?;
@@ -185,8 +190,18 @@ async fn s3_delete_objects_works(ctx: &mut MaybeEnabledS3) -> anyhow::Result<()>
         .upload(std::io::Cursor::new(data2), data2_len, &path2, None)
         .await?;

+    ctx.client
+        .upload(std::io::Cursor::new(data3), data3_len, &path3, None)
+        .await?;
+
     ctx.client.delete_objects(&[path1, path2]).await?;

+    let prefixes = ctx.client.list_prefixes(None).await?;
+
+    assert_eq!(prefixes.len(), 1);
+
+    ctx.client.delete_objects(&[path3]).await?;
+
     Ok(())
 }
```
```diff
@@ -495,50 +495,50 @@ fn start_pageserver(
                 Ok(())
             },
         );
     }

     if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
         let background_jobs_barrier = background_jobs_barrier;
         let metrics_ctx = RequestContext::todo_child(
             TaskKind::MetricsCollection,
             // This task itself shouldn't download anything.
             // The actual size calculation does need downloads, and
             // creates a child context with the right DownloadBehavior.
             DownloadBehavior::Error,
         );
         task_mgr::spawn(
-            MGMT_REQUEST_RUNTIME.handle(),
+            crate::BACKGROUND_RUNTIME.handle(),
             TaskKind::MetricsCollection,
             None,
             None,
             "consumption metrics collection",
             true,
             async move {
                 // first wait until background jobs are cleared to launch.
                 //
                 // this is because we only process active tenants and timelines, and the
                 // Timeline::get_current_logical_size will spawn the logical size calculation,
                 // which will not be rate-limited.
                 let cancel = task_mgr::shutdown_token();

                 tokio::select! {
                     _ = cancel.cancelled() => { return Ok(()); },
                     _ = background_jobs_barrier.wait() => {}
                 };

                 pageserver::consumption_metrics::collect_metrics(
                     metric_collection_endpoint,
                     conf.metric_collection_interval,
                     conf.cached_metric_collection_interval,
                     conf.synthetic_size_calculation_interval,
                     conf.id,
                     metrics_ctx,
                 )
                 .instrument(info_span!("metrics_collection"))
                 .await?;
                 Ok(())
             },
         );
     }

     // Spawn a task to listen for libpq connections. It will spawn further tasks
```
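The underlying pattern, reduced to a sketch: a future can be pinned to a particular runtime through its `Handle`, which keeps long-running background work (like metrics collection) off the runtime that serves management requests. Runtime construction and names here are illustrative, not the pageserver's actual statics:

```rust
use tokio::runtime::Builder;

fn main() {
    // Two separate runtimes with their own worker pools.
    let mgmt = Builder::new_multi_thread()
        .worker_threads(2)
        .thread_name("mgmt")
        .enable_all()
        .build()
        .unwrap();
    let background = Builder::new_multi_thread()
        .worker_threads(2)
        .thread_name("background")
        .enable_all()
        .build()
        .unwrap();

    // Spawning through a Handle runs the future on that runtime's workers,
    // so heavy periodic work cannot starve the management runtime.
    let task = background.handle().spawn(async {
        // ... periodic metrics collection would live here ...
        "collected"
    });

    // Drive the join handle to completion from the other runtime.
    let out = mgmt.block_on(task).unwrap();
    assert_eq!(out, "collected");
}
```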
```diff
@@ -96,12 +96,12 @@ pub mod defaults {

 #background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'

-# [tenant_config]
+[tenant_config]
 #checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
 #checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
 #compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
 #compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
-#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}'
+#compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD}

 #gc_period = '{DEFAULT_GC_PERIOD}'
 #gc_horizon = {DEFAULT_GC_HORIZON}
@@ -111,7 +111,8 @@ pub mod defaults {
 #min_resident_size_override = .. # in bytes
 #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
 #gc_feedback = false
-# [remote_storage]

+[remote_storage]
+
 "###
 );
```
```diff
@@ -186,10 +186,8 @@ paths:
           schema:
             $ref: "#/components/schemas/Error"
   delete:
-    description: "Attempts to delete specified timeline. On 500 errors should be retried"
+    description: "Attempts to delete specified timeline. 500 and 409 errors should be retried"
     responses:
       "200":
         description: Ok
       "400":
         description: Error when no tenant id found in path or no timeline id
         content:
@@ -214,6 +212,12 @@ paths:
             application/json:
               schema:
                 $ref: "#/components/schemas/NotFoundError"
+        "409":
+          description: Deletion is already in progress, continue polling
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/ConflictError"
         "412":
           description: Tenant is missing, or timeline has children
           content:
```
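A client-side sketch of the retry contract this documents: 200 ends the poll, 409 (deletion already in progress) and 500 are both retryable, and anything else is terminal. The `delete_timeline` closure is a hypothetical stand-in for the real HTTP call:

```rust
use std::{thread::sleep, time::Duration};

/// Retry DELETE until the timeline is gone; 409 means "already in progress, keep polling".
fn delete_with_retries(mut delete_timeline: impl FnMut() -> u16) -> Result<(), String> {
    for attempt in 0u64..10 {
        match delete_timeline() {
            200 => return Ok(()), // deletion finished
            409 | 500 => sleep(Duration::from_millis(100 * (attempt + 1))), // retryable; back off
            code => return Err(format!("unexpected status {code}")), // 400/404/412: give up
        }
    }
    Err("timed out waiting for timeline deletion".into())
}

fn main() {
    // Simulate a deletion already in progress that completes on the third poll.
    let mut responses = [409u16, 409, 200].into_iter();
    assert!(delete_with_retries(|| responses.next().unwrap_or(500)).is_ok());
}
```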
```diff
@@ -187,6 +187,7 @@ impl From<crate::tenant::DeleteTimelineError> for ApiError {
                 format!("Cannot delete timeline which has child timelines: {children:?}")
                     .into_boxed_str(),
             ),
+            a @ AlreadyInProgress => ApiError::Conflict(a.to_string()),
             Other(e) => ApiError::InternalServerError(e),
         }
     }
@@ -1128,8 +1129,6 @@ async fn disk_usage_eviction_run(
         freed_bytes: 0,
     };

-    use crate::task_mgr::MGMT_REQUEST_RUNTIME;
-
     let (tx, rx) = tokio::sync::oneshot::channel();

     let state = get_state(&r);
@@ -1147,7 +1146,7 @@ async fn disk_usage_eviction_run(
     let _g = cancel.drop_guard();

     crate::task_mgr::spawn(
-        MGMT_REQUEST_RUNTIME.handle(),
+        crate::task_mgr::BACKGROUND_RUNTIME.handle(),
         TaskKind::DiskUsageEviction,
         None,
         None,
```
```diff
@@ -1,3 +1,4 @@
+use metrics::metric_vec_duration::DurationResultObserver;
 use metrics::{
     register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
     register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
@@ -424,6 +425,27 @@ pub static SMGR_QUERY_TIME: Lazy<HistogramVec> = Lazy::new(|| {
     .expect("failed to define a metric")
 });

+pub struct BasebackupQueryTime(HistogramVec);
+pub static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|| {
+    BasebackupQueryTime({
+        register_histogram_vec!(
+            "pageserver_basebackup_query_seconds",
+            "Histogram of basebackup queries durations, by result type",
+            &["result"],
+            CRITICAL_OP_BUCKETS.into(),
+        )
+        .expect("failed to define a metric")
+    })
+});
+
+impl DurationResultObserver for BasebackupQueryTime {
+    fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration) {
+        let label_value = if res.is_ok() { "ok" } else { "error" };
+        let metric = self.0.get_metric_with_label_values(&[label_value]).unwrap();
+        metric.observe(duration.as_secs_f64());
+    }
+}
+
 pub static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
     register_int_gauge_vec!(
         "pageserver_live_connections",
@@ -823,11 +845,6 @@ impl TimelineMetrics {
         let evictions_with_low_residence_duration =
             evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);

-        // TODO(chi): remove this once we remove Lazy for all metrics. Otherwise this will not appear in the exporter
-        // and integration test will error.
-        MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get();
-        MATERIALIZED_PAGE_CACHE_HIT.get();
-
         TimelineMetrics {
             tenant_id,
             timeline_id,
@@ -1302,4 +1319,8 @@ pub fn preinitialize_metrics() {

     // Same as above for this metric, but, it's a Vec-type metric for which we don't know all the labels.
     BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT.reset();
+
+    // Python tests need these.
+    MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get();
+    MATERIALIZED_PAGE_CACHE_HIT.get();
 }
```
```diff
@@ -913,10 +913,24 @@ where
                 None
             };

             // Check that the timeline exists
-            self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false, ctx)
-                .await?;
-            pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
+            metrics::metric_vec_duration::observe_async_block_duration_by_result(
+                &*crate::metrics::BASEBACKUP_QUERY_TIME,
+                async move {
+                    self.handle_basebackup_request(
+                        pgb,
+                        tenant_id,
+                        timeline_id,
+                        lsn,
+                        None,
+                        false,
+                        ctx,
+                    )
+                    .await?;
+                    pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
+                    anyhow::Ok(())
+                },
+            )
+            .await?;
         }
         // return pair of prev_lsn and last_lsn
         else if query_string.starts_with("get_last_record_rlsn ") {
```
```diff
@@ -440,8 +440,13 @@ pub enum GetTimelineError {
 pub enum DeleteTimelineError {
     #[error("NotFound")]
     NotFound,

     #[error("HasChildren")]
     HasChildren(Vec<TimelineId>),

+    #[error("Timeline deletion is already in progress")]
+    AlreadyInProgress,
+
     #[error(transparent)]
     Other(#[from] anyhow::Error),
 }
@@ -1755,14 +1760,11 @@ impl Tenant {
         timeline = Arc::clone(timeline_entry.get());

         // Prevent two tasks from trying to delete the timeline at the same time.
-        delete_lock_guard =
-            DeletionGuard(Arc::clone(&timeline.delete_lock).try_lock_owned().map_err(
-                |_| {
-                    DeleteTimelineError::Other(anyhow::anyhow!(
-                        "timeline deletion is already in progress"
-                    ))
-                },
-            )?);
+        delete_lock_guard = DeletionGuard(
+            Arc::clone(&timeline.delete_lock)
+                .try_lock_owned()
+                .map_err(|_| DeleteTimelineError::AlreadyInProgress)?,
+        );

         // If another task finished the deletion just before we acquired the lock,
         // return success.
```
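A minimal demo of the primitive behind the new variant: tokio's `try_lock_owned()` on an `Arc`'d mutex fails immediately when the lock is held, which the code above now surfaces as `DeleteTimelineError::AlreadyInProgress` (and hence a 409) instead of a generic internal error:

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

#[tokio::main]
async fn main() {
    let delete_lock = Arc::new(Mutex::new(()));

    // First "deletion task" takes the lock and holds an owned guard.
    let guard = Arc::clone(&delete_lock)
        .try_lock_owned()
        .expect("first locker should succeed");

    // A second concurrent attempt fails fast instead of waiting --
    // exactly the condition mapped to AlreadyInProgress above.
    assert!(Arc::clone(&delete_lock).try_lock_owned().is_err());

    // Once the first deletion finishes, the lock is free again.
    drop(guard);
    assert!(delete_lock.try_lock_owned().is_ok());
}
```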
```diff
@@ -862,10 +862,8 @@ impl RemoteTimelineClient {
             "Found {} files not bound to index_file.json, proceeding with their deletion",
             remaining.len()
         );
-        for file in remaining {
-            warn!("Removing {}", file.object_name().unwrap_or_default());
-            self.storage_impl.delete(&file).await?;
-        }
+        warn!("About to remove {} files", remaining.len());
+        self.storage_impl.delete_objects(&remaining).await?;
     }

     let index_file_path = timeline_storage_path.join(Path::new(IndexPart::FILE_NAME));
```
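A sketch of why the loop was replaced: per-file `delete()` costs one round trip each, while `delete_objects()` can hand the whole batch to the backend (e.g. S3's DeleteObjects) in a single request. The `Storage` trait below is a simplified stand-in for the real remote_storage interface, not its actual definition:

```rust
use anyhow::Result;
use std::path::PathBuf;

type RemotePath = PathBuf; // illustrative alias for the real RemotePath type

#[allow(async_fn_in_trait)]
trait Storage {
    async fn delete(&self, path: &RemotePath) -> Result<()>;
    async fn delete_objects(&self, paths: &[RemotePath]) -> Result<()>;
}

// Toy backend that just counts requests.
struct CountingStorage;

impl Storage for CountingStorage {
    async fn delete(&self, path: &RemotePath) -> Result<()> {
        println!("1 request: delete {}", path.display());
        Ok(())
    }
    async fn delete_objects(&self, paths: &[RemotePath]) -> Result<()> {
        println!("1 request: delete batch of {}", paths.len());
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let storage = CountingStorage;
    let remaining: Vec<RemotePath> = (0..3).map(|i| PathBuf::from(format!("layer-{i}"))).collect();

    // Before: one round trip per orphaned file.
    for file in &remaining {
        storage.delete(file).await?;
    }

    // After: all leftovers go out in a single batched request.
    storage.delete_objects(&remaining).await?;
    Ok(())
}
```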
```diff
@@ -129,7 +129,7 @@ pub struct Timeline {

     pub pg_version: u32,

-    pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
+    pub(crate) layers: Arc<tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>>,

     /// Set of key ranges which should be covered by image layers to
     /// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -1418,7 +1418,7 @@ impl Timeline {
             timeline_id,
             tenant_id,
             pg_version,
-            layers: tokio::sync::RwLock::new(LayerMap::default()),
+            layers: Arc::new(tokio::sync::RwLock::new(LayerMap::default())),
             wanted_image_layers: Mutex::new(None),

             walredo_mgr,
@@ -3370,14 +3370,14 @@ struct CompactLevel0Phase1StatsBuilder {
     version: Option<u64>,
     tenant_id: Option<TenantId>,
     timeline_id: Option<TimelineId>,
-    first_read_lock_acquisition_micros: DurationRecorder,
-    get_level0_deltas_plus_drop_lock_micros: DurationRecorder,
-    level0_deltas_count: Option<usize>,
-    time_spent_between_locks: DurationRecorder,
-    second_read_lock_acquisition_micros: DurationRecorder,
-    second_read_lock_held_micros: DurationRecorder,
-    sort_holes_micros: DurationRecorder,
+    read_lock_acquisition_micros: DurationRecorder,
+    read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
+    read_lock_held_prerequisites_micros: DurationRecorder,
+    read_lock_held_compute_holes_micros: DurationRecorder,
+    read_lock_drop_micros: DurationRecorder,
+    prepare_iterators_micros: DurationRecorder,
     write_layer_files_micros: DurationRecorder,
+    level0_deltas_count: Option<usize>,
     new_deltas_count: Option<usize>,
     new_deltas_size: Option<u64>,
 }
@@ -3390,14 +3390,14 @@ struct CompactLevel0Phase1Stats {
     tenant_id: TenantId,
     #[serde_as(as = "serde_with::DisplayFromStr")]
     timeline_id: TimelineId,
-    first_read_lock_acquisition_micros: RecordedDuration,
-    get_level0_deltas_plus_drop_lock_micros: RecordedDuration,
-    level0_deltas_count: usize,
-    time_spent_between_locks: RecordedDuration,
-    second_read_lock_acquisition_micros: RecordedDuration,
-    second_read_lock_held_micros: RecordedDuration,
-    sort_holes_micros: RecordedDuration,
+    read_lock_acquisition_micros: RecordedDuration,
+    read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
+    read_lock_held_prerequisites_micros: RecordedDuration,
+    read_lock_held_compute_holes_micros: RecordedDuration,
+    read_lock_drop_micros: RecordedDuration,
+    prepare_iterators_micros: RecordedDuration,
     write_layer_files_micros: RecordedDuration,
+    level0_deltas_count: usize,
     new_deltas_count: usize,
     new_deltas_size: u64,
 }
@@ -3406,54 +3406,51 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
     type Error = anyhow::Error;

     fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
-        let CompactLevel0Phase1StatsBuilder {
-            version,
-            tenant_id,
-            timeline_id,
-            first_read_lock_acquisition_micros,
-            get_level0_deltas_plus_drop_lock_micros,
-            level0_deltas_count,
-            time_spent_between_locks,
-            second_read_lock_acquisition_micros,
-            second_read_lock_held_micros,
-            sort_holes_micros,
-            write_layer_files_micros,
-            new_deltas_count,
-            new_deltas_size,
-        } = value;
-        Ok(CompactLevel0Phase1Stats {
-            version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?,
-            tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?,
-            timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?,
-            first_read_lock_acquisition_micros: first_read_lock_acquisition_micros
-                .into_recorded()
-                .ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?,
-            get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros
-                .into_recorded()
-                .ok_or_else(|| {
-                    anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set")
-                })?,
-            level0_deltas_count: level0_deltas_count
-                .ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?,
-            time_spent_between_locks: time_spent_between_locks
-                .into_recorded()
-                .ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?,
-            second_read_lock_acquisition_micros: second_read_lock_acquisition_micros
-                .into_recorded()
-                .ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?,
-            second_read_lock_held_micros: second_read_lock_held_micros
-                .into_recorded()
-                .ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?,
-            sort_holes_micros: sort_holes_micros
-                .into_recorded()
-                .ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?,
-            write_layer_files_micros: write_layer_files_micros
-                .into_recorded()
-                .ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?,
-            new_deltas_count: new_deltas_count
-                .ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?,
-            new_deltas_size: new_deltas_size
-                .ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?,
+        Ok(Self {
+            version: value.version.ok_or_else(|| anyhow!("version not set"))?,
+            tenant_id: value
+                .tenant_id
+                .ok_or_else(|| anyhow!("tenant_id not set"))?,
+            timeline_id: value
+                .timeline_id
+                .ok_or_else(|| anyhow!("timeline_id not set"))?,
+            read_lock_acquisition_micros: value
+                .read_lock_acquisition_micros
+                .into_recorded()
+                .ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
+            read_lock_held_spawn_blocking_startup_micros: value
+                .read_lock_held_spawn_blocking_startup_micros
+                .into_recorded()
+                .ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
+            read_lock_held_prerequisites_micros: value
+                .read_lock_held_prerequisites_micros
+                .into_recorded()
+                .ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
+            read_lock_held_compute_holes_micros: value
+                .read_lock_held_compute_holes_micros
+                .into_recorded()
+                .ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
+            read_lock_drop_micros: value
+                .read_lock_drop_micros
+                .into_recorded()
+                .ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
+            prepare_iterators_micros: value
+                .prepare_iterators_micros
+                .into_recorded()
+                .ok_or_else(|| anyhow!("prepare_iterators_micros not set"))?,
+            write_layer_files_micros: value
+                .write_layer_files_micros
+                .into_recorded()
+                .ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
+            level0_deltas_count: value
+                .level0_deltas_count
+                .ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
+            new_deltas_count: value
+                .new_deltas_count
+                .ok_or_else(|| anyhow!("new_deltas_count not set"))?,
+            new_deltas_size: value
+                .new_deltas_size
+                .ok_or_else(|| anyhow!("new_deltas_size not set"))?,
         })
     }
 }
```
```diff
@@ -3464,30 +3461,18 @@ impl Timeline {
     /// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
     /// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
     /// start of level0 files compaction, the on-demand download should be revisited as well.
-    async fn compact_level0_phase1(
-        &self,
+    fn compact_level0_phase1(
+        self: Arc<Self>,
+        _layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
+        layers: tokio::sync::OwnedRwLockReadGuard<LayerMap<dyn PersistentLayer>>,
+        mut stats: CompactLevel0Phase1StatsBuilder,
         target_file_size: u64,
         ctx: &RequestContext,
     ) -> Result<CompactLevel0Phase1Result, CompactionError> {
-        let mut stats = CompactLevel0Phase1StatsBuilder {
-            version: Some(1),
-            tenant_id: Some(self.tenant_id),
-            timeline_id: Some(self.timeline_id),
-            ..Default::default()
-        };
-
-        let begin = tokio::time::Instant::now();
-        let layers = self.layers.read().await;
-        let now = tokio::time::Instant::now();
-        stats.first_read_lock_acquisition_micros =
-            DurationRecorder::Recorded(RecordedDuration(now - begin), now);
+        stats.read_lock_held_spawn_blocking_startup_micros =
+            stats.read_lock_acquisition_micros.till_now(); // set by caller
         let mut level0_deltas = layers.get_level0_deltas()?;
-        drop(layers);
         stats.level0_deltas_count = Some(level0_deltas.len());
-        stats.get_level0_deltas_plus_drop_lock_micros =
-            stats.first_read_lock_acquisition_micros.till_now();

         // Only compact if enough layers have accumulated.
         let threshold = self.get_compaction_threshold();
         if level0_deltas.is_empty() || level0_deltas.len() < threshold {
```
```diff
@@ -3565,6 +3550,53 @@ impl Timeline {
         // we don't accidentally use it later in the function.
         drop(level0_deltas);

+        stats.read_lock_held_prerequisites_micros = stats
+            .read_lock_held_spawn_blocking_startup_micros
+            .till_now();
+
+        // Determine N largest holes where N is number of compacted layers.
+        let max_holes = deltas_to_compact.len();
+        let last_record_lsn = self.get_last_record_lsn();
+        let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
+        let min_hole_coverage_size = 3; // TODO: something more flexible?
+
+        // min-heap (reserve space for one more element added before eviction)
+        let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
+        let mut prev: Option<Key> = None;
+        for (next_key, _next_lsn, _size) in itertools::process_results(
+            deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
+            |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
+        )? {
+            if let Some(prev_key) = prev {
+                // just first fast filter
+                if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
+                    let key_range = prev_key..next_key;
+                    // Measuring hole by just subtraction of i128 representation of key range boundaries
+                    // has not so much sense, because largest holes will corresponds field1/field2 changes.
+                    // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
+                    // That is why it is better to measure size of hole as number of covering image layers.
+                    let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
+                    if coverage_size >= min_hole_coverage_size {
+                        heap.push(Hole {
+                            key_range,
+                            coverage_size,
+                        });
+                        if heap.len() > max_holes {
+                            heap.pop(); // remove smallest hole
+                        }
+                    }
+                }
+            }
+            prev = Some(next_key.next());
+        }
+        stats.read_lock_held_compute_holes_micros =
+            stats.read_lock_held_prerequisites_micros.till_now();
+        drop(layers);
+        stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
+        let mut holes = heap.into_vec();
+        holes.sort_unstable_by_key(|hole| hole.key_range.start);
+        let mut next_hole = 0; // index of next hole in holes vector
+
         // This iterator walks through all key-value pairs from all the layers
         // we're compacting, in key, LSN order.
         let all_values_iter = itertools::process_results(
```
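The hole selection above is a streaming top-N: push every candidate into a heap capped at N+1 entries and evict the smallest whenever it overflows. A self-contained sketch with plain integers — std's `BinaryHeap` is a max-heap, so `Reverse` turns it into the min-heap the comment refers to; in the real code, `Hole`'s `Ord` implementation has to arrange the same inversion:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Keep the n largest values seen while streaming through the input once.
fn n_largest(values: impl IntoIterator<Item = u64>, n: usize) -> Vec<u64> {
    // Reserve space for one extra element pushed just before eviction.
    let mut heap: BinaryHeap<Reverse<u64>> = BinaryHeap::with_capacity(n + 1);
    for v in values {
        heap.push(Reverse(v));
        if heap.len() > n {
            heap.pop(); // removes the smallest, since the heap is reversed
        }
    }
    heap.into_iter().map(|Reverse(v)| v).collect()
}

fn main() {
    let mut top = n_largest([5, 1, 9, 3, 7, 2], 3);
    top.sort_unstable(); // like holes.sort_unstable_by_key above
    assert_eq!(top, vec![5, 7, 9]);
}
```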
```diff
@@ -3604,50 +3636,7 @@ impl Timeline {
             },
         )?;

-        // Determine N largest holes where N is number of compacted layers.
-        let max_holes = deltas_to_compact.len();
-        let last_record_lsn = self.get_last_record_lsn();
-        stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now();
-        let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here?
-        stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now();
-        let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
-        let min_hole_coverage_size = 3; // TODO: something more flexible?
-
-        // min-heap (reserve space for one more element added before eviction)
-        let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
-        let mut prev: Option<Key> = None;
-        for (next_key, _next_lsn, _size) in itertools::process_results(
-            deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
-            |iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
-        )? {
-            if let Some(prev_key) = prev {
-                // just first fast filter
-                if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
-                    let key_range = prev_key..next_key;
-                    // Measuring hole by just subtraction of i128 representation of key range boundaries
-                    // has not so much sense, because largest holes will corresponds field1/field2 changes.
-                    // But we are mostly interested to eliminate holes which cause generation of excessive image layers.
-                    // That is why it is better to measure size of hole as number of covering image layers.
-                    let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
-                    if coverage_size >= min_hole_coverage_size {
-                        heap.push(Hole {
-                            key_range,
-                            coverage_size,
-                        });
-                        if heap.len() > max_holes {
-                            heap.pop(); // remove smallest hole
-                        }
-                    }
-                }
-            }
-            prev = Some(next_key.next());
-        }
-        drop(layers);
-        stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now();
-        let mut holes = heap.into_vec();
-        holes.sort_unstable_by_key(|hole| hole.key_range.start);
-        let mut next_hole = 0; // index of next hole in holes vector
-        stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now();
+        stats.prepare_iterators_micros = stats.read_lock_drop_micros.till_now();

         // Merge the contents of all the input delta layers into a new set
         // of delta layers, based on the current partitioning.
```
```diff
@@ -3807,7 +3796,7 @@ impl Timeline {
             layer_paths.pop().unwrap();
         }

-        stats.write_layer_files_micros = stats.sort_holes_micros.till_now();
+        stats.write_layer_files_micros = stats.prepare_iterators_micros.till_now();
         stats.new_deltas_count = Some(new_layers.len());
         stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum());
```
```diff
@@ -3846,9 +3835,36 @@ impl Timeline {
         let CompactLevel0Phase1Result {
             new_layers,
             deltas_to_compact,
-        } = self
-            .compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx)
-            .await?;
+        } = {
+            let phase1_span = info_span!("compact_level0_phase1");
+            let myself = Arc::clone(self);
+            let ctx = ctx.attached_child(); // technically, the spawn_blocking can outlive this future
+            let mut stats = CompactLevel0Phase1StatsBuilder {
+                version: Some(2),
+                tenant_id: Some(self.tenant_id),
+                timeline_id: Some(self.timeline_id),
+                ..Default::default()
+            };
+
+            let begin = tokio::time::Instant::now();
+            let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
+            let now = tokio::time::Instant::now();
+            stats.read_lock_acquisition_micros =
+                DurationRecorder::Recorded(RecordedDuration(now - begin), now);
+            let layer_removal_cs = layer_removal_cs.clone();
+            tokio::task::spawn_blocking(move || {
+                let _entered = phase1_span.enter();
+                myself.compact_level0_phase1(
+                    layer_removal_cs,
+                    phase1_layers_locked,
+                    stats,
+                    target_file_size,
+                    &ctx,
+                )
+            })
+            .await
+            .context("spawn_blocking")??
+        };

         if new_layers.is_empty() && deltas_to_compact.is_empty() {
             // nothing to do
```
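A reduced model of this refactor: acquire an *owned* read guard while still in async context (lock acquisition may need to await), then move it, together with an `Arc` of the object, into `spawn_blocking` so the synchronous phase-1 work runs on the blocking pool without borrowing from the async caller. All names here are illustrative:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct Timeline {
    layers: Arc<RwLock<Vec<String>>>,
}

impl Timeline {
    // Synchronous: runs entirely on the blocking pool, holding the owned guard.
    fn compact_phase1(
        self: Arc<Self>,
        layers: tokio::sync::OwnedRwLockReadGuard<Vec<String>>,
    ) -> usize {
        layers.len() // stand-in for the real compaction work
    }
}

#[tokio::main]
async fn main() {
    let timeline = Arc::new(Timeline {
        layers: Arc::new(RwLock::new(vec!["l0-a".into(), "l0-b".into()])),
    });

    // Acquire the owned guard while still async...
    let guard = Arc::clone(&timeline.layers).read_owned().await;
    let myself = Arc::clone(&timeline);

    // ...then hand both off to a blocking worker thread; neither value
    // borrows from this future, so the closure can be 'static.
    let count = tokio::task::spawn_blocking(move || myself.compact_phase1(guard))
        .await
        .expect("spawn_blocking");
    assert_eq!(count, 2);
}
```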
```diff
@@ -1,5 +1,5 @@
 [toolchain]
-channel = "1.68.2"
+channel = "1.70.0"
 profile = "default"
 # The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
 # https://rust-lang.github.io/rustup/concepts/profiles.html
```
```diff
@@ -62,6 +62,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
     "pageserver_getpage_reconstruct_seconds_bucket",
     "pageserver_getpage_reconstruct_seconds_count",
     "pageserver_getpage_reconstruct_seconds_sum",
+    *[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
 )

 PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (
```
```diff
@@ -30,7 +30,18 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
     neon_env_builder.num_safekeepers = 3
     env = neon_env_builder.init_start()

-    env.neon_cli.create_branch("test_startup")
+    tenant_id = env.initial_tenant
+    timeline_id = env.neon_cli.create_branch("test_startup")
+
+    def get_synced_lsn():
+        """Assert safekeepers are synced and get the LSN."""
+        commit_lsns = [
+            sk.http_client().timeline_status(tenant_id, timeline_id).commit_lsn
+            for sk in env.safekeepers
+        ]
+        assert len(commit_lsns) == 3
+        assert len(set(commit_lsns)) == 1
+        return commit_lsns[0]

     endpoint = None
@@ -63,7 +74,8 @@ def test_startup_simple(neon_env_builder: NeonEnvBuilder, zenbenchmark: NeonBenc
     endpoint.stop()

     # Imitate optimizations that console would do for the second start
-    endpoint.respec(skip_pg_catalog_updates=True)
+    lsn = get_synced_lsn()
+    endpoint.respec(skip_pg_catalog_updates=True, skip_sync_safekeepers=lsn.lsn_int)


 # This test sometimes runs for longer than the global 5 minute timeout.
```
```diff
@@ -2,6 +2,7 @@ import copy
 import os
 import shutil
 import subprocess
+import tempfile
 from pathlib import Path
 from typing import Any, Optional

@@ -448,7 +449,7 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
     """

     with output.open("w") as stdout:
-        rv = subprocess.run(
+        res = subprocess.run(
             [
                 "diff",
                 "--unified",  # Make diff output more readable
@@ -460,4 +461,53 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
             stdout=stdout,
         )

-    return rv.returncode != 0
+    differs = res.returncode != 0
+
+    # TODO: Remove after https://github.com/neondatabase/neon/pull/4425 is merged, and a couple of releases are made
+    if differs:
+        with tempfile.NamedTemporaryFile(mode="w") as tmp:
+            tmp.write(PR4425_ALLOWED_DIFF)
+            tmp.flush()
+
+            allowed = subprocess.run(
+                [
+                    "diff",
+                    "--unified",  # Make diff output more readable
+                    r"--ignore-matching-lines=^---",  # Ignore diff headers
+                    r"--ignore-matching-lines=^\+\+\+",  # Ignore diff headers
+                    "--ignore-matching-lines=^@@",  # Ignore diff blocks location
+                    "--ignore-matching-lines=^ *$",  # Ignore lines with only spaces
+                    "--ignore-matching-lines=^ --.*",  # Ignore the " --" lines for compatibility with PG14
+                    "--ignore-blank-lines",
+                    str(output),
+                    str(tmp.name),
+                ],
+            )
+
+            differs = allowed.returncode != 0
+
+    return differs
+
+
+PR4425_ALLOWED_DIFF = """
+--- /tmp/test_output/test_backward_compatibility[release-pg15]/compatibility_snapshot/dump.sql 2023-06-08 18:12:45.000000000 +0000
++++ /tmp/test_output/test_backward_compatibility[release-pg15]/dump.sql 2023-06-13 07:25:35.211733653 +0000
+@@ -13,12 +13,20 @@
+
+ CREATE ROLE cloud_admin;
+ ALTER ROLE cloud_admin WITH SUPERUSER INHERIT CREATEROLE CREATEDB LOGIN REPLICATION BYPASSRLS;
++CREATE ROLE neon_superuser;
++ALTER ROLE neon_superuser WITH NOSUPERUSER INHERIT CREATEROLE CREATEDB NOLOGIN NOREPLICATION NOBYPASSRLS;
+
+ --
+ -- User Configurations
+ --
+
+
++--
++-- Role memberships
++--
++
++GRANT pg_read_all_data TO neon_superuser GRANTED BY cloud_admin;
++GRANT pg_write_all_data TO neon_superuser GRANTED BY cloud_admin;
+"""
```
```diff
@@ -1,3 +1,5 @@
+import time
+
 import pytest
 from fixtures.neon_fixtures import NeonEnv

@@ -10,9 +12,10 @@ def test_hot_standby(neon_simple_env: NeonEnv):
         branch_name="main",
         endpoint_id="primary",
     ) as primary:
+        time.sleep(1)
         with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
             primary_lsn = None
-            cought_up = False
+            caught_up = False
             queries = [
                 "SHOW neon.timeline_id",
                 "SHOW neon.tenant_id",
@@ -56,7 +59,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
                 res = s_cur.fetchone()
                 assert res is not None

-                while not cought_up:
+                while not caught_up:
                     with s_con.cursor() as secondary_cursor:
                         secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
                         res = secondary_cursor.fetchone()
@@ -66,7 +69,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
                     # due to e.g. autovacuum, but that shouldn't impact the content
                     # of the tables, so we check whether we've replayed up to at
                     # least after the commit of the `test` table.
-                    cought_up = secondary_lsn >= primary_lsn
+                    caught_up = secondary_lsn >= primary_lsn

             # Explicit commit to flush any transient transaction-level state.
             s_con.commit()
```
```diff
@@ -16,6 +16,7 @@ from fixtures.pg_version import PgVersion, xfail_on_postgres
 from fixtures.types import Lsn, TenantId, TimelineId


+@pytest.mark.xfail
 def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
     env = neon_simple_env
     (tenant_id, _) = env.neon_cli.create_tenant()
@@ -44,12 +45,16 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
     # we've disabled the autovacuum and checkpoint
     # so background processes should not change the size.
     # If this test will flake we should probably loosen the check
-    assert size == initial_size, "starting idle compute should not change the tenant size"
+    assert (
+        size == initial_size
+    ), f"starting idle compute should not change the tenant size (Currently {size}, expected {initial_size})"

     # the size should be the same, until we increase the size over the
     # gc_horizon
     size, inputs = http_client.tenant_size_and_modelinputs(tenant_id)
-    assert size == initial_size, "tenant_size should not be affected by shutdown of compute"
+    assert (
+        size == initial_size
+    ), f"tenant_size should not be affected by shutdown of compute (Currently {size}, expected {initial_size})"

     expected_inputs = {
         "segments": [
@@ -318,6 +323,7 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa
     size_debug_file.write(size_debug)


+@pytest.mark.xfail
 def test_single_branch_get_tenant_size_grows(
     neon_env_builder: NeonEnvBuilder, test_output_dir: Path, pg_version: PgVersion
 ):
@@ -333,13 +339,13 @@ def test_single_branch_get_tenant_size_grows(
     # inserts is larger than gc_horizon. for example 0x20000 here hid the fact
     # that there next_gc_cutoff could be smaller than initdb_lsn, which will
     # obviously lead to issues when calculating the size.
-    gc_horizon = 0x38000
+    gc_horizon = 0x3BA00

     # it's a bit of a hack, but different versions of postgres have different
     # amount of WAL generated for the same amount of data. so we need to
     # adjust the gc_horizon accordingly.
     if pg_version == PgVersion.V14:
-        gc_horizon = 0x40000
+        gc_horizon = 0x4A000

     neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
@@ -360,11 +366,11 @@ def test_single_branch_get_tenant_size_grows(
         if current_lsn - initdb_lsn >= gc_horizon:
             assert (
                 size >= prev_size
-            ), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size"
+            ), f"tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})"
         else:
             assert (
                 size > prev_size
-            ), "tenant_size should grow, because we continue to add WAL to initial snapshot size"
+            ), f"tenant_size should grow, because we continue to add WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})"


 def get_current_consistent_size(
     env: NeonEnv,
```
```diff
@@ -463,10 +463,10 @@ def test_concurrent_timeline_delete_stuck_on(

     # make the second call and assert behavior
     log.info("second call start")
-    error_msg_re = "timeline deletion is already in progress"
+    error_msg_re = "Timeline deletion is already in progress"
     with pytest.raises(PageserverApiException, match=error_msg_re) as second_call_err:
         ps_http.timeline_delete(env.initial_tenant, child_timeline_id)
-    assert second_call_err.value.status_code == 500
+    assert second_call_err.value.status_code == 409
     env.pageserver.allowed_errors.append(f".*{child_timeline_id}.*{error_msg_re}.*")
     # the second call will try to transition the timeline into Stopping state as well
     env.pageserver.allowed_errors.append(
@@ -518,9 +518,9 @@ def test_delete_timeline_client_hangup(neon_env_builder: NeonEnvBuilder):
         ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)

     env.pageserver.allowed_errors.append(
-        f".*{child_timeline_id}.*timeline deletion is already in progress.*"
+        f".*{child_timeline_id}.*Timeline deletion is already in progress.*"
     )
-    with pytest.raises(PageserverApiException, match="timeline deletion is already in progress"):
+    with pytest.raises(PageserverApiException, match="Timeline deletion is already in progress"):
         ps_http.timeline_delete(env.initial_tenant, child_timeline_id, timeout=2)

     # make sure the timeout was due to the failpoint
```