Compare commits

..

1 Commits

Author SHA1 Message Date
Konstantin Knizhnik
8799a603a2 walingest createdb record both for CREATE_DATABASE_FROM_WAL and CREATE_DATABASE_FILE_COPY 2023-09-19 13:23:14 +03:00
34 changed files with 469 additions and 979 deletions

View File

@@ -834,7 +834,7 @@ jobs:
run:
shell: sh -eu {0}
env:
VM_BUILDER_VERSION: v0.17.11
VM_BUILDER_VERSION: v0.17.10
steps:
- name: Checkout
@@ -1091,9 +1091,8 @@ jobs:
GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }}
run: |
if [[ "$GITHUB_REF_NAME" == "main" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=false
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}}
elif [[ "$GITHUB_REF_NAME" == "release" ]]; then
gh workflow --repo neondatabase/aws run deploy-dev.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f deployPreprodRegion=true
gh workflow --repo neondatabase/aws run deploy-prod.yml --ref main -f branch=main -f dockerTag=${{needs.tag.outputs.build-tag}} -f disclamerAcknowledged=true
else
echo "GITHUB_REF_NAME (value '$GITHUB_REF_NAME') is not set to either 'main' or 'release'"

54
Cargo.lock generated
View File

@@ -636,7 +636,7 @@ dependencies = [
"sha1",
"sync_wrapper",
"tokio",
"tokio-tungstenite",
"tokio-tungstenite 0.20.0",
"tower",
"tower-layer",
"tower-service",
@@ -1941,15 +1941,15 @@ dependencies = [
[[package]]
name = "hyper-tungstenite"
version = "0.11.1"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cc7dcb1ab67cd336f468a12491765672e61a3b6b148634dbfe2fe8acd3fe7d9"
checksum = "880b8b1c98a5ec2a505c7c90db6d3f6f1f480af5655d9c5b55facc9382a5a5b5"
dependencies = [
"hyper",
"pin-project-lite",
"pin-project",
"tokio",
"tokio-tungstenite",
"tungstenite",
"tokio-tungstenite 0.18.0",
"tungstenite 0.18.0",
]
[[package]]
@@ -2908,9 +2908,9 @@ dependencies = [
[[package]]
name = "pin-project-lite"
version = "0.2.13"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58"
checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]]
name = "pin-utils"
@@ -4641,6 +4641,18 @@ dependencies = [
"xattr",
]
[[package]]
name = "tokio-tungstenite"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd"
dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite 0.18.0",
]
[[package]]
name = "tokio-tungstenite"
version = "0.20.0"
@@ -4650,7 +4662,7 @@ dependencies = [
"futures-util",
"log",
"tokio",
"tungstenite",
"tungstenite 0.20.0",
]
[[package]]
@@ -4965,9 +4977,28 @@ checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"
[[package]]
name = "tungstenite"
version = "0.20.1"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e3dac10fd62eaf6617d3a904ae222845979aec67c615d1c842b4002c7666fb9"
checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788"
dependencies = [
"base64 0.13.1",
"byteorder",
"bytes",
"http",
"httparse",
"log",
"rand",
"sha1",
"thiserror",
"url",
"utf-8",
]
[[package]]
name = "tungstenite"
version = "0.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e862a1c4128df0112ab625f55cd5c934bcb4312ba80b39ae4b4835a3fd58e649"
dependencies = [
"byteorder",
"bytes",
@@ -5617,7 +5648,6 @@ dependencies = [
"tower",
"tracing",
"tracing-core",
"tungstenite",
"url",
"uuid",
]

View File

@@ -78,7 +78,7 @@ hostname = "0.3.1"
humantime = "2.1"
humantime-serde = "1.1.1"
hyper = "0.14"
hyper-tungstenite = "0.11"
hyper-tungstenite = "0.9"
inotify = "0.10.2"
itertools = "0.10"
jsonwebtoken = "8"

View File

@@ -589,7 +589,8 @@ RUN case "${PG_VERSION}" in \
echo "${PG_EMBEDDING_CHECKSUM} pg_embedding.tar.gz" | sha256sum --check && \
mkdir pg_embedding-src && cd pg_embedding-src && tar xvzf ../pg_embedding.tar.gz --strip-components=1 -C . && \
make -j $(getconf _NPROCESSORS_ONLN) && \
make -j $(getconf _NPROCESSORS_ONLN) install
make -j $(getconf _NPROCESSORS_ONLN) install && \
echo 'trusted = true' >> /usr/local/pgsql/share/extension/embedding.control
#########################################################################################
#

View File

@@ -153,6 +153,18 @@ neon-pg-ext-%: postgres-%
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile install
# pg_embedding was temporarily released as hnsw from this repo, when we only
# supported PostgreSQL 14 and 15
neon-pg-ext-v14: neon-pg-ext-hnsw-v14
neon-pg-ext-v15: neon-pg-ext-hnsw-v15
neon-pg-ext-hnsw-%: postgres-headers-% postgres-%
+@echo "Compiling hnsw $*"
mkdir -p $(POSTGRES_INSTALL_DIR)/build/hnsw-$*
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config CFLAGS='$(PG_CFLAGS) $(COPT)' \
-C $(POSTGRES_INSTALL_DIR)/build/hnsw-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/hnsw/Makefile install
.PHONY: neon-pg-ext-clean-%
neon-pg-ext-clean-%:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
@@ -167,6 +179,9 @@ neon-pg-ext-clean-%:
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/neon-utils-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/neon_utils/Makefile clean
$(MAKE) PG_CONFIG=$(POSTGRES_INSTALL_DIR)/$*/bin/pg_config \
-C $(POSTGRES_INSTALL_DIR)/build/hnsw-$* \
-f $(ROOT_PROJECT_DIR)/pgxn/hnsw/Makefile clean
.PHONY: neon-pg-ext
neon-pg-ext: \

View File

@@ -29,13 +29,13 @@ See developer documentation in [SUMMARY.md](/docs/SUMMARY.md) for more informati
```bash
apt install build-essential libtool libreadline-dev zlib1g-dev flex bison libseccomp-dev \
libssl-dev clang pkg-config libpq-dev cmake postgresql-client protobuf-compiler \
libcurl4-openssl-dev openssl python-poetry lsof libicu-dev
libcurl4-openssl-dev openssl python-poetry lsof
```
* On Fedora, these packages are needed:
```bash
dnf install flex bison readline-devel zlib-devel openssl-devel \
libseccomp-devel perl clang cmake postgresql postgresql-contrib protobuf-compiler \
protobuf-devel libcurl-devel openssl poetry lsof libicu-devel
protobuf-devel libcurl-devel openssl poetry lsof
```
* On Arch based systems, these packages are needed:
```bash

View File

@@ -3,8 +3,6 @@
//! Currently it only analyzes holes, which are regions within the layer range that the layer contains no updates for. In the future it might do more analysis (maybe key quantiles?) but it should never return sensitive data.
use anyhow::Result;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
@@ -98,9 +96,9 @@ pub(crate) fn parse_filename(name: &str) -> Option<LayerFile> {
}
// Finds the max_holes largest holes, ignoring any that are smaller than MIN_HOLE_LENGTH"
async fn get_holes(path: &Path, max_holes: usize, ctx: &RequestContext) -> Result<Vec<Hole>> {
async fn get_holes(path: &Path, max_holes: usize) -> Result<Vec<Hole>> {
let file = FileBlockReader::new(VirtualFile::open(path).await?);
let summary_blk = file.read_blk(0, ctx).await?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,
@@ -127,7 +125,6 @@ async fn get_holes(path: &Path, max_holes: usize, ctx: &RequestContext) -> Resul
prev_key = Some(curr.next());
true
},
ctx,
)
.await?;
let mut holes = heap.into_vec();
@@ -138,7 +135,6 @@ async fn get_holes(path: &Path, max_holes: usize, ctx: &RequestContext) -> Resul
pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
let storage_path = &cmd.path;
let max_holes = cmd.max_holes.unwrap_or(DEFAULT_MAX_HOLES);
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
// Initialize virtual_file (file desriptor cache) and page cache which are needed to access layer persistent B-Tree.
pageserver::virtual_file::init(10);
@@ -167,7 +163,7 @@ pub(crate) async fn main(cmd: &AnalyzeLayerMapCmd) -> Result<()> {
parse_filename(&layer.file_name().into_string().unwrap())
{
if layer_file.is_delta {
layer_file.holes = get_holes(&layer.path(), max_holes, &ctx).await?;
layer_file.holes = get_holes(&layer.path(), max_holes).await?;
n_deltas += 1;
}
layers.push(layer_file);

View File

@@ -2,8 +2,6 @@ use std::path::{Path, PathBuf};
use anyhow::Result;
use clap::Subcommand;
use pageserver::context::{DownloadBehavior, RequestContext};
use pageserver::task_mgr::TaskKind;
use pageserver::tenant::block_io::BlockCursor;
use pageserver::tenant::disk_btree::DiskBtreeReader;
use pageserver::tenant::storage_layer::delta_layer::{BlobRef, Summary};
@@ -46,12 +44,12 @@ pub(crate) enum LayerCmd {
},
}
async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result<()> {
async fn read_delta_file(path: impl AsRef<Path>) -> Result<()> {
let path = path.as_ref();
virtual_file::init(10);
page_cache::init(100);
let file = FileBlockReader::new(VirtualFile::open(path).await?);
let summary_blk = file.read_blk(0, ctx).await?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
let tree_reader = DiskBtreeReader::<_, DELTA_KEY_SIZE>::new(
actual_summary.index_start_blk,
@@ -69,12 +67,11 @@ async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result
all.push((curr, BlobRef(value_offset)));
true
},
ctx,
)
.await?;
let cursor = BlockCursor::new_fileblockreader(&file);
for (k, v) in all {
let value = cursor.read_blob(v.pos(), ctx).await?;
let value = cursor.read_blob(v.pos()).await?;
println!("key:{} value_len:{}", k, value.len());
}
// TODO(chi): special handling for last key?
@@ -82,7 +79,6 @@ async fn read_delta_file(path: impl AsRef<Path>, ctx: &RequestContext) -> Result
}
pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
match cmd {
LayerCmd::List { path } => {
for tenant in fs::read_dir(path.join(TENANTS_SEGMENT_NAME))? {
@@ -157,7 +153,7 @@ pub(crate) async fn main(cmd: &LayerCmd) -> Result<()> {
);
if layer_file.is_delta {
read_delta_file(layer.path(), &ctx).await?;
read_delta_file(layer.path()).await?;
} else {
anyhow::bail!("not supported yet :(");
}

View File

@@ -75,12 +75,12 @@ pub async fn import_timeline_from_postgres_datadir(
{
pg_control = Some(control_file);
}
modification.flush(ctx).await?;
modification.flush().await?;
}
}
// We're done importing all the data files.
modification.commit(ctx).await?;
modification.commit().await?;
// We expect the Postgres server to be shut down cleanly.
let pg_control = pg_control.context("pg_control file not found")?;
@@ -359,7 +359,7 @@ pub async fn import_basebackup_from_tar(
// We found the pg_control file.
pg_control = Some(res);
}
modification.flush(ctx).await?;
modification.flush().await?;
}
tokio_tar::EntryType::Directory => {
debug!("directory {:?}", file_path);
@@ -377,7 +377,7 @@ pub async fn import_basebackup_from_tar(
// sanity check: ensure that pg_control is loaded
let _pg_control = pg_control.context("pg_control file not found")?;
modification.commit(ctx).await?;
modification.commit().await?;
Ok(())
}

View File

@@ -1,4 +1,3 @@
use enum_map::EnumMap;
use metrics::metric_vec_duration::DurationResultObserver;
use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
@@ -128,24 +127,22 @@ pub(crate) static MATERIALIZED_PAGE_CACHE_HIT: Lazy<IntCounter> = Lazy::new(|| {
.expect("failed to define a metric")
});
pub struct PageCacheMetricsForTaskKind {
pub struct PageCacheMetrics {
pub read_accesses_materialized_page: IntCounter,
pub read_accesses_ephemeral: IntCounter,
pub read_accesses_immutable: IntCounter,
pub read_hits_ephemeral: IntCounter,
pub read_hits_immutable: IntCounter,
pub read_hits_materialized_page_exact: IntCounter,
pub read_hits_materialized_page_older_lsn: IntCounter,
}
pub struct PageCacheMetrics {
by_task_kind: EnumMap<TaskKind, PageCacheMetricsForTaskKind>,
}
static PAGE_CACHE_READ_HITS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_read_hits_total",
"Number of read accesses to the page cache that hit",
&["task_kind", "key_kind", "hit_kind"]
&["key_kind", "hit_kind"]
)
.expect("failed to define a metric")
});
@@ -154,55 +151,55 @@ static PAGE_CACHE_READ_ACCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"pageserver_page_cache_read_accesses_total",
"Number of read accesses to the page cache",
&["task_kind", "key_kind"]
&["key_kind"]
)
.expect("failed to define a metric")
});
pub static PAGE_CACHE: Lazy<PageCacheMetrics> = Lazy::new(|| PageCacheMetrics {
by_task_kind: EnumMap::from_array(std::array::from_fn(|task_kind| {
let task_kind = <TaskKind as enum_map::Enum>::from_usize(task_kind);
let task_kind: &'static str = task_kind.into();
PageCacheMetricsForTaskKind {
read_accesses_materialized_page: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&[task_kind, "materialized_page"])
.unwrap()
},
read_accesses_materialized_page: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["materialized_page"])
.unwrap()
},
read_accesses_immutable: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&[task_kind, "immutable"])
.unwrap()
},
read_accesses_ephemeral: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["ephemeral"])
.unwrap()
},
read_hits_immutable: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[task_kind, "immutable", "-"])
.unwrap()
},
read_accesses_immutable: {
PAGE_CACHE_READ_ACCESSES
.get_metric_with_label_values(&["immutable"])
.unwrap()
},
read_hits_materialized_page_exact: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[task_kind, "materialized_page", "exact"])
.unwrap()
},
read_hits_ephemeral: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["ephemeral", "-"])
.unwrap()
},
read_hits_materialized_page_older_lsn: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&[task_kind, "materialized_page", "older_lsn"])
.unwrap()
},
}
})),
read_hits_immutable: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["immutable", "-"])
.unwrap()
},
read_hits_materialized_page_exact: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["materialized_page", "exact"])
.unwrap()
},
read_hits_materialized_page_older_lsn: {
PAGE_CACHE_READ_HITS
.get_metric_with_label_values(&["materialized_page", "older_lsn"])
.unwrap()
},
});
impl PageCacheMetrics {
pub(crate) fn for_task_kind(&self, task_kind: TaskKind) -> &PageCacheMetricsForTaskKind {
&self.by_task_kind[task_kind]
}
}
pub struct PageCacheSizeMetrics {
pub max_bytes: UIntGauge,
@@ -1283,8 +1280,6 @@ use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use crate::task_mgr::TaskKind;
pub struct RemoteTimelineClientMetrics {
tenant_id: String,
timeline_id: String,

View File

@@ -85,7 +85,7 @@ use utils::{
lsn::Lsn,
};
use crate::{context::RequestContext, metrics::PageCacheSizeMetrics, repository::Key};
use crate::{metrics::PageCacheSizeMetrics, repository::Key};
static PAGE_CACHE: OnceCell<PageCache> = OnceCell::new();
const TEST_PAGE_CACHE_SIZE: usize = 50;
@@ -346,10 +346,8 @@ impl PageCache {
timeline_id: TimelineId,
key: &Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, PageReadGuard)> {
crate::metrics::PAGE_CACHE
.for_task_kind(ctx.task_kind())
.read_accesses_materialized_page
.inc();
@@ -370,12 +368,10 @@ impl PageCache {
{
if available_lsn == lsn {
crate::metrics::PAGE_CACHE
.for_task_kind(ctx.task_kind())
.read_hits_materialized_page_exact
.inc();
} else {
crate::metrics::PAGE_CACHE
.for_task_kind(ctx.task_kind())
.read_hits_materialized_page_older_lsn
.inc();
}
@@ -430,11 +426,10 @@ impl PageCache {
&self,
file_id: FileId,
blkno: u32,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
let mut cache_key = CacheKey::ImmutableFilePage { file_id, blkno };
self.lock_for_read(&mut cache_key, ctx).await
self.lock_for_read(&mut cache_key).await
}
//
@@ -502,22 +497,14 @@ impl PageCache {
/// }
/// ```
///
async fn lock_for_read(
&self,
cache_key: &mut CacheKey,
ctx: &RequestContext,
) -> anyhow::Result<ReadBufResult> {
async fn lock_for_read(&self, cache_key: &mut CacheKey) -> anyhow::Result<ReadBufResult> {
let (read_access, hit) = match cache_key {
CacheKey::MaterializedPage { .. } => {
unreachable!("Materialized pages use lookup_materialized_page")
}
CacheKey::ImmutableFilePage { .. } => (
&crate::metrics::PAGE_CACHE
.for_task_kind(ctx.task_kind())
.read_accesses_immutable,
&crate::metrics::PAGE_CACHE
.for_task_kind(ctx.task_kind())
.read_hits_immutable,
&crate::metrics::PAGE_CACHE.read_accesses_immutable,
&crate::metrics::PAGE_CACHE.read_hits_immutable,
),
};
read_access.inc();

View File

@@ -1138,7 +1138,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
pub async fn flush(&mut self) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1154,7 +1154,7 @@ impl<'a> DatadirModification<'a> {
if is_rel_block_key(key) || is_slru_block_key(key) {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put(key, self.lsn, &value, ctx).await?;
writer.put(key, self.lsn, &value).await?;
} else {
retained_pending_updates.insert(key, value);
}
@@ -1174,14 +1174,14 @@ impl<'a> DatadirModification<'a> {
/// underlying timeline.
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
pub async fn commit(&mut self) -> anyhow::Result<()> {
let writer = self.tline.writer().await;
let lsn = self.lsn;
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;
for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value, ctx).await?;
writer.put(key, lsn, &value).await?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn).await?;

View File

@@ -187,7 +187,6 @@ task_local! {
Debug,
// NB: enumset::EnumSetType derives PartialEq, Eq, Clone, Copy
enumset::EnumSetType,
enum_map::Enum,
serde::Serialize,
serde::Deserialize,
strum_macros::IntoStaticStr,

View File

@@ -1504,7 +1504,7 @@ impl Tenant {
.init_empty_test_timeline()
.context("init_empty_test_timeline")?;
modification
.commit(ctx)
.commit()
.await
.context("commit init_empty_test_timeline modification")?;
@@ -3538,24 +3538,14 @@ mod tests {
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
Lsn(0x10),
&Value::Image(TEST_IMG("foo at 0x10")),
&ctx,
)
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
.await?;
writer.finish_write(Lsn(0x10));
drop(writer);
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
Lsn(0x20),
&Value::Image(TEST_IMG("foo at 0x20")),
&ctx,
)
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
.await?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -3629,19 +3619,19 @@ mod tests {
// Insert a value on the timeline
writer
.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"), &ctx)
.put(TEST_KEY_A, Lsn(0x20), &test_value("foo at 0x20"))
.await?;
writer
.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"), &ctx)
.put(TEST_KEY_B, Lsn(0x20), &test_value("foobar at 0x20"))
.await?;
writer.finish_write(Lsn(0x20));
writer
.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"), &ctx)
.put(TEST_KEY_A, Lsn(0x30), &test_value("foo at 0x30"))
.await?;
writer.finish_write(Lsn(0x30));
writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"), &ctx)
.put(TEST_KEY_A, Lsn(0x40), &test_value("foo at 0x40"))
.await?;
writer.finish_write(Lsn(0x40));
@@ -3656,7 +3646,7 @@ mod tests {
.expect("Should have a local timeline");
let new_writer = newtline.writer().await;
new_writer
.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"), &ctx)
.put(TEST_KEY_A, Lsn(0x40), &test_value("bar at 0x40"))
.await?;
new_writer.finish_write(Lsn(0x40));
@@ -3679,11 +3669,7 @@ mod tests {
Ok(())
}
async fn make_some_layers(
tline: &Timeline,
start_lsn: Lsn,
ctx: &RequestContext,
) -> anyhow::Result<()> {
async fn make_some_layers(tline: &Timeline, start_lsn: Lsn) -> anyhow::Result<()> {
let mut lsn = start_lsn;
#[allow(non_snake_case)]
{
@@ -3694,7 +3680,6 @@ mod tests {
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
ctx,
)
.await?;
writer.finish_write(lsn);
@@ -3704,7 +3689,6 @@ mod tests {
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
ctx,
)
.await?;
writer.finish_write(lsn);
@@ -3718,7 +3702,6 @@ mod tests {
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
ctx,
)
.await?;
writer.finish_write(lsn);
@@ -3728,7 +3711,6 @@ mod tests {
*TEST_KEY,
lsn,
&Value::Image(TEST_IMG(&format!("foo at {}", lsn))),
ctx,
)
.await?;
writer.finish_write(lsn);
@@ -3745,7 +3727,7 @@ mod tests {
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
// FIXME: this doesn't actually remove any layer currently, given how the flushing
@@ -3819,7 +3801,7 @@ mod tests {
.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO)?;
let latest_gc_cutoff_lsn = tline.get_latest_gc_cutoff_lsn();
@@ -3841,7 +3823,7 @@ mod tests {
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
@@ -3850,7 +3832,7 @@ mod tests {
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
tline.set_broken("test".to_owned());
@@ -3891,7 +3873,7 @@ mod tests {
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
@@ -3916,7 +3898,7 @@ mod tests {
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
@@ -3925,7 +3907,7 @@ mod tests {
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
// run gc on parent
tenant
@@ -3950,7 +3932,7 @@ mod tests {
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x7000), DEFAULT_PG_VERSION, &ctx)
.await?;
make_some_layers(tline.as_ref(), Lsn(0x8000), &ctx).await?;
make_some_layers(tline.as_ref(), Lsn(0x8000)).await?;
// so that all uploads finish & we can call harness.load() below again
tenant
.shutdown(Default::default(), true)
@@ -3979,7 +3961,7 @@ mod tests {
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
let child_tline = tenant
.branch_timeline_test(&tline, NEW_TIMELINE_ID, Some(Lsn(0x40)), &ctx)
@@ -3990,7 +3972,7 @@ mod tests {
.get_timeline(NEW_TIMELINE_ID, true)
.expect("Should have a local timeline");
make_some_layers(newtline.as_ref(), Lsn(0x60), &ctx).await?;
make_some_layers(newtline.as_ref(), Lsn(0x60)).await?;
// so that all uploads finish & we can call harness.load() below again
tenant
@@ -4022,7 +4004,7 @@ mod tests {
let tline = tenant
.create_test_timeline(TIMELINE_ID, Lsn(0x10), DEFAULT_PG_VERSION, &ctx)
.await?;
make_some_layers(tline.as_ref(), Lsn(0x20), &ctx).await?;
make_some_layers(tline.as_ref(), Lsn(0x20)).await?;
let layer_map = tline.layers.read().await;
let level0_deltas = layer_map.layer_map().get_level0_deltas()?;
@@ -4105,12 +4087,7 @@ mod tests {
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
Lsn(0x10),
&Value::Image(TEST_IMG("foo at 0x10")),
&ctx,
)
.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))
.await?;
writer.finish_write(Lsn(0x10));
drop(writer);
@@ -4120,12 +4097,7 @@ mod tests {
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
Lsn(0x20),
&Value::Image(TEST_IMG("foo at 0x20")),
&ctx,
)
.put(*TEST_KEY, Lsn(0x20), &Value::Image(TEST_IMG("foo at 0x20")))
.await?;
writer.finish_write(Lsn(0x20));
drop(writer);
@@ -4135,12 +4107,7 @@ mod tests {
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
Lsn(0x30),
&Value::Image(TEST_IMG("foo at 0x30")),
&ctx,
)
.put(*TEST_KEY, Lsn(0x30), &Value::Image(TEST_IMG("foo at 0x30")))
.await?;
writer.finish_write(Lsn(0x30));
drop(writer);
@@ -4150,12 +4117,7 @@ mod tests {
let writer = tline.writer().await;
writer
.put(
*TEST_KEY,
Lsn(0x40),
&Value::Image(TEST_IMG("foo at 0x40")),
&ctx,
)
.put(*TEST_KEY, Lsn(0x40), &Value::Image(TEST_IMG("foo at 0x40")))
.await?;
writer.finish_write(Lsn(0x40));
drop(writer);
@@ -4213,7 +4175,6 @@ mod tests {
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
@@ -4266,7 +4227,6 @@ mod tests {
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
@@ -4287,7 +4247,6 @@ mod tests {
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
@@ -4347,7 +4306,6 @@ mod tests {
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
writer.finish_write(lsn);
@@ -4376,7 +4334,6 @@ mod tests {
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
&ctx,
)
.await?;
println!("updating {} at {}", blknum, lsn);
@@ -4445,7 +4402,6 @@ mod tests {
test_key,
lsn,
&Value::Image(TEST_IMG(&format!("{} {} at {}", idx, blknum, lsn))),
&ctx,
)
.await?;
println!("updating [{}][{}] at {}", idx, blknum, lsn);
@@ -4518,7 +4474,7 @@ mod tests {
.init_empty_test_timeline()
.context("init_empty_test_timeline")?;
modification
.commit(&ctx)
.commit()
.await
.context("commit init_empty_test_timeline modification")?;

View File

@@ -11,7 +11,6 @@
//! len < 128: 0XXXXXXX
//! len >= 128: 1XXXXXXX XXXXXXXX XXXXXXXX XXXXXXXX
//!
use crate::context::RequestContext;
use crate::page_cache::PAGE_SZ;
use crate::tenant::block_io::BlockCursor;
use crate::virtual_file::VirtualFile;
@@ -20,13 +19,9 @@ use std::io::{Error, ErrorKind};
impl<'a> BlockCursor<'a> {
/// Read a blob into a new buffer.
pub async fn read_blob(
&self,
offset: u64,
ctx: &RequestContext,
) -> Result<Vec<u8>, std::io::Error> {
pub async fn read_blob(&self, offset: u64) -> Result<Vec<u8>, std::io::Error> {
let mut buf = Vec::new();
self.read_blob_into_buf(offset, &mut buf, ctx).await?;
self.read_blob_into_buf(offset, &mut buf).await?;
Ok(buf)
}
/// Read blob into the given buffer. Any previous contents in the buffer
@@ -35,12 +30,11 @@ impl<'a> BlockCursor<'a> {
&self,
offset: u64,
dstbuf: &mut Vec<u8>,
ctx: &RequestContext,
) -> Result<(), std::io::Error> {
let mut blknum = (offset / PAGE_SZ as u64) as u32;
let mut off = (offset % PAGE_SZ as u64) as usize;
let mut buf = self.read_blk(blknum, ctx).await?;
let mut buf = self.read_blk(blknum).await?;
// peek at the first byte, to determine if it's a 1- or 4-byte length
let first_len_byte = buf[off];
@@ -56,7 +50,7 @@ impl<'a> BlockCursor<'a> {
// it is split across two pages
len_buf[..thislen].copy_from_slice(&buf[off..PAGE_SZ]);
blknum += 1;
buf = self.read_blk(blknum, ctx).await?;
buf = self.read_blk(blknum).await?;
len_buf[thislen..].copy_from_slice(&buf[0..4 - thislen]);
off = 4 - thislen;
} else {
@@ -77,7 +71,7 @@ impl<'a> BlockCursor<'a> {
if page_remain == 0 {
// continue on next page
blknum += 1;
buf = self.read_blk(blknum, ctx).await?;
buf = self.read_blk(blknum).await?;
off = 0;
page_remain = PAGE_SZ;
}
@@ -234,13 +228,12 @@ impl BlobWriter<false> {
#[cfg(test)]
mod tests {
use super::*;
use crate::{context::DownloadBehavior, task_mgr::TaskKind, tenant::block_io::BlockReaderRef};
use crate::tenant::block_io::BlockReaderRef;
use rand::{Rng, SeedableRng};
async fn round_trip_test<const BUFFERED: bool>(blobs: &[Vec<u8>]) -> Result<(), Error> {
let temp_dir = tempfile::tempdir()?;
let path = temp_dir.path().join("file");
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
// Write part (in block to drop the file)
let mut offsets = Vec::new();
@@ -262,7 +255,7 @@ mod tests {
let rdr = BlockReaderRef::VirtualFile(&file);
let rdr = BlockCursor::new(rdr);
for (idx, (blob, offset)) in blobs.iter().zip(offsets.iter()).enumerate() {
let blob_read = rdr.read_blob(*offset, &ctx).await?;
let blob_read = rdr.read_blob(*offset).await?;
assert_eq!(
blob, &blob_read,
"mismatch for idx={idx} at offset={offset}"

View File

@@ -4,7 +4,6 @@
use super::ephemeral_file::EphemeralFile;
use super::storage_layer::delta_layer::{Adapter, DeltaLayerInner};
use crate::context::RequestContext;
use crate::page_cache::{self, PageReadGuard, ReadBufResult, PAGE_SZ};
use crate::virtual_file::VirtualFile;
use bytes::Bytes;
@@ -83,16 +82,12 @@ pub(crate) enum BlockReaderRef<'a> {
impl<'a> BlockReaderRef<'a> {
#[inline(always)]
async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
use BlockReaderRef::*;
match self {
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
EphemeralFile(r) => r.read_blk(blknum, ctx).await,
Adapter(r) => r.read_blk(blknum, ctx).await,
FileBlockReader(r) => r.read_blk(blknum).await,
EphemeralFile(r) => r.read_blk(blknum).await,
Adapter(r) => r.read_blk(blknum).await,
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
@@ -110,13 +105,11 @@ impl<'a> BlockReaderRef<'a> {
///
/// ```no_run
/// # use pageserver::tenant::block_io::{BlockReader, FileBlockReader};
/// # use pageserver::context::RequestContext;
/// # let reader: FileBlockReader = unimplemented!("stub");
/// # let ctx: RequestContext = unimplemented!("stub");
/// let cursor = reader.block_cursor();
/// let buf = cursor.read_blk(1, &ctx);
/// let buf = cursor.read_blk(1);
/// // do stuff with 'buf'
/// let buf = cursor.read_blk(2, &ctx);
/// let buf = cursor.read_blk(2);
/// // do stuff with 'buf'
/// ```
///
@@ -141,12 +134,8 @@ impl<'a> BlockCursor<'a> {
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
#[inline(always)]
pub async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum, ctx).await
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.reader.read_blk(blknum).await
}
}
@@ -180,15 +169,11 @@ impl FileBlockReader {
/// Returns a "lease" object that can be used to
/// access to the contents of the page. (For the page cache, the
/// lease object represents a lock on the buffer.)
pub async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
pub async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.file_id, blknum, ctx)
.read_immutable_buf(self.file_id, blknum)
.await
.map_err(|e| {
std::io::Error::new(

View File

@@ -26,11 +26,7 @@ use std::{cmp::Ordering, io, result};
use thiserror::Error;
use tracing::error;
use crate::{
context::{DownloadBehavior, RequestContext},
task_mgr::TaskKind,
tenant::block_io::{BlockReader, BlockWriter},
};
use crate::tenant::block_io::{BlockReader, BlockWriter};
// The maximum size of a value stored in the B-tree. 5 bytes is enough currently.
pub const VALUE_SZ: usize = 5;
@@ -235,19 +231,14 @@ where
///
/// Read the value for given key. Returns the value, or None if it doesn't exist.
///
pub async fn get(&self, search_key: &[u8; L], ctx: &RequestContext) -> Result<Option<u64>> {
pub async fn get(&self, search_key: &[u8; L]) -> Result<Option<u64>> {
let mut result: Option<u64> = None;
self.visit(
search_key,
VisitDirection::Forwards,
|key, value| {
if key == search_key {
result = Some(value);
}
false
},
ctx,
)
self.visit(search_key, VisitDirection::Forwards, |key, value| {
if key == search_key {
result = Some(value);
}
false
})
.await?;
Ok(result)
}
@@ -262,7 +253,6 @@ where
search_key: &[u8; L],
dir: VisitDirection,
mut visitor: V,
ctx: &RequestContext,
) -> Result<bool>
where
V: FnMut(&[u8], u64) -> bool,
@@ -272,9 +262,7 @@ where
let block_cursor = self.reader.block_cursor();
while let Some((node_blknum, opt_iter)) = stack.pop() {
// Locate the node.
let node_buf = block_cursor
.read_blk(self.start_blk + node_blknum, ctx)
.await?;
let node_buf = block_cursor.read_blk(self.start_blk + node_blknum).await?;
let node = OnDiskNode::deparse(node_buf.as_ref())?;
let prefix_len = node.prefix_len as usize;
@@ -363,14 +351,13 @@ where
#[allow(dead_code)]
pub async fn dump(&self) -> Result<()> {
let mut stack = Vec::new();
let ctx = RequestContext::new(TaskKind::DebugTool, DownloadBehavior::Error);
stack.push((self.root_blk, String::new(), 0, 0, 0));
let block_cursor = self.reader.block_cursor();
while let Some((blknum, path, depth, child_idx, key_off)) = stack.pop() {
let blk = block_cursor.read_blk(self.start_blk + blknum, &ctx).await?;
let blk = block_cursor.read_blk(self.start_blk + blknum).await?;
let buf: &[u8] = blk.as_ref();
let node = OnDiskNode::<L>::deparse(buf)?;
@@ -701,8 +688,6 @@ impl<const L: usize> BuildNode<L> {
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReaderRef};
use rand::Rng;
use std::collections::BTreeMap;
@@ -740,8 +725,6 @@ pub(crate) mod tests {
let mut disk = TestDisk::new();
let mut writer = DiskBtreeBuilder::<_, 6>::new(&mut disk);
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
let all_keys: Vec<&[u8; 6]> = vec![
b"xaaaaa", b"xaaaba", b"xaaaca", b"xabaaa", b"xababa", b"xabaca", b"xabada", b"xabadb",
];
@@ -762,12 +745,12 @@ pub(crate) mod tests {
// Test the `get` function on all the keys.
for (key, val) in all_data.iter() {
assert_eq!(reader.get(key, &ctx).await?, Some(*val));
assert_eq!(reader.get(key).await?, Some(*val));
}
// And on some keys that don't exist
assert_eq!(reader.get(b"aaaaaa", &ctx).await?, None);
assert_eq!(reader.get(b"zzzzzz", &ctx).await?, None);
assert_eq!(reader.get(b"xaaabx", &ctx).await?, None);
assert_eq!(reader.get(b"aaaaaa").await?, None);
assert_eq!(reader.get(b"zzzzzz").await?, None);
assert_eq!(reader.get(b"xaaabx").await?, None);
// Test search with `visit` function
let search_key = b"xabaaa";
@@ -779,15 +762,10 @@ pub(crate) mod tests {
let mut data = Vec::new();
reader
.visit(
search_key,
VisitDirection::Forwards,
|key, value| {
data.push((key.to_vec(), value));
true
},
&ctx,
)
.visit(search_key, VisitDirection::Forwards, |key, value| {
data.push((key.to_vec(), value));
true
})
.await?;
assert_eq!(data, expected);
@@ -800,28 +778,18 @@ pub(crate) mod tests {
expected.reverse();
let mut data = Vec::new();
reader
.visit(
search_key,
VisitDirection::Backwards,
|key, value| {
data.push((key.to_vec(), value));
true
},
&ctx,
)
.visit(search_key, VisitDirection::Backwards, |key, value| {
data.push((key.to_vec(), value));
true
})
.await?;
assert_eq!(data, expected);
// Backward scan where nothing matches
reader
.visit(
b"aaaaaa",
VisitDirection::Backwards,
|key, value| {
panic!("found unexpected key {}: {}", hex::encode(key), value);
},
&ctx,
)
.visit(b"aaaaaa", VisitDirection::Backwards, |key, value| {
panic!("found unexpected key {}: {}", hex::encode(key), value);
})
.await?;
// Full scan
@@ -831,15 +799,10 @@ pub(crate) mod tests {
.collect();
let mut data = Vec::new();
reader
.visit(
&[0u8; 6],
VisitDirection::Forwards,
|key, value| {
data.push((key.to_vec(), value));
true
},
&ctx,
)
.visit(&[0u8; 6], VisitDirection::Forwards, |key, value| {
data.push((key.to_vec(), value));
true
})
.await?;
assert_eq!(data, expected);
@@ -850,7 +813,6 @@ pub(crate) mod tests {
async fn lots_of_keys() -> Result<()> {
let mut disk = TestDisk::new();
let mut writer = DiskBtreeBuilder::<_, 8>::new(&mut disk);
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
const NUM_KEYS: u64 = 1000;
@@ -889,14 +851,14 @@ pub(crate) mod tests {
for search_key_int in 0..(NUM_KEYS * 2 + 10) {
let search_key = u64::to_be_bytes(search_key_int);
assert_eq!(
reader.get(&search_key, &ctx).await?,
reader.get(&search_key).await?,
all_data.get(&search_key_int).cloned()
);
// Test a forward scan starting with this key
result.lock().unwrap().clear();
reader
.visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
.visit(&search_key, VisitDirection::Forwards, take_ten)
.await?;
let expected = all_data
.range(search_key_int..)
@@ -908,7 +870,7 @@ pub(crate) mod tests {
// And a backwards scan
result.lock().unwrap().clear();
reader
.visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
.visit(&search_key, VisitDirection::Backwards, take_ten)
.await?;
let expected = all_data
.range(..=search_key_int)
@@ -924,7 +886,7 @@ pub(crate) mod tests {
limit.store(usize::MAX, Ordering::Relaxed);
result.lock().unwrap().clear();
reader
.visit(&search_key, VisitDirection::Forwards, take_ten, &ctx)
.visit(&search_key, VisitDirection::Forwards, take_ten)
.await?;
let expected = all_data
.iter()
@@ -937,7 +899,7 @@ pub(crate) mod tests {
limit.store(usize::MAX, Ordering::Relaxed);
result.lock().unwrap().clear();
reader
.visit(&search_key, VisitDirection::Backwards, take_ten, &ctx)
.visit(&search_key, VisitDirection::Backwards, take_ten)
.await?;
let expected = all_data
.iter()
@@ -951,8 +913,6 @@ pub(crate) mod tests {
#[tokio::test]
async fn random_data() -> Result<()> {
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
// Generate random keys with exponential distribution, to
// exercise the prefix compression
const NUM_KEYS: usize = 100000;
@@ -979,24 +939,22 @@ pub(crate) mod tests {
// Test get() operation on all the keys
for (&key, &val) in all_data.iter() {
let search_key = u128::to_be_bytes(key);
assert_eq!(reader.get(&search_key, &ctx).await?, Some(val));
assert_eq!(reader.get(&search_key).await?, Some(val));
}
// Test get() operations on random keys, most of which will not exist
for _ in 0..100000 {
let key_int = rand::thread_rng().gen::<u128>();
let search_key = u128::to_be_bytes(key_int);
assert!(reader.get(&search_key, &ctx).await? == all_data.get(&key_int).cloned());
assert!(reader.get(&search_key).await? == all_data.get(&key_int).cloned());
}
// Test boundary cases
assert!(
reader.get(&u128::to_be_bytes(u128::MIN), &ctx).await?
== all_data.get(&u128::MIN).cloned()
reader.get(&u128::to_be_bytes(u128::MIN)).await? == all_data.get(&u128::MIN).cloned()
);
assert!(
reader.get(&u128::to_be_bytes(u128::MAX), &ctx).await?
== all_data.get(&u128::MAX).cloned()
reader.get(&u128::to_be_bytes(u128::MAX)).await? == all_data.get(&u128::MAX).cloned()
);
Ok(())
@@ -1027,7 +985,6 @@ pub(crate) mod tests {
// Build a tree from it
let mut disk = TestDisk::new();
let mut writer = DiskBtreeBuilder::<_, 26>::new(&mut disk);
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
for (key, val) in disk_btree_test_data::TEST_DATA {
writer.append(&key, val)?;
@@ -1040,21 +997,16 @@ pub(crate) mod tests {
// Test get() operation on all the keys
for (key, val) in disk_btree_test_data::TEST_DATA {
assert_eq!(reader.get(&key, &ctx).await?, Some(val));
assert_eq!(reader.get(&key).await?, Some(val));
}
// Test full scan
let mut count = 0;
reader
.visit(
&[0u8; 26],
VisitDirection::Forwards,
|_key, _value| {
count += 1;
true
},
&ctx,
)
.visit(&[0u8; 26], VisitDirection::Forwards, |_key, _value| {
count += 1;
true
})
.await?;
assert_eq!(count, disk_btree_test_data::TEST_DATA.len());

View File

@@ -2,7 +2,6 @@
//! used to keep in-memory layers spilled on disk.
use crate::config::PageServerConf;
use crate::context::RequestContext;
use crate::page_cache::{self, PAGE_SZ};
use crate::tenant::block_io::{BlockCursor, BlockLease, BlockReader};
use crate::virtual_file::VirtualFile;
@@ -62,17 +61,13 @@ impl EphemeralFile {
self.len
}
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, io::Error> {
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, io::Error> {
let flushed_blknums = 0..self.len / PAGE_SZ as u64;
if flushed_blknums.contains(&(blknum as u64)) {
let cache = page_cache::get();
loop {
match cache
.read_immutable_buf(self.page_cache_file_id, blknum, ctx)
.read_immutable_buf(self.page_cache_file_id, blknum)
.await
.map_err(|e| {
std::io::Error::new(
@@ -108,11 +103,7 @@ impl EphemeralFile {
}
}
pub(crate) async fn write_blob(
&mut self,
srcbuf: &[u8],
ctx: &RequestContext,
) -> Result<u64, io::Error> {
pub(crate) async fn write_blob(&mut self, srcbuf: &[u8]) -> Result<u64, io::Error> {
struct Writer<'a> {
ephemeral_file: &'a mut EphemeralFile,
/// The block to which the next [`push_bytes`] will write.
@@ -129,11 +120,7 @@ impl EphemeralFile {
})
}
#[inline(always)]
async fn push_bytes(
&mut self,
src: &[u8],
ctx: &RequestContext,
) -> Result<(), io::Error> {
async fn push_bytes(&mut self, src: &[u8]) -> Result<(), io::Error> {
let mut src_remaining = src;
while !src_remaining.is_empty() {
let dst_remaining = &mut self.ephemeral_file.mutable_tail[self.off..];
@@ -159,7 +146,6 @@ impl EphemeralFile {
.read_immutable_buf(
self.ephemeral_file.page_cache_file_id,
self.blknum,
ctx,
)
.await
{
@@ -213,15 +199,15 @@ impl EphemeralFile {
if srcbuf.len() < 0x80 {
// short one-byte length header
let len_buf = [srcbuf.len() as u8];
writer.push_bytes(&len_buf, ctx).await?;
writer.push_bytes(&len_buf).await?;
} else {
let mut len_buf = u32::to_be_bytes(srcbuf.len() as u32);
len_buf[0] |= 0x80;
writer.push_bytes(&len_buf, ctx).await?;
writer.push_bytes(&len_buf).await?;
}
// Write the payload
writer.push_bytes(srcbuf, ctx).await?;
writer.push_bytes(srcbuf).await?;
if srcbuf.len() < 0x80 {
self.len += 1;
@@ -275,8 +261,6 @@ impl BlockReader for EphemeralFile {
#[cfg(test)]
mod tests {
use super::*;
use crate::context::DownloadBehavior;
use crate::task_mgr::TaskKind;
use crate::tenant::block_io::{BlockCursor, BlockReaderRef};
use rand::{thread_rng, RngCore};
use std::fs;
@@ -284,15 +268,7 @@ mod tests {
fn harness(
test_name: &str,
) -> Result<
(
&'static PageServerConf,
TenantId,
TimelineId,
RequestContext,
),
io::Error,
> {
) -> Result<(&'static PageServerConf, TenantId, TimelineId), io::Error> {
let repo_dir = PageServerConf::test_repo_dir(test_name);
let _ = fs::remove_dir_all(&repo_dir);
let conf = PageServerConf::dummy_conf(repo_dir);
@@ -304,57 +280,46 @@ mod tests {
let timeline_id = TimelineId::from_str("22000000000000000000000000000000").unwrap();
fs::create_dir_all(conf.timeline_path(&tenant_id, &timeline_id))?;
let ctx = RequestContext::new(TaskKind::UnitTest, DownloadBehavior::Error);
Ok((conf, tenant_id, timeline_id, ctx))
Ok((conf, tenant_id, timeline_id))
}
#[tokio::test]
async fn test_ephemeral_blobs() -> Result<(), io::Error> {
let (conf, tenant_id, timeline_id, ctx) = harness("ephemeral_blobs")?;
let (conf, tenant_id, timeline_id) = harness("ephemeral_blobs")?;
let mut file = EphemeralFile::create(conf, tenant_id, timeline_id).await?;
let pos_foo = file.write_blob(b"foo", &ctx).await?;
let pos_foo = file.write_blob(b"foo").await?;
assert_eq!(
b"foo",
file.block_cursor()
.read_blob(pos_foo, &ctx)
.await?
.as_slice()
file.block_cursor().read_blob(pos_foo).await?.as_slice()
);
let pos_bar = file.write_blob(b"bar", &ctx).await?;
let pos_bar = file.write_blob(b"bar").await?;
assert_eq!(
b"foo",
file.block_cursor()
.read_blob(pos_foo, &ctx)
.await?
.as_slice()
file.block_cursor().read_blob(pos_foo).await?.as_slice()
);
assert_eq!(
b"bar",
file.block_cursor()
.read_blob(pos_bar, &ctx)
.await?
.as_slice()
file.block_cursor().read_blob(pos_bar).await?.as_slice()
);
let mut blobs = Vec::new();
for i in 0..10000 {
let data = Vec::from(format!("blob{}", i).as_bytes());
let pos = file.write_blob(&data, &ctx).await?;
let pos = file.write_blob(&data).await?;
blobs.push((pos, data));
}
// also test with a large blobs
for i in 0..100 {
let data = format!("blob{}", i).as_bytes().repeat(100);
let pos = file.write_blob(&data, &ctx).await?;
let pos = file.write_blob(&data).await?;
blobs.push((pos, data));
}
let cursor = BlockCursor::new(BlockReaderRef::EphemeralFile(&file));
for (pos, expected) in blobs {
let actual = cursor.read_blob(pos, &ctx).await?;
let actual = cursor.read_blob(pos).await?;
assert_eq!(actual, expected);
}
@@ -362,8 +327,8 @@ mod tests {
let mut large_data = Vec::new();
large_data.resize(20000, 0);
thread_rng().fill_bytes(&mut large_data);
let pos_large = file.write_blob(&large_data, &ctx).await?;
let result = file.block_cursor().read_blob(pos_large, &ctx).await?;
let pos_large = file.write_blob(&large_data).await?;
let result = file.block_cursor().read_blob(pos_large).await?;
assert_eq!(result, large_data);
Ok(())

View File

@@ -317,11 +317,11 @@ impl DeltaLayer {
tree_reader.dump().await?;
let keys = DeltaLayerInner::load_keys(&inner, ctx).await?;
let keys = DeltaLayerInner::load_keys(&inner).await?;
// A subroutine to dump a single blob
async fn dump_blob(val: ValueRef<'_>, ctx: &RequestContext) -> Result<String> {
let buf = val.reader.read_blob(val.blob_ref.pos(), ctx).await?;
async fn dump_blob(val: ValueRef<'_>) -> Result<String> {
let buf = val.reader.read_blob(val.blob_ref.pos()).await?;
let val = Value::des(&buf)?;
let desc = match val {
Value::Image(img) => {
@@ -342,7 +342,7 @@ impl DeltaLayer {
for entry in keys {
let DeltaEntry { key, lsn, val, .. } = entry;
let desc = match dump_blob(val, ctx).await {
let desc = match dump_blob(val).await {
Ok(desc) => desc,
Err(err) => {
let err: anyhow::Error = err;
@@ -370,7 +370,7 @@ impl DeltaLayer {
.load(LayerAccessKind::GetValueReconstructData, ctx)
.await?;
inner
.get_value_reconstruct_data(key, lsn_range, reconstruct_state, ctx)
.get_value_reconstruct_data(key, lsn_range, reconstruct_state)
.await
}
@@ -453,12 +453,12 @@ impl DeltaLayer {
self.access_stats.record_access(access_kind, ctx);
// Quick exit if already loaded
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.get_or_try_init(|| self.load_inner())
.await
.with_context(|| format!("Failed to load delta layer {}", self.path().display()))
}
async fn load_inner(&self, ctx: &RequestContext) -> Result<Arc<DeltaLayerInner>> {
async fn load_inner(&self) -> Result<Arc<DeltaLayerInner>> {
let path = self.path();
let summary = match &self.path_or_conf {
@@ -466,7 +466,7 @@ impl DeltaLayer {
PathOrConf::Path(_) => None,
};
let loaded = DeltaLayerInner::load(&path, summary, ctx).await?;
let loaded = DeltaLayerInner::load(&path, summary).await?;
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
@@ -554,7 +554,7 @@ impl DeltaLayer {
.load(LayerAccessKind::KeyIter, ctx)
.await
.context("load delta layer keys")?;
DeltaLayerInner::load_keys(inner, ctx)
DeltaLayerInner::load_keys(inner)
.await
.context("Layer index is corrupted")
}
@@ -849,14 +849,13 @@ impl DeltaLayerInner {
pub(super) async fn load(
path: &std::path::Path,
summary: Option<Summary>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.await
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
if let Some(mut expected_summary) = summary {
@@ -884,7 +883,6 @@ impl DeltaLayerInner {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let mut need_image = true;
// Scan the page versions backwards, starting from `lsn`.
@@ -899,24 +897,19 @@ impl DeltaLayerInner {
let mut offsets: Vec<(Lsn, u64)> = Vec::new();
tree_reader
.visit(
&search_key.0,
VisitDirection::Backwards,
|key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
.visit(&search_key.0, VisitDirection::Backwards, |key, value| {
let blob_ref = BlobRef(value);
if key[..KEY_SIZE] != search_key.0[..KEY_SIZE] {
return false;
}
let entry_lsn = DeltaKey::extract_lsn_from_buf(key);
if entry_lsn < lsn_range.start {
return false;
}
offsets.push((entry_lsn, blob_ref.pos()));
!blob_ref.will_init()
},
ctx,
)
!blob_ref.will_init()
})
.await?;
// Ok, 'offsets' now contains the offsets of all the entries we need to read
@@ -924,7 +917,7 @@ impl DeltaLayerInner {
let mut buf = Vec::new();
for (entry_lsn, pos) in offsets {
cursor
.read_blob_into_buf(pos, &mut buf, ctx)
.read_blob_into_buf(pos, &mut buf)
.await
.with_context(|| {
format!(
@@ -965,10 +958,9 @@ impl DeltaLayerInner {
}
}
pub(super) async fn load_keys<'a, 'b, T: AsRef<DeltaLayerInner> + Clone>(
this: &'a T,
ctx: &'b RequestContext,
) -> Result<Vec<DeltaEntry<'a>>> {
pub(super) async fn load_keys<T: AsRef<DeltaLayerInner> + Clone>(
this: &T,
) -> Result<Vec<DeltaEntry<'_>>> {
let dl = this.as_ref();
let file = &dl.file;
@@ -1005,7 +997,6 @@ impl DeltaLayerInner {
all_keys.push(entry);
true
},
ctx,
)
.await?;
if let Some(last) = all_keys.last_mut() {
@@ -1035,9 +1026,9 @@ pub struct ValueRef<'a> {
impl<'a> ValueRef<'a> {
/// Loads the value from disk
pub async fn load(&self, ctx: &RequestContext) -> Result<Value> {
pub async fn load(&self) -> Result<Value> {
// theoretically we *could* record an access time for each, but it does not really matter
let buf = self.reader.read_blob(self.blob_ref.pos(), ctx).await?;
let buf = self.reader.read_blob(self.blob_ref.pos()).await?;
let val = Value::des(&buf)?;
Ok(val)
}
@@ -1046,11 +1037,7 @@ impl<'a> ValueRef<'a> {
pub(crate) struct Adapter<T>(T);
impl<T: AsRef<DeltaLayerInner>> Adapter<T> {
pub(crate) async fn read_blk(
&self,
blknum: u32,
ctx: &RequestContext,
) -> Result<BlockLease, std::io::Error> {
self.0.as_ref().file.read_blk(blknum, ctx).await
pub(crate) async fn read_blk(&self, blknum: u32) -> Result<BlockLease, std::io::Error> {
self.0.as_ref().file.read_blk(blknum).await
}
}

View File

@@ -237,15 +237,10 @@ impl ImageLayer {
tree_reader.dump().await?;
tree_reader
.visit(
&[0u8; KEY_SIZE],
VisitDirection::Forwards,
|key, value| {
println!("key: {} offset {}", hex::encode(key), value);
true
},
ctx,
)
.visit(&[0u8; KEY_SIZE], VisitDirection::Forwards, |key, value| {
println!("key: {} offset {}", hex::encode(key), value);
true
})
.await?;
Ok(())
@@ -266,7 +261,7 @@ impl ImageLayer {
.load(LayerAccessKind::GetValueReconstructData, ctx)
.await?;
inner
.get_value_reconstruct_data(key, reconstruct_state, ctx)
.get_value_reconstruct_data(key, reconstruct_state)
.await
// FIXME: makes no sense to dump paths
.with_context(|| format!("read {}", self.path().display()))
@@ -340,12 +335,12 @@ impl ImageLayer {
) -> Result<&ImageLayerInner> {
self.access_stats.record_access(access_kind, ctx);
self.inner
.get_or_try_init(|| self.load_inner(ctx))
.get_or_try_init(|| self.load_inner())
.await
.with_context(|| format!("Failed to load image layer {}", self.path().display()))
}
async fn load_inner(&self, ctx: &RequestContext) -> Result<ImageLayerInner> {
async fn load_inner(&self) -> Result<ImageLayerInner> {
let path = self.path();
let expected_summary = match &self.path_or_conf {
@@ -354,8 +349,7 @@ impl ImageLayer {
};
let loaded =
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary, ctx)
.await?;
ImageLayerInner::load(&path, self.desc.image_layer_lsn(), expected_summary).await?;
if let PathOrConf::Path(ref path) = self.path_or_conf {
// not production code
@@ -442,13 +436,12 @@ impl ImageLayerInner {
path: &std::path::Path,
lsn: Lsn,
summary: Option<Summary>,
ctx: &RequestContext,
) -> anyhow::Result<Self> {
let file = VirtualFile::open(path)
.await
.with_context(|| format!("Failed to open file '{}'", path.display()))?;
let file = FileBlockReader::new(file);
let summary_blk = file.read_blk(0, ctx).await?;
let summary_blk = file.read_blk(0).await?;
let actual_summary = Summary::des_prefix(summary_blk.as_ref())?;
if let Some(mut expected_summary) = summary {
@@ -477,17 +470,16 @@ impl ImageLayerInner {
&self,
key: Key,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
let file = &self.file;
let tree_reader = DiskBtreeReader::new(self.index_start_blk, self.index_root_blk, file);
let mut keybuf: [u8; KEY_SIZE] = [0u8; KEY_SIZE];
key.write_to_byte_slice(&mut keybuf);
if let Some(offset) = tree_reader.get(&keybuf, ctx).await? {
if let Some(offset) = tree_reader.get(&keybuf).await? {
let blob = file
.block_cursor()
.read_blob(offset, ctx)
.read_blob(offset)
.await
.with_context(|| format!("failed to read value from offset {}", offset))?;
let value = Bytes::from(blob);

View File

@@ -106,7 +106,7 @@ impl InMemoryLayer {
/// debugging function to print out the contents of the layer
///
/// this is likely completly unused
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
pub async fn dump(&self, verbose: bool, _ctx: &RequestContext) -> Result<()> {
let inner = self.inner.read().await;
let end_str = self.end_lsn_or_max();
@@ -125,7 +125,7 @@ impl InMemoryLayer {
for (key, vec_map) in inner.index.iter() {
for (lsn, pos) in vec_map.as_slice() {
let mut desc = String::new();
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
cursor.read_blob_into_buf(*pos, &mut buf).await?;
let val = Value::des(&buf);
match val {
Ok(Value::Image(img)) => {
@@ -158,7 +158,7 @@ impl InMemoryLayer {
key: Key,
lsn_range: Range<Lsn>,
reconstruct_state: &mut ValueReconstructState,
ctx: &RequestContext,
_ctx: &RequestContext,
) -> anyhow::Result<ValueReconstructResult> {
ensure!(lsn_range.start >= self.start_lsn);
let mut need_image = true;
@@ -171,7 +171,7 @@ impl InMemoryLayer {
if let Some(vec_map) = inner.index.get(&key) {
let slice = vec_map.slice_range(lsn_range);
for (entry_lsn, pos) in slice.iter().rev() {
let buf = reader.read_blob(*pos, ctx).await?;
let buf = reader.read_blob(*pos).await?;
let value = Value::des(&buf)?;
match value {
Value::Image(img) => {
@@ -263,13 +263,7 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree
pub async fn put_value(
&self,
key: Key,
lsn: Lsn,
val: &Value,
ctx: &RequestContext,
) -> Result<()> {
pub async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);
let inner: &mut _ = &mut *self.inner.write().await;
self.assert_writable();
@@ -281,7 +275,7 @@ impl InMemoryLayer {
let mut buf = smallvec::SmallVec::<[u8; 256]>::new();
buf.clear();
val.ser_into(&mut buf)?;
inner.file.write_blob(&buf, ctx).await?
inner.file.write_blob(&buf).await?
};
let vec_map = inner.index.entry(key).or_default();
@@ -319,7 +313,7 @@ impl InMemoryLayer {
/// Write this frozen in-memory layer to disk.
///
/// Returns a new delta layer with all the same data as this in-memory layer
pub(crate) async fn write_to_disk(&self, ctx: &RequestContext) -> Result<DeltaLayer> {
pub(crate) async fn write_to_disk(&self) -> Result<DeltaLayer> {
// Grab the lock in read-mode. We hold it over the I/O, but because this
// layer is not writeable anymore, no one should be trying to acquire the
// write lock on it, so we shouldn't block anyone. There's one exception
@@ -353,7 +347,7 @@ impl InMemoryLayer {
let key = **key;
// Write all page versions
for (lsn, pos) in vec_map.as_slice() {
cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?;
cursor.read_blob_into_buf(*pos, &mut buf).await?;
let will_init = Value::des(&buf)?.will_init();
delta_layer_writer
.put_value_bytes(key, *lsn, &buf, will_init)

View File

@@ -471,7 +471,7 @@ impl Timeline {
// The cached image can be returned directly if there is no WAL between the cached image
// and requested LSN. The cached image can also be used to reduce the amount of WAL needed
// for redo.
let cached_page_img = match self.lookup_cached_page(&key, lsn, ctx).await {
let cached_page_img = match self.lookup_cached_page(&key, lsn).await {
Some((cached_lsn, cached_img)) => {
match cached_lsn.cmp(&lsn) {
Ordering::Less => {} // there might be WAL between cached_lsn and lsn, we need to check
@@ -2518,18 +2518,13 @@ impl Timeline {
}
}
async fn lookup_cached_page(
&self,
key: &Key,
lsn: Lsn,
ctx: &RequestContext,
) -> Option<(Lsn, Bytes)> {
async fn lookup_cached_page(&self, key: &Key, lsn: Lsn) -> Option<(Lsn, Bytes)> {
let cache = page_cache::get();
// FIXME: It's pointless to check the cache for things that are not 8kB pages.
// We should look at the key to determine if it's a cacheable object
let (lsn, read_guard) = cache
.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn, ctx)
.lookup_materialized_page(self.tenant_id, self.timeline_id, key, lsn)
.await?;
let img = Bytes::from(read_guard.to_vec());
Some((lsn, img))
@@ -2563,16 +2558,10 @@ impl Timeline {
Ok(layer)
}
async fn put_value(
&self,
key: Key,
lsn: Lsn,
val: &Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> anyhow::Result<()> {
//info!("PUT: key {} at {}", key, lsn);
let layer = self.get_layer_for_write(lsn).await?;
layer.put_value(key, lsn, val, ctx).await?;
layer.put_value(key, lsn, val).await?;
Ok(())
}
@@ -2744,7 +2733,7 @@ impl Timeline {
// Normal case, write out a L0 delta layer file.
// `create_delta_layer` will not modify the layer map.
// We will remove frozen layer and add delta layer in one atomic operation later.
let layer = self.create_delta_layer(&frozen_layer, ctx).await?;
let layer = self.create_delta_layer(&frozen_layer).await?;
(
HashMap::from([(
layer.filename(),
@@ -2867,21 +2856,19 @@ impl Timeline {
async fn create_delta_layer(
self: &Arc<Self>,
frozen_layer: &Arc<InMemoryLayer>,
ctx: &RequestContext,
) -> anyhow::Result<DeltaLayer> {
let span = tracing::info_span!("blocking");
let new_delta: DeltaLayer = tokio::task::spawn_blocking({
let _g = span.entered();
let self_clone = Arc::clone(self);
let frozen_layer = Arc::clone(frozen_layer);
let ctx = ctx.attached_child();
move || {
// Write it out
// Keep this inside `spawn_blocking` and `Handle::current`
// as long as the write path is still sync and the read impl
// is still not fully async. Otherwise executor threads would
// be blocked.
let new_delta = Handle::current().block_on(frozen_layer.write_to_disk(&ctx))?;
let new_delta = Handle::current().block_on(frozen_layer.write_to_disk())?;
let new_delta_path = new_delta.path();
// Sync it to disk.
@@ -3587,7 +3574,7 @@ impl Timeline {
key, lsn, ref val, ..
} in all_values_iter
{
let value = val.load(ctx).await?;
let value = val.load().await?;
let same_key = prev_key.map_or(false, |prev_key| prev_key == key);
// We need to check key boundaries once we reach next key or end of layer with the same key
if !same_key || lsn == dup_end_lsn {
@@ -4712,14 +4699,8 @@ impl<'a> TimelineWriter<'a> {
///
/// This will implicitly extend the relation, if the page is beyond the
/// current end-of-file.
pub async fn put(
&self,
key: Key,
lsn: Lsn,
value: &Value,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value, ctx).await
pub async fn put(&self, key: Key, lsn: Lsn, value: &Value) -> anyhow::Result<()> {
self.tl.put_value(key, lsn, value).await
}
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {

View File

@@ -650,12 +650,6 @@ mod tests {
File(File),
}
impl From<VirtualFile> for MaybeVirtualFile {
fn from(vf: VirtualFile) -> Self {
MaybeVirtualFile::VirtualFile(vf)
}
}
impl MaybeVirtualFile {
async fn read_exact_at(&self, buf: &mut [u8], offset: u64) -> Result<(), Error> {
match self {
@@ -893,54 +887,4 @@ mod tests {
Ok(())
}
#[tokio::test]
async fn test_atomic_overwrite_basic() {
let testdir = crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_basic");
std::fs::create_dir_all(&testdir).unwrap();
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
let post = file.read_string().await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"bar")
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
let post = file.read_string().await.unwrap();
assert_eq!(post, "bar");
assert!(!tmp_path.exists());
drop(file);
}
#[tokio::test]
async fn test_atomic_overwrite_preexisting_tmp() {
let testdir =
crate::config::PageServerConf::test_repo_dir("test_atomic_overwrite_preexisting_tmp");
std::fs::create_dir_all(&testdir).unwrap();
let path = testdir.join("myfile");
let tmp_path = testdir.join("myfile.tmp");
std::fs::write(&tmp_path, "some preexisting junk that should be removed").unwrap();
assert!(tmp_path.exists());
VirtualFile::crashsafe_overwrite(&path, &tmp_path, b"foo")
.await
.unwrap();
let mut file = MaybeVirtualFile::from(VirtualFile::open(&path).await.unwrap());
let post = file.read_string().await.unwrap();
assert_eq!(post, "foo");
assert!(!tmp_path.exists());
drop(file);
}
}

View File

@@ -151,56 +151,50 @@ impl<'a> WalIngest<'a> {
}
}
} else if self.timeline.pg_version == 15 {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG
{
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY
{
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
match decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK {
postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG
| postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY => {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_WAL_LOG");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
}
postgres_ffi::v15::bindings::XLOG_DBASE_DROP => {
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
}
_ => {}
}
} else if self.timeline.pg_version == 16 {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG
{
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY
{
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v16::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
match decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK {
postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_WAL_LOG
| postgres_ffi::v16::bindings::XLOG_DBASE_CREATE_FILE_COPY => {
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_WAL_LOG");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb, ctx)
.await?;
}
postgres_ffi::v16::bindings::XLOG_DBASE_DROP => {
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification
.drop_dbdir(tablespace_id, dropdb.db_id, ctx)
.await?;
}
}
_ => {}
}
}
} else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
@@ -363,7 +357,7 @@ impl<'a> WalIngest<'a> {
// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit(ctx).await?;
modification.commit().await?;
Ok(())
}
@@ -1561,7 +1555,7 @@ mod tests {
let mut m = tline.begin_modification(Lsn(0x10));
m.put_checkpoint(ZERO_CHECKPOINT.clone())?;
m.put_relmap_file(0, 111, Bytes::from(""), ctx).await?; // dummy relmapper file
m.commit(ctx).await?;
m.commit().await?;
let walingest = WalIngest::new(tline, Lsn(0x10), ctx).await?;
Ok(walingest)
@@ -1580,22 +1574,22 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x30));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 3"), &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x40));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1 at 4"), &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
let mut m = tline.begin_modification(Lsn(0x50));
walingest
.put_rel_page_image(&mut m, TESTREL_A, 2, TEST_IMG("foo blk 2 at 5"), &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x50));
@@ -1681,7 +1675,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 2, &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
assert_current_logical_size(&tline, Lsn(0x60));
// Check reported size and contents after truncation
@@ -1723,7 +1717,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 0, &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x68), false, &ctx)
@@ -1736,7 +1730,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1, TEST_IMG("foo blk 1"), &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x70), false, &ctx)
@@ -1761,7 +1755,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 1500, TEST_IMG("foo blk 1500"), &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
assert_eq!(
tline
.get_rel_size(TESTREL_A, Lsn(0x80), false, &ctx)
@@ -1800,7 +1794,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 2"), &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1819,7 +1813,7 @@ mod tests {
// Drop rel
let mut m = tline.begin_modification(Lsn(0x30));
walingest.put_rel_drop(&mut m, TESTREL_A, &ctx).await?;
m.commit(&ctx).await?;
m.commit().await?;
// Check that rel is not visible anymore
assert_eq!(
@@ -1837,7 +1831,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, 0, TEST_IMG("foo blk 0 at 4"), &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
// Check that rel exists and size is correct
assert_eq!(
@@ -1876,7 +1870,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit(&ctx).await?;
m.commit().await?;
// The relation was created at LSN 20, not visible at LSN 1 yet.
assert_eq!(
@@ -1921,7 +1915,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, 1, &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
// Check reported size and contents after truncation
assert_eq!(
@@ -1970,7 +1964,7 @@ mod tests {
.put_rel_page_image(&mut m, TESTREL_A, blkno, TEST_IMG(&data), &ctx)
.await?;
}
m.commit(&ctx).await?;
m.commit().await?;
assert_eq!(
tline
@@ -2017,7 +2011,7 @@ mod tests {
walingest
.put_rel_page_image(&mut m, TESTREL_A, blknum as BlockNumber, img, &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
}
assert_current_logical_size(&tline, Lsn(lsn));
@@ -2033,7 +2027,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE, &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE
@@ -2046,7 +2040,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, RELSEG_SIZE - 1, &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
RELSEG_SIZE - 1
@@ -2062,7 +2056,7 @@ mod tests {
walingest
.put_rel_truncation(&mut m, TESTREL_A, size as BlockNumber, &ctx)
.await?;
m.commit(&ctx).await?;
m.commit().await?;
assert_eq!(
tline.get_rel_size(TESTREL_A, Lsn(lsn), false, &ctx).await?,
size as BlockNumber

View File

@@ -2,3 +2,4 @@ comment = '** Deprecated ** Please use pg_embedding instead'
default_version = '0.1.0'
module_pathname = '$libdir/hnsw'
relocatable = true
trusted = true

View File

@@ -72,7 +72,6 @@
typedef struct FileCacheEntry
{
BufferTag key;
uint32 hash;
uint32 offset;
uint32 access_count;
uint32 bitmap[BLOCKS_PER_CHUNK/32];
@@ -84,9 +83,6 @@ typedef struct FileCacheControl
uint64 generation; /* generation is needed to handle correct hash reenabling */
uint32 size; /* size of cache file in chunks */
uint32 used; /* number of used chunks */
uint32 limit; /* shared copy of lfc_size_limit */
uint64 hits;
uint64 misses;
dlist_head lru; /* double linked list for LRU replacement algorithm */
} FileCacheControl;
@@ -104,9 +100,7 @@ static shmem_request_hook_type prev_shmem_request_hook;
#endif
static int lfc_shrinking_factor; /* power of two by which local cache size will be shrinked when lfc_free_space_watermark is reached */
#define LFC_ENABLED() (lfc_ctl->limit != 0)
void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg);
void FileCacheMonitorMain(Datum main_arg);
/*
* Local file cache is mandatory and Neon can work without it.
@@ -117,68 +111,49 @@ void PGDLLEXPORT FileCacheMonitorMain(Datum main_arg);
static void
lfc_disable(char const* op)
{
HASH_SEQ_STATUS status;
FileCacheEntry* entry;
elog(WARNING, "Failed to %s local file cache at %s: %m, disabling local file cache", op, lfc_path);
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (LFC_ENABLED())
{
HASH_SEQ_STATUS status;
FileCacheEntry* entry;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_search_with_hash_value(lfc_hash, &entry->key, entry->hash, HASH_REMOVE, NULL);
}
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->limit = 0;
dlist_init(&lfc_ctl->lru);
if (lfc_desc > 0)
{
/* If the reason of error is ENOSPC, then truncation of file may help to reclaim some space */
int rc = ftruncate(lfc_desc, 0);
if (rc < 0)
elog(WARNING, "Failed to truncate local file cache %s: %m", lfc_path);
}
}
LWLockRelease(lfc_lock);
if (lfc_desc > 0)
close(lfc_desc);
lfc_desc = -1;
}
lfc_size_limit = 0;
/*
* This check is done without obtaining lfc_lock, so it is unreliable
*/
static bool
lfc_maybe_disabled(void)
{
return !lfc_ctl || !LFC_ENABLED();
/* Invalidate hash */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_search(lfc_hash, &entry->key, HASH_REMOVE, NULL);
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
hash_seq_term(&status);
lfc_ctl->generation += 1;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
dlist_init(&lfc_ctl->lru);
LWLockRelease(lfc_lock);
}
static bool
lfc_ensure_opened(void)
{
bool enabled = !lfc_maybe_disabled();
/* Open cache file if not done yet */
if (lfc_desc <= 0 && enabled)
if (lfc_desc <= 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR);
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
lfc_disable("open");
return false;
}
}
return enabled;
return true;
}
static void
@@ -197,7 +172,6 @@ lfc_shmem_startup(void)
lfc_ctl = (FileCacheControl*)ShmemInitStruct("lfc", sizeof(FileCacheControl), &found);
if (!found)
{
int fd;
uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size);
lfc_lock = (LWLockId)GetNamedLWLockTranche("lfc_lock");
info.keysize = sizeof(BufferTag);
@@ -210,22 +184,10 @@ lfc_shmem_startup(void)
lfc_ctl->generation = 0;
lfc_ctl->size = 0;
lfc_ctl->used = 0;
lfc_ctl->hits = 0;
lfc_ctl->misses = 0;
dlist_init(&lfc_ctl->lru);
/* Recreate file cache on restart */
fd = BasicOpenFile(lfc_path, O_RDWR|O_CREAT|O_TRUNC);
if (fd < 0)
{
elog(WARNING, "Failed to create local file cache %s: %m", lfc_path);
lfc_ctl->limit = 0;
}
else
{
close(fd);
lfc_ctl->limit = SIZE_MB_TO_CHUNKS(lfc_size_limit);
}
/* Remove file cache on restart */
(void)unlink(lfc_path);
}
LWLockRelease(AddinShmemInitLock);
}
@@ -242,17 +204,6 @@ lfc_shmem_request(void)
RequestNamedLWLockTranche("lfc_lock", 1);
}
static bool
is_normal_backend(void)
{
/*
* Stats collector detach shared memory, so we should not try to access shared memory here.
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
* The Postmaster can handle SIGHUP and it has access to shared memory (UsedShmemSegAddr != NULL), but has no PGPROC.
*/
return lfc_ctl && MyProc && UsedShmemSegAddr && !IsParallelWorker();
}
static bool
lfc_check_limit_hook(int *newval, void **extra, GucSource source)
{
@@ -268,15 +219,24 @@ static void
lfc_change_limit_hook(int newval, void *extra)
{
uint32 new_size = SIZE_MB_TO_CHUNKS(newval);
if (!is_normal_backend())
return;
if (!lfc_ensure_opened())
/*
* Stats collector detach shared memory, so we should not try to access shared memory here.
* Parallel workers first assign default value (0), so not perform truncation in parallel workers.
*/
if (!lfc_ctl || !UsedShmemSegAddr || IsParallelWorker())
return;
/* Open cache file if not done yet */
if (lfc_desc <= 0)
{
lfc_desc = BasicOpenFile(lfc_path, O_RDWR|O_CREAT);
if (lfc_desc < 0) {
elog(WARNING, "Failed to open file cache %s: %m, disabling file cache", lfc_path);
lfc_size_limit = 0; /* disable file cache */
return;
}
}
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
while (new_size < lfc_ctl->used && !dlist_is_empty(&lfc_ctl->lru))
{
/* Shrink cache by throwing away least recently accessed chunks and returning their space to file system */
@@ -286,12 +246,10 @@ lfc_change_limit_hook(int newval, void *extra)
if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE, (off_t)victim->offset*BLOCKS_PER_CHUNK*BLCKSZ, BLOCKS_PER_CHUNK*BLCKSZ) < 0)
elog(LOG, "Failed to punch hole in file: %m");
#endif
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
lfc_ctl->used -= 1;
}
lfc_ctl->limit = new_size;
elog(DEBUG1, "set local file cache limit to %d", new_size);
LWLockRelease(lfc_lock);
}
@@ -309,7 +267,7 @@ lfc_change_limit_hook(int newval, void *extra)
* disk space with maximal possible disk write speed (1Gb/sec). But not larger than 1 second.
* Calling statvfs each second should not add any noticeable overhead.
*/
PGDLLEXPORT void
void
FileCacheMonitorMain(Datum main_arg)
{
/*
@@ -327,7 +285,7 @@ FileCacheMonitorMain(Datum main_arg)
/* Periodically dump buffers until terminated. */
while (!ShutdownRequestPending)
{
if (!lfc_maybe_disabled())
if (lfc_size_limit != 0)
{
struct statvfs sfs;
if (statvfs(lfc_path, &sfs) < 0)
@@ -341,7 +299,7 @@ FileCacheMonitorMain(Datum main_arg)
if (lfc_shrinking_factor < 31) {
lfc_shrinking_factor += 1;
}
lfc_change_limit_hook(lfc_ctl->limit >> lfc_shrinking_factor, NULL);
lfc_change_limit_hook(lfc_size_limit >> lfc_shrinking_factor, NULL);
}
else
lfc_shrinking_factor = 0; /* reset to initial value */
@@ -379,7 +337,6 @@ lfc_init(void)
if (!process_shared_preload_libraries_in_progress)
elog(ERROR, "Neon module should be loaded via shared_preload_libraries");
DefineCustomIntVariable("neon.max_file_cache_size",
"Maximal size of Neon local file cache",
NULL,
@@ -456,10 +413,10 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
BufferTag tag;
FileCacheEntry* entry;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
bool found = false;
bool found;
uint32 hash;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return false;
CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -468,11 +425,8 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
{
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
found = entry != NULL && (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) != 0;
LWLockRelease(lfc_lock);
return found;
}
@@ -489,7 +443,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return;
CopyNRelFileInfoToBufTag(tag, rinfo);
@@ -499,13 +453,6 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
if (!found)
@@ -556,7 +503,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno)
/*
* Try to read page from local cache.
* Returns true if page is found in local cache.
* In case of error local file cache is disabled (lfc->limit is set to zero).
* In case of error lfc_size_limit is set to zero to disable any further opera-tins with cache.
*/
bool
lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
@@ -571,7 +518,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
uint64 generation;
uint32 entry_offset;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return false;
if (!lfc_ensure_opened())
@@ -583,18 +530,10 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return false;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, NULL);
if (entry == NULL || (entry->bitmap[chunk_offs >> 5] & (1 << (chunk_offs & 31))) == 0)
{
/* Page is not cached */
lfc_ctl->misses += 1; /* race condition here, but precise value is not needed */
LWLockRelease(lfc_lock);
return false;
}
@@ -615,11 +554,8 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
/* Place entry to the head of LRU list */
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (lfc_ctl->generation == generation)
{
Assert(LFC_ENABLED());
lfc_ctl->hits += 1;
Assert(entry->access_count > 0);
if (--entry->access_count == 0)
dlist_push_tail(&lfc_ctl->lru, &entry->lru_node);
@@ -651,7 +587,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
if (lfc_maybe_disabled()) /* fast exit if file cache is disabled */
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return;
if (!lfc_ensure_opened())
@@ -659,17 +595,12 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
tag.forkNum = forkNum;
tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK-1);
CopyNRelFileInfoToBufTag(tag, rinfo);
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
if (!LFC_ENABLED())
{
LWLockRelease(lfc_lock);
return;
}
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_ENTER, &found);
if (found)
@@ -688,13 +619,13 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
* there are should be very large number of concurrent IO operations and them are limited by max_connections,
* we prefer not to complicate code and use second approach.
*/
if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru))
if (lfc_ctl->used >= SIZE_MB_TO_CHUNKS(lfc_size_limit) && !dlist_is_empty(&lfc_ctl->lru))
{
/* Cache overflow: evict least recently used chunk */
FileCacheEntry* victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru));
Assert(victim->access_count == 0);
entry->offset = victim->offset; /* grab victim's chunk */
hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL);
hash_search(lfc_hash, &victim->key, HASH_REMOVE, NULL);
elog(DEBUG2, "Swap file cache page");
}
else
@@ -703,7 +634,6 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno,
entry->offset = lfc_ctl->size++; /* allocate new chunk at end of file */
}
entry->access_count = 1;
entry->hash = hash;
memset(entry->bitmap, 0, sizeof entry->bitmap);
}
@@ -748,23 +678,6 @@ typedef struct
LocalCachePagesRec *record;
} LocalCachePagesContext;
PG_FUNCTION_INFO_V1(local_cache_hits);
Datum
local_cache_hits(PG_FUNCTION_ARGS)
{
PG_RETURN_INT64(lfc_ctl ? lfc_ctl->hits : -1);
}
PG_FUNCTION_INFO_V1(local_cache_misses);
Datum
local_cache_misses(PG_FUNCTION_ARGS)
{
PG_RETURN_INT64(lfc_ctl ? lfc_ctl->misses : -1);
}
/*
* Function returning data from the local file cache
* relation node/tablespace/database/blocknum and access_counter
@@ -838,15 +751,13 @@ local_cache_pages(PG_FUNCTION_ARGS)
LWLockAcquire(lfc_lock, LW_SHARED);
if (LFC_ENABLED())
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
}
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
n_pages += (entry->bitmap[i >> 5] & (1 << (i & 31))) != 0;
}
hash_seq_term(&status);
fctx->record = (LocalCachePagesRec *)
MemoryContextAllocHuge(CurrentMemoryContext,
sizeof(LocalCachePagesRec) * n_pages);
@@ -858,37 +769,35 @@ local_cache_pages(PG_FUNCTION_ARGS)
/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);
if (n_pages != 0)
/*
* Scan through all the buffers, saving the relevant fields in the
* fctx->record structure.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header
* locks, so the information of each buffer is self-consistent.
*/
n_pages = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
{
/*
* Scan through all the buffers, saving the relevant fields in the
* fctx->record structure.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header
* locks, so the information of each buffer is self-consistent.
*/
n_pages = 0;
hash_seq_init(&status, lfc_hash);
while ((entry = hash_seq_search(&status)) != NULL)
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
{
for (int i = 0; i < BLOCKS_PER_CHUNK; i++)
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
{
if (entry->bitmap[i >> 5] & (1 << (i & 31)))
{
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].forknum = entry->key.forkNum;
fctx->record[n_pages].blocknum = entry->key.blockNum + i;
fctx->record[n_pages].accesscount = entry->access_count;
n_pages += 1;
}
fctx->record[n_pages].pageoffs = entry->offset*BLOCKS_PER_CHUNK + i;
fctx->record[n_pages].relfilenode = NInfoGetRelNumber(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reltablespace = NInfoGetSpcOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].reldatabase = NInfoGetDbOid(BufTagGetNRelFileInfo(entry->key));
fctx->record[n_pages].forknum = entry->key.forkNum;
fctx->record[n_pages].blocknum = entry->key.blockNum + i;
fctx->record[n_pages].accesscount = entry->access_count;
n_pages += 1;
}
}
Assert(n_pages == funcctx->max_calls);
}
hash_seq_term(&status);
Assert(n_pages == funcctx->max_calls);
LWLockRelease(lfc_lock);
}

50
poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
[[package]]
name = "aiohttp"
@@ -887,34 +887,34 @@ files = [
[[package]]
name = "cryptography"
version = "41.0.4"
version = "41.0.3"
description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
optional = false
python-versions = ">=3.7"
files = [
{file = "cryptography-41.0.4-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:80907d3faa55dc5434a16579952ac6da800935cd98d14dbd62f6f042c7f5e839"},
{file = "cryptography-41.0.4-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:35c00f637cd0b9d5b6c6bd11b6c3359194a8eba9c46d4e875a3660e3b400005f"},
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cecfefa17042941f94ab54f769c8ce0fe14beff2694e9ac684176a2535bf9714"},
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e40211b4923ba5a6dc9769eab704bdb3fbb58d56c5b336d30996c24fcf12aadb"},
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:23a25c09dfd0d9f28da2352503b23e086f8e78096b9fd585d1d14eca01613e13"},
{file = "cryptography-41.0.4-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:2ed09183922d66c4ec5fdaa59b4d14e105c084dd0febd27452de8f6f74704143"},
{file = "cryptography-41.0.4-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:5a0f09cefded00e648a127048119f77bc2b2ec61e736660b5789e638f43cc397"},
{file = "cryptography-41.0.4-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:9eeb77214afae972a00dee47382d2591abe77bdae166bda672fb1e24702a3860"},
{file = "cryptography-41.0.4-cp37-abi3-win32.whl", hash = "sha256:3b224890962a2d7b57cf5eeb16ccaafba6083f7b811829f00476309bce2fe0fd"},
{file = "cryptography-41.0.4-cp37-abi3-win_amd64.whl", hash = "sha256:c880eba5175f4307129784eca96f4e70b88e57aa3f680aeba3bab0e980b0f37d"},
{file = "cryptography-41.0.4-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:004b6ccc95943f6a9ad3142cfabcc769d7ee38a3f60fb0dddbfb431f818c3a67"},
{file = "cryptography-41.0.4-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:86defa8d248c3fa029da68ce61fe735432b047e32179883bdb1e79ed9bb8195e"},
{file = "cryptography-41.0.4-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:37480760ae08065437e6573d14be973112c9e6dcaf5f11d00147ee74f37a3829"},
{file = "cryptography-41.0.4-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:b5f4dfe950ff0479f1f00eda09c18798d4f49b98f4e2006d644b3301682ebdca"},
{file = "cryptography-41.0.4-pp38-pypy38_pp73-macosx_10_12_x86_64.whl", hash = "sha256:7e53db173370dea832190870e975a1e09c86a879b613948f09eb49324218c14d"},
{file = "cryptography-41.0.4-pp38-pypy38_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:5b72205a360f3b6176485a333256b9bcd48700fc755fef51c8e7e67c4b63e3ac"},
{file = "cryptography-41.0.4-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:93530900d14c37a46ce3d6c9e6fd35dbe5f5601bf6b3a5c325c7bffc030344d9"},
{file = "cryptography-41.0.4-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:efc8ad4e6fc4f1752ebfb58aefece8b4e3c4cae940b0994d43649bdfce8d0d4f"},
{file = "cryptography-41.0.4-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:c3391bd8e6de35f6f1140e50aaeb3e2b3d6a9012536ca23ab0d9c35ec18c8a91"},
{file = "cryptography-41.0.4-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:0d9409894f495d465fe6fda92cb70e8323e9648af912d5b9141d616df40a87b8"},
{file = "cryptography-41.0.4-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:8ac4f9ead4bbd0bc8ab2d318f97d85147167a488be0e08814a37eb2f439d5cf6"},
{file = "cryptography-41.0.4-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:047c4603aeb4bbd8db2756e38f5b8bd7e94318c047cfe4efeb5d715e08b49311"},
{file = "cryptography-41.0.4.tar.gz", hash = "sha256:7febc3094125fc126a7f6fb1f420d0da639f3f32cb15c8ff0dc3997c4549f51a"},
{file = "cryptography-41.0.3-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:652627a055cb52a84f8c448185922241dd5217443ca194d5739b44612c5e6507"},
{file = "cryptography-41.0.3-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:8f09daa483aedea50d249ef98ed500569841d6498aa9c9f4b0531b9964658922"},
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4fd871184321100fb400d759ad0cddddf284c4b696568204d281c902fc7b0d81"},
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:84537453d57f55a50a5b6835622ee405816999a7113267739a1b4581f83535bd"},
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:3fb248989b6363906827284cd20cca63bb1a757e0a2864d4c1682a985e3dca47"},
{file = "cryptography-41.0.3-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:42cb413e01a5d36da9929baa9d70ca90d90b969269e5a12d39c1e0d475010116"},
{file = "cryptography-41.0.3-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:aeb57c421b34af8f9fe830e1955bf493a86a7996cc1338fe41b30047d16e962c"},
{file = "cryptography-41.0.3-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:6af1c6387c531cd364b72c28daa29232162010d952ceb7e5ca8e2827526aceae"},
{file = "cryptography-41.0.3-cp37-abi3-win32.whl", hash = "sha256:0d09fb5356f975974dbcb595ad2d178305e5050656affb7890a1583f5e02a306"},
{file = "cryptography-41.0.3-cp37-abi3-win_amd64.whl", hash = "sha256:a983e441a00a9d57a4d7c91b3116a37ae602907a7618b882c8013b5762e80574"},
{file = "cryptography-41.0.3-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:5259cb659aa43005eb55a0e4ff2c825ca111a0da1814202c64d28a985d33b087"},
{file = "cryptography-41.0.3-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:67e120e9a577c64fe1f611e53b30b3e69744e5910ff3b6e97e935aeb96005858"},
{file = "cryptography-41.0.3-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:7efe8041897fe7a50863e51b77789b657a133c75c3b094e51b5e4b5cec7bf906"},
{file = "cryptography-41.0.3-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:ce785cf81a7bdade534297ef9e490ddff800d956625020ab2ec2780a556c313e"},
{file = "cryptography-41.0.3-pp38-pypy38_pp73-macosx_10_12_x86_64.whl", hash = "sha256:57a51b89f954f216a81c9d057bf1a24e2f36e764a1ca9a501a6964eb4a6800dd"},
{file = "cryptography-41.0.3-pp38-pypy38_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:4c2f0d35703d61002a2bbdcf15548ebb701cfdd83cdc12471d2bae80878a4207"},
{file = "cryptography-41.0.3-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:23c2d778cf829f7d0ae180600b17e9fceea3c2ef8b31a99e3c694cbbf3a24b84"},
{file = "cryptography-41.0.3-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:95dd7f261bb76948b52a5330ba5202b91a26fbac13ad0e9fc8a3ac04752058c7"},
{file = "cryptography-41.0.3-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:41d7aa7cdfded09b3d73a47f429c298e80796c8e825ddfadc84c8a7f12df212d"},
{file = "cryptography-41.0.3-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:d0d651aa754ef58d75cec6edfbd21259d93810b73f6ec246436a21b7841908de"},
{file = "cryptography-41.0.3-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:ab8de0d091acbf778f74286f4989cf3d1528336af1b59f3e5d2ebca8b5fe49e1"},
{file = "cryptography-41.0.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:a74fbcdb2a0d46fe00504f571a2a540532f4c188e6ccf26f1f178480117b33c4"},
{file = "cryptography-41.0.3.tar.gz", hash = "sha256:6d192741113ef5e30d89dcb5b956ef4e1578f304708701b8b73d38e3e1461f34"},
]
[package.dependencies]

View File

@@ -28,11 +28,10 @@ use tracing::{error, info, info_span, warn, Instrument};
use utils::measured_stream::MeasuredStream;
/// Number of times we should retry the `/proxy_wake_compute` http request.
/// Retry duration is BASE_RETRY_WAIT_DURATION * RETRY_WAIT_EXPONENT_BASE ^ n, where n starts at 0
pub const NUM_RETRIES_CONNECT: u32 = 16;
/// Retry duration is BASE_RETRY_WAIT_DURATION * 1.5^n
pub const NUM_RETRIES_CONNECT: u32 = 10;
const CONNECT_TIMEOUT: time::Duration = time::Duration::from_secs(2);
const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(25);
const RETRY_WAIT_EXPONENT_BASE: f64 = std::f64::consts::SQRT_2;
const BASE_RETRY_WAIT_DURATION: time::Duration = time::Duration::from_millis(100);
const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
const ERR_PROTO_VIOLATION: &str = "protocol violation";
@@ -554,7 +553,8 @@ impl ShouldRetry for compute::ConnectionError {
}
pub fn retry_after(num_retries: u32) -> time::Duration {
BASE_RETRY_WAIT_DURATION.mul_f64(RETRY_WAIT_EXPONENT_BASE.powi((num_retries as i32) - 1))
// 1.5 seems to be an ok growth factor heuristic
BASE_RETRY_WAIT_DURATION.mul_f64(1.5_f64.powi(num_retries as i32))
}
/// Finish client connection initialization: confirm auth success, send params, etc.

View File

@@ -303,7 +303,7 @@ async fn scram_auth_mock() -> anyhow::Result<()> {
#[test]
fn connect_compute_total_wait() {
let mut total_wait = tokio::time::Duration::ZERO;
for num_retries in 1..NUM_RETRIES_CONNECT {
for num_retries in 1..10 {
total_wait += retry_after(num_retries);
}
assert!(total_wait < tokio::time::Duration::from_secs(12));
@@ -494,11 +494,11 @@ async fn connect_to_compute_non_retry_2() {
/// Retry for at most `NUM_RETRIES_CONNECT` times.
#[tokio::test]
async fn connect_to_compute_non_retry_3() {
assert_eq!(NUM_RETRIES_CONNECT, 16);
assert_eq!(NUM_RETRIES_CONNECT, 10);
use ConnectAction::*;
let mechanism = TestConnectMechanism::new(vec![
Retry, Wake, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
Retry, Retry, Retry, Retry, /* the 17th time */ Retry,
Retry, Wake, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry, Retry,
/* the 11th time */ Retry,
]);
let (cache, extra, creds) = helper_create_connect_info(&mechanism);
connect_to_compute(&mechanism, cache, &extra, &creds)

View File

@@ -1,5 +1,5 @@
[toolchain]
channel = "1.72.1"
channel = "1.72.0"
profile = "default"
# The default profile includes rustc, rust-std, cargo, rust-docs, rustfmt and clippy.
# https://rust-lang.github.io/rustup/concepts/profiles.html

View File

@@ -1,62 +0,0 @@
#!/usr/bin/env python3
#
# Script to download the basebackup from a pageserver to a tar file.
#
# This can be useful in disaster recovery.
#
import argparse
import psycopg2
from psycopg2.extensions import connection as PgConnection
def main(args: argparse.Namespace):
pageserver_connstr = args.pageserver_connstr
tenant_id = args.tenant
timeline_id = args.timeline
lsn = args.lsn
output_path = args.output_path
psconn: PgConnection = psycopg2.connect(pageserver_connstr)
psconn.autocommit = True
output = open(output_path, "wb")
with psconn.cursor() as pscur:
pscur.copy_expert(f"basebackup {tenant_id} {timeline_id} {lsn}", output)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
"--tenant-id",
dest="tenant",
required=True,
help="Id of the tenant",
)
parser.add_argument(
"--timeline-id",
dest="timeline",
required=True,
help="Id of the timeline",
)
parser.add_argument(
"--lsn",
dest="lsn",
required=True,
help="LSN to take the basebackup at",
)
parser.add_argument(
"--pageserver-connstr",
dest="pageserver_connstr",
required=True,
help="libpq connection string of the pageserver",
)
parser.add_argument(
"--output",
dest="output_path",
required=True,
help="output path to write the basebackup to",
)
args = parser.parse_args()
main(args)

View File

@@ -1,44 +0,0 @@
import threading
import time
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv, PgBin
#
# Test branching, when a transaction is in prepared state
#
@pytest.mark.timeout(600)
def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin):
env = neon_simple_env
env.neon_cli.create_branch("test_lfc_resize", "empty")
endpoint = env.endpoints.create_start(
"test_lfc_resize",
config_lines=[
"neon.file_cache_path='file.cache'",
"neon.max_file_cache_size=1GB",
"neon.file_cache_size_limit=1GB",
],
)
n_resize = 10
scale = 10
log.info("postgres is running on 'test_lfc_resize' branch")
def run_pgbench(connstr: str):
log.info(f"Start a pgbench workload on pg {connstr}")
pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr])
pg_bin.run_capture(["pgbench", "-c4", f"-T{n_resize}", "-Mprepared", connstr])
thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True)
thread.start()
conn = endpoint.connect()
cur = conn.cursor()
for i in range(n_resize):
cur.execute(f"alter system set neon.file_cache_size_limit='{i*10}MB'")
cur.execute("select pg_reload_conf()")
time.sleep(1)
thread.join()

View File

@@ -1,74 +0,0 @@
import os
import random
import threading
import time
from typing import List
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import query_scalar
def test_local_file_cache_unlink(neon_simple_env: NeonEnv):
env = neon_simple_env
cache_dir = os.path.join(env.repo_dir, "file_cache")
os.mkdir(cache_dir)
env.neon_cli.create_branch("test_local_file_cache_unlink", "empty")
endpoint = env.endpoints.create_start(
"test_local_file_cache_unlink",
config_lines=[
"shared_buffers='1MB'",
f"neon.file_cache_path='{cache_dir}/file.cache'",
"neon.max_file_cache_size='64MB'",
"neon.file_cache_size_limit='10MB'",
],
)
cur = endpoint.connect().cursor()
n_rows = 100000
n_threads = 20
n_updates_per_thread = 10000
n_updates_per_connection = 1000
n_total_updates = n_threads * n_updates_per_thread
cur.execute("CREATE TABLE lfctest (id int4 PRIMARY KEY, n int) WITH (fillfactor=10)")
cur.execute(f"INSERT INTO lfctest SELECT g, 1 FROM generate_series(1, {n_rows}) g")
# Start threads that will perform random UPDATEs. Each UPDATE
# increments the counter on the row, so that we can check at the
# end that the sum of all the counters match the number of updates
# performed (plus the initial 1 on each row).
#
# Furthermore, each thread will reconnect between every 1000 updates.
def run_updates():
n_updates_performed = 0
conn = endpoint.connect()
cur = conn.cursor()
for _ in range(n_updates_per_thread):
id = random.randint(1, n_rows)
cur.execute(f"UPDATE lfctest SET n = n + 1 WHERE id = {id}")
n_updates_performed += 1
if n_updates_performed % n_updates_per_connection == 0:
cur.close()
conn.close()
conn = endpoint.connect()
cur = conn.cursor()
threads: List[threading.Thread] = []
for _i in range(n_threads):
thread = threading.Thread(target=run_updates, args=(), daemon=True)
thread.start()
threads.append(thread)
time.sleep(5)
new_cache_dir = os.path.join(env.repo_dir, "file_cache_new")
os.rename(cache_dir, new_cache_dir)
for thread in threads:
thread.join()
assert query_scalar(cur, "SELECT SUM(n) FROM lfctest") == n_total_updates + n_rows

View File

@@ -64,7 +64,6 @@ toml_edit = { version = "0.19", features = ["serde"] }
tower = { version = "0.4", features = ["balance", "buffer", "limit", "retry", "timeout", "util"] }
tracing = { version = "0.1", features = ["log"] }
tracing-core = { version = "0.1" }
tungstenite = { version = "0.20" }
url = { version = "2", features = ["serde"] }
uuid = { version = "1", features = ["serde", "v4"] }