Merge branch 'main' into extension_server

This commit is contained in:
Anastasia Lubennikova
2023-06-26 20:46:53 +03:00
24 changed files with 561 additions and 303 deletions

View File

@@ -180,7 +180,8 @@ jobs:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
timeout-minutes: 360 # 6h
# Increase timeout to 8h, default timeout is 6h
timeout-minutes: 480
steps:
- uses: actions/checkout@v3
@@ -321,8 +322,6 @@ jobs:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
timeout-minutes: 360 # 6h
steps:
- uses: actions/checkout@v3
@@ -414,8 +413,6 @@ jobs:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
timeout-minutes: 360 # 6h
steps:
- uses: actions/checkout@v3
@@ -501,8 +498,6 @@ jobs:
image: 369495373322.dkr.ecr.eu-central-1.amazonaws.com/rust:pinned
options: --init
timeout-minutes: 360 # 6h
steps:
- uses: actions/checkout@v3

View File

@@ -916,7 +916,7 @@ jobs:
exit 1
fi
- name: Create tag "release-${{ needs.tag.outputs.build-tag }}"
- name: Create git tag
if: github.ref_name == 'release'
uses: actions/github-script@v6
with:
@@ -926,7 +926,7 @@ jobs:
github.rest.git.createRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref: "refs/tags/release-${{ needs.tag.outputs.build-tag }}",
ref: "refs/tags/${{ needs.tag.outputs.build-tag }}",
sha: context.sha,
})

View File

@@ -481,6 +481,23 @@ RUN wget https://github.com/rdkit/rdkit/archive/refs/tags/Release_2023_03_1.tar.
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/rdkit.control
#########################################################################################
#
# Layer "pg-uuidv7-pg-build"
# compile pg_uuidv7 extension
#
#########################################################################################
FROM build-deps AS pg-uuidv7-pg-build
COPY --from=pg-build /usr/local/pgsql/ /usr/local/pgsql/
ENV PATH "/usr/local/pgsql/bin/:$PATH"
RUN wget https://github.com/fboulnois/pg_uuidv7/archive/refs/tags/v1.0.1.tar.gz -O pg_uuidv7.tar.gz && \
echo "0d0759ab01b7fb23851ecffb0bce27822e1868a4a5819bfd276101c716637a7a pg_uuidv7.tar.gz" | sha256sum --check && \
mkdir pg_uuidv7-src && cd pg_uuidv7-src && tar xvzf ../pg_uuidv7.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/pg_uuidv7.control
#########################################################################################
#
# Layer "rust extensions"
@@ -614,6 +631,7 @@ COPY --from=kq-imcx-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-cron-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-pgx-ulid-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=rdkit-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY --from=pg-uuidv7-pg-build /usr/local/pgsql/ /usr/local/pgsql/
COPY pgxn/ pgxn/
RUN make -j $(getconf _NPROCESSORS_ONLN) \

View File

@@ -142,6 +142,84 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
}
}
/// Create special neon_superuser role, that's a slightly nerfed version of a real superuser
/// that we give to customers
fn create_neon_superuser(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
let roles = spec
.cluster
.roles
.iter()
.map(|r| format!("'{}'", escape_literal(&r.name)))
.collect::<Vec<_>>();
let dbs = spec
.cluster
.databases
.iter()
.map(|db| format!("'{}'", escape_literal(&db.name)))
.collect::<Vec<_>>();
let roles_decl = if roles.is_empty() {
String::from("roles text[] := NULL;")
} else {
format!(
r#"
roles text[] := ARRAY(SELECT rolname
FROM pg_catalog.pg_roles
WHERE rolname IN ({}));"#,
roles.join(", ")
)
};
let database_decl = if dbs.is_empty() {
String::from("dbs text[] := NULL;")
} else {
format!(
r#"
dbs text[] := ARRAY(SELECT datname
FROM pg_catalog.pg_database
WHERE datname IN ({}));"#,
dbs.join(", ")
)
};
// ALL PRIVILEGES grants CREATE, CONNECT, and TEMPORARY on all databases
// (see https://www.postgresql.org/docs/current/ddl-priv.html)
let query = format!(
r#"
DO $$
DECLARE
r text;
{}
{}
BEGIN
IF NOT EXISTS (
SELECT FROM pg_catalog.pg_roles WHERE rolname = 'neon_superuser')
THEN
CREATE ROLE neon_superuser CREATEDB CREATEROLE NOLOGIN IN ROLE pg_read_all_data, pg_write_all_data;
IF array_length(roles, 1) IS NOT NULL THEN
EXECUTE format('GRANT neon_superuser TO %s',
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(roles) as x), ', '));
FOREACH r IN ARRAY roles LOOP
EXECUTE format('ALTER ROLE %s CREATEROLE CREATEDB', quote_ident(r));
END LOOP;
END IF;
IF array_length(dbs, 1) IS NOT NULL THEN
EXECUTE format('GRANT ALL PRIVILEGES ON DATABASE %s TO neon_superuser',
array_to_string(ARRAY(SELECT quote_ident(x) FROM unnest(dbs) as x), ', '));
END IF;
END IF;
END
$$;"#,
roles_decl, database_decl,
);
info!("Neon superuser created:\n{}", &query);
client
.simple_query(&query)
.map_err(|e| anyhow::anyhow!(e).context(query))?;
Ok(())
}
impl ComputeNode {
pub fn set_status(&self, status: ComputeStatus) {
let mut state = self.state.lock().unwrap();
@@ -364,6 +442,8 @@ impl ComputeNode {
.map_err(|_| anyhow::anyhow!("invalid connstr"))?;
let mut client = Client::connect(zenith_admin_connstr.as_str(), NoTls)?;
// Disable forwarding so that users don't get a cloud_admin role
client.simple_query("SET neon.forward_ddl = false")?;
client.simple_query("CREATE USER cloud_admin WITH SUPERUSER")?;
client.simple_query("GRANT zenith_admin TO cloud_admin")?;
drop(client);
@@ -374,14 +454,16 @@ impl ComputeNode {
Ok(client) => client,
};
// Proceed with post-startup configuration. Note, that order of operations is important.
// Disable DDL forwarding because control plane already knows about these roles/databases.
client.simple_query("SET neon.forward_ddl = false")?;
// Proceed with post-startup configuration. Note, that order of operations is important.
let spec = &compute_state.pspec.as_ref().expect("spec must be set").spec;
create_neon_superuser(spec, &mut client)?;
handle_roles(spec, &mut client)?;
handle_databases(spec, &mut client)?;
handle_role_deletions(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str(), &mut client)?;
handle_grants(spec, self.connstr.as_str())?;
handle_extensions(spec, &mut client)?;
// 'Close' connection
@@ -419,7 +501,7 @@ impl ComputeNode {
handle_roles(&spec, &mut client)?;
handle_databases(&spec, &mut client)?;
handle_role_deletions(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str(), &mut client)?;
handle_grants(&spec, self.connstr.as_str())?;
handle_extensions(&spec, &mut client)?;
}

View File

@@ -17,7 +17,7 @@ use compute_api::spec::{Database, GenericOption, GenericOptions, PgIdent, Role};
const POSTGRES_WAIT_TIMEOUT: Duration = Duration::from_millis(60 * 1000); // milliseconds
/// Escape a string for including it in a SQL literal
fn escape_literal(s: &str) -> String {
pub fn escape_literal(s: &str) -> String {
s.replace('\'', "''").replace('\\', "\\\\")
}

View File

@@ -269,17 +269,13 @@ pub fn handle_roles(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
xact.execute(query.as_str(), &[])?;
}
RoleAction::Create => {
let mut query: String = format!("CREATE ROLE {} ", name.pg_quote());
let mut query: String = format!(
"CREATE ROLE {} CREATEROLE CREATEDB IN ROLE neon_superuser",
name.pg_quote()
);
info!("role create query: '{}'", &query);
query.push_str(&role.to_pg_options());
xact.execute(query.as_str(), &[])?;
let grant_query = format!(
"GRANT pg_read_all_data, pg_write_all_data TO {}",
name.pg_quote()
);
xact.execute(grant_query.as_str(), &[])?;
info!("role grant query: '{}'", &grant_query);
}
}
@@ -476,6 +472,11 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
query.push_str(&db.to_pg_options());
let _guard = info_span!("executing", query).entered();
client.execute(query.as_str(), &[])?;
let grant_query: String = format!(
"GRANT ALL PRIVILEGES ON DATABASE {} TO neon_superuser",
name.pg_quote()
);
client.execute(grant_query.as_str(), &[])?;
}
};
@@ -495,35 +496,9 @@ pub fn handle_databases(spec: &ComputeSpec, client: &mut Client) -> Result<()> {
/// Grant CREATE ON DATABASE to the database owner and do some other alters and grants
/// to allow users creating trusted extensions and re-creating `public` schema, for example.
#[instrument(skip_all)]
pub fn handle_grants(spec: &ComputeSpec, connstr: &str, client: &mut Client) -> Result<()> {
pub fn handle_grants(spec: &ComputeSpec, connstr: &str) -> Result<()> {
info!("cluster spec grants:");
// We now have a separate `web_access` role to connect to the database
// via the web interface and proxy link auth. And also we grant a
// read / write all data privilege to every role. So also grant
// create to everyone.
// XXX: later we should stop messing with Postgres ACL in such horrible
// ways.
let roles = spec
.cluster
.roles
.iter()
.map(|r| r.name.pg_quote())
.collect::<Vec<_>>();
for db in &spec.cluster.databases {
let dbname = &db.name;
let query: String = format!(
"GRANT CREATE ON DATABASE {} TO {}",
dbname.pg_quote(),
roles.join(", ")
);
info!("grant query {}", &query);
client.execute(query.as_str(), &[])?;
}
// Do some per-database access adjustments. We'd better do this at db creation time,
// but CREATE DATABASE isn't transactional. So we cannot create db + do some grants
// atomically.

View File

@@ -23,6 +23,7 @@ use prometheus::{Registry, Result};
pub mod launch_timestamp;
mod wrappers;
pub use wrappers::{CountedReader, CountedWriter};
pub mod metric_vec_duration;
pub type UIntGauge = GenericGauge<AtomicU64>;
pub type UIntGaugeVec = GenericGaugeVec<AtomicU64>;

View File

@@ -0,0 +1,23 @@
//! Helpers for observing duration on HistogramVec / CounterVec / GaugeVec / MetricVec<T>.
use std::{future::Future, time::Instant};
pub trait DurationResultObserver {
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration);
}
pub async fn observe_async_block_duration_by_result<
T,
E,
F: Future<Output = Result<T, E>>,
O: DurationResultObserver,
>(
observer: &O,
block: F,
) -> Result<T, E> {
let start = Instant::now();
let result = block.await;
let duration = start.elapsed();
observer.observe_result(&result, duration);
result
}

View File

@@ -495,50 +495,50 @@ fn start_pageserver(
Ok(())
},
);
}
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let background_jobs_barrier = background_jobs_barrier;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
// The actual size calculation does need downloads, and
// creates a child context with the right DownloadBehavior.
DownloadBehavior::Error,
);
task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
"consumption metrics collection",
true,
async move {
// first wait until background jobs are cleared to launch.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
let cancel = task_mgr::shutdown_token();
if let Some(metric_collection_endpoint) = &conf.metric_collection_endpoint {
let background_jobs_barrier = background_jobs_barrier;
let metrics_ctx = RequestContext::todo_child(
TaskKind::MetricsCollection,
// This task itself shouldn't download anything.
// The actual size calculation does need downloads, and
// creates a child context with the right DownloadBehavior.
DownloadBehavior::Error,
);
task_mgr::spawn(
crate::BACKGROUND_RUNTIME.handle(),
TaskKind::MetricsCollection,
None,
None,
"consumption metrics collection",
true,
async move {
// first wait until background jobs are cleared to launch.
//
// this is because we only process active tenants and timelines, and the
// Timeline::get_current_logical_size will spawn the logical size calculation,
// which will not be rate-limited.
let cancel = task_mgr::shutdown_token();
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => {}
};
tokio::select! {
_ = cancel.cancelled() => { return Ok(()); },
_ = background_jobs_barrier.wait() => {}
};
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))
.await?;
Ok(())
},
);
}
pageserver::consumption_metrics::collect_metrics(
metric_collection_endpoint,
conf.metric_collection_interval,
conf.cached_metric_collection_interval,
conf.synthetic_size_calculation_interval,
conf.id,
metrics_ctx,
)
.instrument(info_span!("metrics_collection"))
.await?;
Ok(())
},
);
}
// Spawn a task to listen for libpq connections. It will spawn further tasks

View File

@@ -96,12 +96,12 @@ pub mod defaults {
#background_task_maximum_delay = '{DEFAULT_BACKGROUND_TASK_MAXIMUM_DELAY}'
# [tenant_config]
[tenant_config]
#checkpoint_distance = {DEFAULT_CHECKPOINT_DISTANCE} # in bytes
#checkpoint_timeout = {DEFAULT_CHECKPOINT_TIMEOUT}
#compaction_target_size = {DEFAULT_COMPACTION_TARGET_SIZE} # in bytes
#compaction_period = '{DEFAULT_COMPACTION_PERIOD}'
#compaction_threshold = '{DEFAULT_COMPACTION_THRESHOLD}'
#compaction_threshold = {DEFAULT_COMPACTION_THRESHOLD}
#gc_period = '{DEFAULT_GC_PERIOD}'
#gc_horizon = {DEFAULT_GC_HORIZON}
@@ -111,7 +111,8 @@ pub mod defaults {
#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#gc_feedback = false
# [remote_storage]
[remote_storage]
"###
);

View File

@@ -1128,8 +1128,6 @@ async fn disk_usage_eviction_run(
freed_bytes: 0,
};
use crate::task_mgr::MGMT_REQUEST_RUNTIME;
let (tx, rx) = tokio::sync::oneshot::channel();
let state = get_state(&r);
@@ -1147,7 +1145,7 @@ async fn disk_usage_eviction_run(
let _g = cancel.drop_guard();
crate::task_mgr::spawn(
MGMT_REQUEST_RUNTIME.handle(),
crate::task_mgr::BACKGROUND_RUNTIME.handle(),
TaskKind::DiskUsageEviction,
None,
None,

View File

@@ -1,3 +1,4 @@
use metrics::metric_vec_duration::DurationResultObserver;
use metrics::{
register_counter_vec, register_histogram, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec, register_uint_gauge_vec,
@@ -424,6 +425,27 @@ pub static SMGR_QUERY_TIME: Lazy<HistogramVec> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub struct BasebackupQueryTime(HistogramVec);
pub static BASEBACKUP_QUERY_TIME: Lazy<BasebackupQueryTime> = Lazy::new(|| {
BasebackupQueryTime({
register_histogram_vec!(
"pageserver_basebackup_query_seconds",
"Histogram of basebackup queries durations, by result type",
&["result"],
CRITICAL_OP_BUCKETS.into(),
)
.expect("failed to define a metric")
})
});
impl DurationResultObserver for BasebackupQueryTime {
fn observe_result<T, E>(&self, res: &Result<T, E>, duration: std::time::Duration) {
let label_value = if res.is_ok() { "ok" } else { "error" };
let metric = self.0.get_metric_with_label_values(&[label_value]).unwrap();
metric.observe(duration.as_secs_f64());
}
}
pub static LIVE_CONNECTIONS_COUNT: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_live_connections",
@@ -823,11 +845,6 @@ impl TimelineMetrics {
let evictions_with_low_residence_duration =
evictions_with_low_residence_duration_builder.build(&tenant_id, &timeline_id);
// TODO(chi): remove this once we remove Lazy for all metrics. Otherwise this will not appear in the exporter
// and integration test will error.
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get();
MATERIALIZED_PAGE_CACHE_HIT.get();
TimelineMetrics {
tenant_id,
timeline_id,
@@ -1302,4 +1319,8 @@ pub fn preinitialize_metrics() {
// Same as above for this metric, but, it's a Vec-type metric for which we don't know all the labels.
BACKGROUND_LOOP_PERIOD_OVERRUN_COUNT.reset();
// Python tests need these.
MATERIALIZED_PAGE_CACHE_HIT_DIRECT.get();
MATERIALIZED_PAGE_CACHE_HIT.get();
}

View File

@@ -913,10 +913,24 @@ where
None
};
// Check that the timeline exists
self.handle_basebackup_request(pgb, tenant_id, timeline_id, lsn, None, false, ctx)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
metrics::metric_vec_duration::observe_async_block_duration_by_result(
&*crate::metrics::BASEBACKUP_QUERY_TIME,
async move {
self.handle_basebackup_request(
pgb,
tenant_id,
timeline_id,
lsn,
None,
false,
ctx,
)
.await?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
anyhow::Ok(())
},
)
.await?;
}
// return pair of prev_lsn and last_lsn
else if query_string.starts_with("get_last_record_rlsn ") {

View File

@@ -12,7 +12,7 @@ use crate::context::RequestContext;
use crate::repository::{Key, Value};
use crate::task_mgr::TaskKind;
use crate::walrecord::NeonWalRecord;
use anyhow::Result;
use anyhow::{Context, Result};
use bytes::Bytes;
use enum_map::EnumMap;
use enumset::EnumSet;
@@ -343,7 +343,8 @@ impl LayerAccessStats {
/// All layers should implement a minimal `std::fmt::Debug` without tenant or
/// timeline names, because those are known in the context of which the layers
/// are used in (timeline).
pub trait Layer: std::fmt::Debug + Send + Sync {
#[async_trait::async_trait]
pub trait Layer: std::fmt::Debug + Send + Sync + 'static {
/// Range of keys that this layer covers
fn get_key_range(&self) -> Range<Key>;
@@ -373,13 +374,42 @@ pub trait Layer: std::fmt::Debug + Send + Sync {
/// is available. If this returns ValueReconstructResult::Continue, look up
/// the predecessor layer and call again with the same 'reconstruct_data' to
/// collect more data.
fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: &mut ValueReconstructState,
ctx: &RequestContext,
) -> Result<ValueReconstructResult>;
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)>;
/// CANCEL SAFETY: if the returned future is dropped,
/// the wrapped closure still run to completion and the return value discarded.
/// For the case of get_value_reconstruct_data, we expect the closure to not
/// have any side effects, as it only attempts to read a layer (and stuff like
/// page cache isn't considered a real side effect).
/// But, ...
/// TRACING:
/// If the returned future is cancelled, the spawn_blocking span can outlive
/// the caller's span.
/// So, technically, we should be using `parent: None` and `follows_from: current`
/// instead. However, in practice, the advantage of maintaining the span stack
/// in logs outweighs the disadvantage of having a dangling span in a case that
/// is not expected to happen because in pageserver we generally don't drop pending futures.
async fn get_value_reconstruct_data(
self: Arc<Self>,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_data: ValueReconstructState,
ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
let span = tracing::info_span!("get_value_reconstruct_data_spawn_blocking");
tokio::task::spawn_blocking(move || {
let _enter = span.enter();
self.get_value_reconstruct_data_blocking(key, lsn_range, reconstruct_data, ctx)
})
.await
.context("spawn_blocking")?
}
/// A short ID string that uniquely identifies the given layer within a [`LayerMap`].
fn short_id(&self) -> String;
@@ -499,6 +529,7 @@ impl LayerDescriptor {
}
}
#[async_trait::async_trait]
impl Layer for LayerDescriptor {
fn get_key_range(&self) -> Range<Key> {
self.key.clone()
@@ -512,13 +543,13 @@ impl Layer for LayerDescriptor {
self.is_incremental
}
fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_data: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
_reconstruct_data: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
todo!("This method shouldn't be part of the Layer trait")
}

View File

@@ -218,6 +218,7 @@ impl std::fmt::Debug for DeltaLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for DeltaLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
@@ -294,13 +295,13 @@ impl Layer for DeltaLayer {
Ok(())
}
fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
ensure!(lsn_range.start >= self.desc.lsn_range.start);
let mut need_image = true;
@@ -308,7 +309,7 @@ impl Layer for DeltaLayer {
{
// Open the file and lock the metadata in memory
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
// Scan the page versions backwards, starting from `lsn`.
let file = &inner.file;
@@ -374,9 +375,9 @@ impl Layer for DeltaLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok(ValueReconstructResult::Complete)
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
}

View File

@@ -149,6 +149,7 @@ impl std::fmt::Debug for ImageLayerInner {
}
}
#[async_trait::async_trait]
impl Layer for ImageLayer {
/// debugging function to print out the contents of the layer
fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
@@ -181,18 +182,18 @@ impl Layer for ImageLayer {
}
/// Look up given page in the file
fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
mut reconstruct_state: ValueReconstructState,
ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
assert!(self.desc.key_range.contains(&key));
assert!(lsn_range.start >= self.lsn);
assert!(lsn_range.end >= self.lsn);
let inner = self.load(LayerAccessKind::GetValueReconstructData, ctx)?;
let inner = self.load(LayerAccessKind::GetValueReconstructData, &ctx)?;
let file = inner.file.as_ref().unwrap();
let tree_reader = DiskBtreeReader::new(inner.index_start_blk, inner.index_root_blk, file);
@@ -210,9 +211,9 @@ impl Layer for ImageLayer {
let value = Bytes::from(blob);
reconstruct_state.img = Some((self.lsn, value));
Ok(ValueReconstructResult::Complete)
Ok((reconstruct_state, ValueReconstructResult::Complete))
} else {
Ok(ValueReconstructResult::Missing)
Ok((reconstruct_state, ValueReconstructResult::Missing))
}
}

View File

@@ -110,6 +110,7 @@ impl InMemoryLayer {
}
}
#[async_trait::async_trait]
impl Layer for InMemoryLayer {
fn get_key_range(&self) -> Range<Key> {
Key::MIN..Key::MAX
@@ -190,13 +191,13 @@ impl Layer for InMemoryLayer {
}
/// Look up given value in the layer.
fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
mut reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> anyhow::Result<(ValueReconstructState, ValueReconstructResult)> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
@@ -213,7 +214,7 @@ impl Layer for InMemoryLayer {
match value {
Value::Image(img) => {
reconstruct_state.img = Some((*entry_lsn, img));
return Ok(ValueReconstructResult::Complete);
return Ok((reconstruct_state, ValueReconstructResult::Complete));
}
Value::WalRecord(rec) => {
let will_init = rec.will_init();
@@ -233,9 +234,9 @@ impl Layer for InMemoryLayer {
// If an older page image is needed to reconstruct the page, let the
// caller know.
if need_image {
Ok(ValueReconstructResult::Continue)
Ok((reconstruct_state, ValueReconstructResult::Continue))
} else {
Ok(ValueReconstructResult::Complete)
Ok((reconstruct_state, ValueReconstructResult::Complete))
}
}
}

View File

@@ -6,7 +6,7 @@ use crate::context::RequestContext;
use crate::repository::Key;
use crate::tenant::layer_map::BatchedUpdates;
use crate::tenant::remote_timeline_client::index::LayerFileMetadata;
use crate::tenant::storage_layer::{Layer, ValueReconstructResult, ValueReconstructState};
use crate::tenant::storage_layer::{Layer, ValueReconstructState};
use anyhow::{bail, Result};
use pageserver_api::models::HistoricLayerInfo;
use std::ops::Range;
@@ -21,7 +21,7 @@ use utils::{
use super::filename::{DeltaFileName, ImageFileName};
use super::{
DeltaLayer, ImageLayer, LayerAccessStats, LayerAccessStatsReset, LayerIter, LayerKeyIter,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc,
LayerResidenceStatus, PersistentLayer, PersistentLayerDesc, ValueReconstructResult,
};
/// RemoteLayer is a not yet downloaded [`ImageLayer`] or
@@ -63,14 +63,15 @@ impl std::fmt::Debug for RemoteLayer {
}
}
#[async_trait::async_trait]
impl Layer for RemoteLayer {
fn get_value_reconstruct_data(
fn get_value_reconstruct_data_blocking(
&self,
_key: Key,
_lsn_range: Range<Lsn>,
_reconstruct_state: &mut ValueReconstructState,
_ctx: &RequestContext,
) -> Result<ValueReconstructResult> {
_reconstruct_state: ValueReconstructState,
_ctx: RequestContext,
) -> Result<(ValueReconstructState, ValueReconstructResult)> {
bail!(
"layer {} needs to be downloaded",
self.filename().file_name()

View File

@@ -129,7 +129,7 @@ pub struct Timeline {
pub pg_version: u32,
pub(crate) layers: tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>,
pub(crate) layers: Arc<tokio::sync::RwLock<LayerMap<dyn PersistentLayer>>>,
/// Set of key ranges which should be covered by image layers to
/// allow GC to remove old layers. This set is created by GC and its cutoff LSN is also stored.
@@ -555,13 +555,14 @@ impl Timeline {
None => None,
};
let mut reconstruct_state = ValueReconstructState {
let reconstruct_state = ValueReconstructState {
records: Vec::new(),
img: cached_page_img,
};
let timer = self.metrics.get_reconstruct_data_time_histo.start_timer();
self.get_reconstruct_data(key, lsn, &mut reconstruct_state, ctx)
let reconstruct_state = self
.get_reconstruct_data(key, lsn, reconstruct_state, ctx)
.await?;
timer.stop_and_record();
@@ -1418,7 +1419,7 @@ impl Timeline {
timeline_id,
tenant_id,
pg_version,
layers: tokio::sync::RwLock::new(LayerMap::default()),
layers: Arc::new(tokio::sync::RwLock::new(LayerMap::default())),
wanted_image_layers: Mutex::new(None),
walredo_mgr,
@@ -2352,9 +2353,9 @@ impl Timeline {
&self,
key: Key,
request_lsn: Lsn,
reconstruct_state: &mut ValueReconstructState,
mut reconstruct_state: ValueReconstructState,
ctx: &RequestContext,
) -> Result<(), PageReconstructError> {
) -> Result<ValueReconstructState, PageReconstructError> {
// Start from the current timeline.
let mut timeline_owned;
let mut timeline = self;
@@ -2384,12 +2385,12 @@ impl Timeline {
// The function should have updated 'state'
//info!("CALLED for {} at {}: {:?} with {} records, cached {}", key, cont_lsn, result, reconstruct_state.records.len(), cached_lsn);
match result {
ValueReconstructResult::Complete => return Ok(()),
ValueReconstructResult::Complete => return Ok(reconstruct_state),
ValueReconstructResult::Continue => {
// If we reached an earlier cached page image, we're done.
if cont_lsn == cached_lsn + 1 {
MATERIALIZED_PAGE_CACHE_HIT.inc_by(1);
return Ok(());
return Ok(reconstruct_state);
}
if prev_lsn <= cont_lsn {
// Didn't make any progress in last iteration. Error out to avoid
@@ -2493,13 +2494,19 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match open_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(open_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2520,13 +2527,19 @@ impl Timeline {
if cont_lsn > start_lsn {
//info!("CHECKING for {} at {} on frozen layer {}", key, cont_lsn, frozen_layer.filename().display());
let lsn_floor = max(cached_lsn + 1, start_lsn);
result = match frozen_layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(frozen_layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -2555,13 +2568,19 @@ impl Timeline {
// Get all the data needed to reconstruct the page version from this layer.
// But if we have an older cached page image, no need to go past that.
let lsn_floor = max(cached_lsn + 1, lsn_floor);
result = match layer.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx,
) {
Ok(result) => result,
result = match Arc::clone(&layer)
.get_value_reconstruct_data(
key,
lsn_floor..cont_lsn,
reconstruct_state,
ctx.attached_child(),
)
.await
{
Ok((new_reconstruct_state, result)) => {
reconstruct_state = new_reconstruct_state;
result
}
Err(e) => return Err(PageReconstructError::from(e)),
};
cont_lsn = lsn_floor;
@@ -3370,14 +3389,14 @@ struct CompactLevel0Phase1StatsBuilder {
version: Option<u64>,
tenant_id: Option<TenantId>,
timeline_id: Option<TimelineId>,
first_read_lock_acquisition_micros: DurationRecorder,
get_level0_deltas_plus_drop_lock_micros: DurationRecorder,
level0_deltas_count: Option<usize>,
time_spent_between_locks: DurationRecorder,
second_read_lock_acquisition_micros: DurationRecorder,
second_read_lock_held_micros: DurationRecorder,
sort_holes_micros: DurationRecorder,
read_lock_acquisition_micros: DurationRecorder,
read_lock_held_spawn_blocking_startup_micros: DurationRecorder,
read_lock_held_prerequisites_micros: DurationRecorder,
read_lock_held_compute_holes_micros: DurationRecorder,
read_lock_drop_micros: DurationRecorder,
prepare_iterators_micros: DurationRecorder,
write_layer_files_micros: DurationRecorder,
level0_deltas_count: Option<usize>,
new_deltas_count: Option<usize>,
new_deltas_size: Option<u64>,
}
@@ -3390,14 +3409,14 @@ struct CompactLevel0Phase1Stats {
tenant_id: TenantId,
#[serde_as(as = "serde_with::DisplayFromStr")]
timeline_id: TimelineId,
first_read_lock_acquisition_micros: RecordedDuration,
get_level0_deltas_plus_drop_lock_micros: RecordedDuration,
level0_deltas_count: usize,
time_spent_between_locks: RecordedDuration,
second_read_lock_acquisition_micros: RecordedDuration,
second_read_lock_held_micros: RecordedDuration,
sort_holes_micros: RecordedDuration,
read_lock_acquisition_micros: RecordedDuration,
read_lock_held_spawn_blocking_startup_micros: RecordedDuration,
read_lock_held_prerequisites_micros: RecordedDuration,
read_lock_held_compute_holes_micros: RecordedDuration,
read_lock_drop_micros: RecordedDuration,
prepare_iterators_micros: RecordedDuration,
write_layer_files_micros: RecordedDuration,
level0_deltas_count: usize,
new_deltas_count: usize,
new_deltas_size: u64,
}
@@ -3406,54 +3425,51 @@ impl TryFrom<CompactLevel0Phase1StatsBuilder> for CompactLevel0Phase1Stats {
type Error = anyhow::Error;
fn try_from(value: CompactLevel0Phase1StatsBuilder) -> Result<Self, Self::Error> {
let CompactLevel0Phase1StatsBuilder {
version,
tenant_id,
timeline_id,
first_read_lock_acquisition_micros,
get_level0_deltas_plus_drop_lock_micros,
level0_deltas_count,
time_spent_between_locks,
second_read_lock_acquisition_micros,
second_read_lock_held_micros,
sort_holes_micros,
write_layer_files_micros,
new_deltas_count,
new_deltas_size,
} = value;
Ok(CompactLevel0Phase1Stats {
version: version.ok_or_else(|| anyhow::anyhow!("version not set"))?,
tenant_id: tenant_id.ok_or_else(|| anyhow::anyhow!("tenant_id not set"))?,
timeline_id: timeline_id.ok_or_else(|| anyhow::anyhow!("timeline_id not set"))?,
first_read_lock_acquisition_micros: first_read_lock_acquisition_micros
Ok(Self {
version: value.version.ok_or_else(|| anyhow!("version not set"))?,
tenant_id: value
.tenant_id
.ok_or_else(|| anyhow!("tenant_id not set"))?,
timeline_id: value
.timeline_id
.ok_or_else(|| anyhow!("timeline_id not set"))?,
read_lock_acquisition_micros: value
.read_lock_acquisition_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("first_read_lock_acquisition_micros not set"))?,
get_level0_deltas_plus_drop_lock_micros: get_level0_deltas_plus_drop_lock_micros
.ok_or_else(|| anyhow!("read_lock_acquisition_micros not set"))?,
read_lock_held_spawn_blocking_startup_micros: value
.read_lock_held_spawn_blocking_startup_micros
.into_recorded()
.ok_or_else(|| {
anyhow::anyhow!("get_level0_deltas_plus_drop_lock_micros not set")
})?,
level0_deltas_count: level0_deltas_count
.ok_or_else(|| anyhow::anyhow!("level0_deltas_count not set"))?,
time_spent_between_locks: time_spent_between_locks
.ok_or_else(|| anyhow!("read_lock_held_spawn_blocking_startup_micros not set"))?,
read_lock_held_prerequisites_micros: value
.read_lock_held_prerequisites_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("time_spent_between_locks not set"))?,
second_read_lock_acquisition_micros: second_read_lock_acquisition_micros
.ok_or_else(|| anyhow!("read_lock_held_prerequisites_micros not set"))?,
read_lock_held_compute_holes_micros: value
.read_lock_held_compute_holes_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("second_read_lock_acquisition_micros not set"))?,
second_read_lock_held_micros: second_read_lock_held_micros
.ok_or_else(|| anyhow!("read_lock_held_compute_holes_micros not set"))?,
read_lock_drop_micros: value
.read_lock_drop_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("second_read_lock_held_micros not set"))?,
sort_holes_micros: sort_holes_micros
.ok_or_else(|| anyhow!("read_lock_drop_micros not set"))?,
prepare_iterators_micros: value
.prepare_iterators_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("sort_holes_micros not set"))?,
write_layer_files_micros: write_layer_files_micros
.ok_or_else(|| anyhow!("prepare_iterators_micros not set"))?,
write_layer_files_micros: value
.write_layer_files_micros
.into_recorded()
.ok_or_else(|| anyhow::anyhow!("write_layer_files_micros not set"))?,
new_deltas_count: new_deltas_count
.ok_or_else(|| anyhow::anyhow!("new_deltas_count not set"))?,
new_deltas_size: new_deltas_size
.ok_or_else(|| anyhow::anyhow!("new_deltas_size not set"))?,
.ok_or_else(|| anyhow!("write_layer_files_micros not set"))?,
level0_deltas_count: value
.level0_deltas_count
.ok_or_else(|| anyhow!("level0_deltas_count not set"))?,
new_deltas_count: value
.new_deltas_count
.ok_or_else(|| anyhow!("new_deltas_count not set"))?,
new_deltas_size: value
.new_deltas_size
.ok_or_else(|| anyhow!("new_deltas_size not set"))?,
})
}
}
@@ -3464,30 +3480,18 @@ impl Timeline {
/// This method takes the `_layer_removal_cs` guard to highlight it required downloads are
/// returned as an error. If the `layer_removal_cs` boundary is changed not to be taken in the
/// start of level0 files compaction, the on-demand download should be revisited as well.
async fn compact_level0_phase1(
&self,
fn compact_level0_phase1(
self: Arc<Self>,
_layer_removal_cs: Arc<tokio::sync::OwnedMutexGuard<()>>,
layers: tokio::sync::OwnedRwLockReadGuard<LayerMap<dyn PersistentLayer>>,
mut stats: CompactLevel0Phase1StatsBuilder,
target_file_size: u64,
ctx: &RequestContext,
) -> Result<CompactLevel0Phase1Result, CompactionError> {
let mut stats = CompactLevel0Phase1StatsBuilder {
version: Some(1),
tenant_id: Some(self.tenant_id),
timeline_id: Some(self.timeline_id),
..Default::default()
};
let begin = tokio::time::Instant::now();
let layers = self.layers.read().await;
let now = tokio::time::Instant::now();
stats.first_read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
stats.read_lock_held_spawn_blocking_startup_micros =
stats.read_lock_acquisition_micros.till_now(); // set by caller
let mut level0_deltas = layers.get_level0_deltas()?;
drop(layers);
stats.level0_deltas_count = Some(level0_deltas.len());
stats.get_level0_deltas_plus_drop_lock_micros =
stats.first_read_lock_acquisition_micros.till_now();
// Only compact if enough layers have accumulated.
let threshold = self.get_compaction_threshold();
if level0_deltas.is_empty() || level0_deltas.len() < threshold {
@@ -3565,6 +3569,53 @@ impl Timeline {
// we don't accidentally use it later in the function.
drop(level0_deltas);
stats.read_lock_held_prerequisites_micros = stats
.read_lock_held_spawn_blocking_startup_micros
.till_now();
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
for (next_key, _next_lsn, _size) in itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
)? {
if let Some(prev_key) = prev {
// just first fast filter
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
coverage_size,
});
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
}
}
}
}
prev = Some(next_key.next());
}
stats.read_lock_held_compute_holes_micros =
stats.read_lock_held_prerequisites_micros.till_now();
drop(layers);
stats.read_lock_drop_micros = stats.read_lock_held_compute_holes_micros.till_now();
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector
// This iterator walks through all key-value pairs from all the layers
// we're compacting, in key, LSN order.
let all_values_iter = itertools::process_results(
@@ -3604,50 +3655,7 @@ impl Timeline {
},
)?;
// Determine N largest holes where N is number of compacted layers.
let max_holes = deltas_to_compact.len();
let last_record_lsn = self.get_last_record_lsn();
stats.time_spent_between_locks = stats.get_level0_deltas_plus_drop_lock_micros.till_now();
let layers = self.layers.read().await; // Is'n it better to hold original layers lock till here?
stats.second_read_lock_acquisition_micros = stats.time_spent_between_locks.till_now();
let min_hole_range = (target_file_size / page_cache::PAGE_SZ as u64) as i128;
let min_hole_coverage_size = 3; // TODO: something more flexible?
// min-heap (reserve space for one more element added before eviction)
let mut heap: BinaryHeap<Hole> = BinaryHeap::with_capacity(max_holes + 1);
let mut prev: Option<Key> = None;
for (next_key, _next_lsn, _size) in itertools::process_results(
deltas_to_compact.iter().map(|l| l.key_iter(ctx)),
|iter_iter| iter_iter.kmerge_by(|a, b| a.0 <= b.0),
)? {
if let Some(prev_key) = prev {
// just first fast filter
if next_key.to_i128() - prev_key.to_i128() >= min_hole_range {
let key_range = prev_key..next_key;
// Measuring hole by just subtraction of i128 representation of key range boundaries
// has not so much sense, because largest holes will corresponds field1/field2 changes.
// But we are mostly interested to eliminate holes which cause generation of excessive image layers.
// That is why it is better to measure size of hole as number of covering image layers.
let coverage_size = layers.image_coverage(&key_range, last_record_lsn)?.len();
if coverage_size >= min_hole_coverage_size {
heap.push(Hole {
key_range,
coverage_size,
});
if heap.len() > max_holes {
heap.pop(); // remove smallest hole
}
}
}
}
prev = Some(next_key.next());
}
drop(layers);
stats.second_read_lock_held_micros = stats.second_read_lock_acquisition_micros.till_now();
let mut holes = heap.into_vec();
holes.sort_unstable_by_key(|hole| hole.key_range.start);
let mut next_hole = 0; // index of next hole in holes vector
stats.sort_holes_micros = stats.second_read_lock_held_micros.till_now();
stats.prepare_iterators_micros = stats.read_lock_drop_micros.till_now();
// Merge the contents of all the input delta layers into a new set
// of delta layers, based on the current partitioning.
@@ -3807,7 +3815,7 @@ impl Timeline {
layer_paths.pop().unwrap();
}
stats.write_layer_files_micros = stats.sort_holes_micros.till_now();
stats.write_layer_files_micros = stats.prepare_iterators_micros.till_now();
stats.new_deltas_count = Some(new_layers.len());
stats.new_deltas_size = Some(new_layers.iter().map(|l| l.desc.file_size).sum());
@@ -3846,9 +3854,36 @@ impl Timeline {
let CompactLevel0Phase1Result {
new_layers,
deltas_to_compact,
} = self
.compact_level0_phase1(layer_removal_cs.clone(), target_file_size, ctx)
.await?;
} = {
let phase1_span = info_span!("compact_level0_phase1");
let myself = Arc::clone(self);
let ctx = ctx.attached_child(); // technically, the spawn_blocking can outlive this future
let mut stats = CompactLevel0Phase1StatsBuilder {
version: Some(2),
tenant_id: Some(self.tenant_id),
timeline_id: Some(self.timeline_id),
..Default::default()
};
let begin = tokio::time::Instant::now();
let phase1_layers_locked = Arc::clone(&self.layers).read_owned().await;
let now = tokio::time::Instant::now();
stats.read_lock_acquisition_micros =
DurationRecorder::Recorded(RecordedDuration(now - begin), now);
let layer_removal_cs = layer_removal_cs.clone();
tokio::task::spawn_blocking(move || {
let _entered = phase1_span.enter();
myself.compact_level0_phase1(
layer_removal_cs,
phase1_layers_locked,
stats,
target_file_size,
&ctx,
)
})
.await
.context("spawn_blocking")??
};
if new_layers.is_empty() && deltas_to_compact.is_empty() {
// nothing to do

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.68.2"
channel = "1.70.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -62,6 +62,7 @@ PAGESERVER_GLOBAL_METRICS: Tuple[str, ...] = (
"pageserver_getpage_reconstruct_seconds_bucket",
"pageserver_getpage_reconstruct_seconds_count",
"pageserver_getpage_reconstruct_seconds_sum",
*[f"pageserver_basebackup_query_seconds_{x}" for x in ["bucket", "count", "sum"]],
)
PAGESERVER_PER_TENANT_METRICS: Tuple[str, ...] = (

View File

@@ -2,6 +2,7 @@ import copy
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Any, Optional
@@ -448,7 +449,7 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
"""
with output.open("w") as stdout:
rv = subprocess.run(
res = subprocess.run(
[
"diff",
"--unified", # Make diff output more readable
@@ -460,4 +461,53 @@ def dump_differs(first: Path, second: Path, output: Path) -> bool:
stdout=stdout,
)
return rv.returncode != 0
differs = res.returncode != 0
# TODO: Remove after https://github.com/neondatabase/neon/pull/4425 is merged, and a couple of releases are made
if differs:
with tempfile.NamedTemporaryFile(mode="w") as tmp:
tmp.write(PR4425_ALLOWED_DIFF)
tmp.flush()
allowed = subprocess.run(
[
"diff",
"--unified", # Make diff output more readable
r"--ignore-matching-lines=^---", # Ignore diff headers
r"--ignore-matching-lines=^\+\+\+", # Ignore diff headers
"--ignore-matching-lines=^@@", # Ignore diff blocks location
"--ignore-matching-lines=^ *$", # Ignore lines with only spaces
"--ignore-matching-lines=^ --.*", # Ignore the " --" lines for compatibility with PG14
"--ignore-blank-lines",
str(output),
str(tmp.name),
],
)
differs = allowed.returncode != 0
return differs
PR4425_ALLOWED_DIFF = """
--- /tmp/test_output/test_backward_compatibility[release-pg15]/compatibility_snapshot/dump.sql 2023-06-08 18:12:45.000000000 +0000
+++ /tmp/test_output/test_backward_compatibility[release-pg15]/dump.sql 2023-06-13 07:25:35.211733653 +0000
@@ -13,12 +13,20 @@
CREATE ROLE cloud_admin;
ALTER ROLE cloud_admin WITH SUPERUSER INHERIT CREATEROLE CREATEDB LOGIN REPLICATION BYPASSRLS;
+CREATE ROLE neon_superuser;
+ALTER ROLE neon_superuser WITH NOSUPERUSER INHERIT CREATEROLE CREATEDB NOLOGIN NOREPLICATION NOBYPASSRLS;
--
-- User Configurations
--
+--
+-- Role memberships
+--
+
+GRANT pg_read_all_data TO neon_superuser GRANTED BY cloud_admin;
+GRANT pg_write_all_data TO neon_superuser GRANTED BY cloud_admin;
"""

View File

@@ -1,3 +1,5 @@
import time
import pytest
from fixtures.neon_fixtures import NeonEnv
@@ -10,9 +12,10 @@ def test_hot_standby(neon_simple_env: NeonEnv):
branch_name="main",
endpoint_id="primary",
) as primary:
time.sleep(1)
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
primary_lsn = None
cought_up = False
caught_up = False
queries = [
"SHOW neon.timeline_id",
"SHOW neon.tenant_id",
@@ -56,7 +59,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
res = s_cur.fetchone()
assert res is not None
while not cought_up:
while not caught_up:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
res = secondary_cursor.fetchone()
@@ -66,7 +69,7 @@ def test_hot_standby(neon_simple_env: NeonEnv):
# due to e.g. autovacuum, but that shouldn't impact the content
# of the tables, so we check whether we've replayed up to at
# least after the commit of the `test` table.
cought_up = secondary_lsn >= primary_lsn
caught_up = secondary_lsn >= primary_lsn
# Explicit commit to flush any transient transaction-level state.
s_con.commit()

View File

@@ -16,6 +16,7 @@ from fixtures.pg_version import PgVersion, xfail_on_postgres
from fixtures.types import Lsn, TenantId, TimelineId
@pytest.mark.xfail
def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
env = neon_simple_env
(tenant_id, _) = env.neon_cli.create_tenant()
@@ -44,12 +45,16 @@ def test_empty_tenant_size(neon_simple_env: NeonEnv, test_output_dir: Path):
# we've disabled the autovacuum and checkpoint
# so background processes should not change the size.
# If this test will flake we should probably loosen the check
assert size == initial_size, "starting idle compute should not change the tenant size"
assert (
size == initial_size
), f"starting idle compute should not change the tenant size (Currently {size}, expected {initial_size})"
# the size should be the same, until we increase the size over the
# gc_horizon
size, inputs = http_client.tenant_size_and_modelinputs(tenant_id)
assert size == initial_size, "tenant_size should not be affected by shutdown of compute"
assert (
size == initial_size
), f"tenant_size should not be affected by shutdown of compute (Currently {size}, expected {initial_size})"
expected_inputs = {
"segments": [
@@ -318,6 +323,7 @@ def test_only_heads_within_horizon(neon_simple_env: NeonEnv, test_output_dir: Pa
size_debug_file.write(size_debug)
@pytest.mark.xfail
def test_single_branch_get_tenant_size_grows(
neon_env_builder: NeonEnvBuilder, test_output_dir: Path, pg_version: PgVersion
):
@@ -333,13 +339,13 @@ def test_single_branch_get_tenant_size_grows(
# inserts is larger than gc_horizon. for example 0x20000 here hid the fact
# that there next_gc_cutoff could be smaller than initdb_lsn, which will
# obviously lead to issues when calculating the size.
gc_horizon = 0x38000
gc_horizon = 0x3BA00
# it's a bit of a hack, but different versions of postgres have different
# amount of WAL generated for the same amount of data. so we need to
# adjust the gc_horizon accordingly.
if pg_version == PgVersion.V14:
gc_horizon = 0x40000
gc_horizon = 0x4A000
neon_env_builder.pageserver_config_override = f"tenant_config={{compaction_period='0s', gc_period='0s', pitr_interval='0sec', gc_horizon={gc_horizon}}}"
@@ -360,11 +366,11 @@ def test_single_branch_get_tenant_size_grows(
if current_lsn - initdb_lsn >= gc_horizon:
assert (
size >= prev_size
), "tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size"
), f"tenant_size may grow or not grow, because we only add gc_horizon amount of WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})"
else:
assert (
size > prev_size
), "tenant_size should grow, because we continue to add WAL to initial snapshot size"
), f"tenant_size should grow, because we continue to add WAL to initial snapshot size (Currently at: {current_lsn}, Init at: {initdb_lsn})"
def get_current_consistent_size(
env: NeonEnv,