Compare commits

...

18 Commits

Author SHA1 Message Date
Arpad Müller
e97e86eb43 poetry lock 2024-05-06 12:58:34 +02:00
Arpad Müller
c9370d48de Merge remote-tracking branch 'origin/main' into arpad/less_async_trait 2024-05-06 12:54:51 +02:00
Arseny Sher
e6da7e29ed Add option allowing running multiple endpoints on the same branch.
This is used by safekeeper tests.
2024-05-06 11:08:51 +03:00
Arseny Sher
0353a72a00 pg_waldump segment on safekeeper in test_pg_waldump.
To test it as well.
2024-05-06 07:18:38 +03:00
Arseny Sher
ce4d3da3ae Properly initialize first WAL segment on safekeepers.
Previously its segment header and page header of first record weren't
initialized because compute streams data only since first record LSN. Also, fix
a bug in the existing code for initialization: xlp_rem_len must not include page
header.

These changes make first segment pg_waldump'able.
2024-05-06 07:18:38 +03:00
Arseny Sher
5da3e2113a Allow bad state (not active) pageserver error/warns in walcraft test.
The top reason for it being flaky.
2024-05-06 06:45:27 +03:00
Arpad Müller
6f714c308b Adjust timeouts 2024-04-08 17:02:40 +02:00
Arpad Müller
2e6afaa642 Merge remote-tracking branch 'origin/main' into arpad/less_async_trait 2024-04-08 16:58:47 +02:00
Alexander Bayandin
8f0a0440ba CI: reduce session timeout to 30 minutes 2024-04-08 12:29:24 +01:00
Alexander Bayandin
987dc01ed7 CI: set fix timeout value in seconds for regression tests 2024-04-05 15:05:07 +01:00
Alexander Bayandin
719e4ad580 Bump pytest-timeout from 2.1.0 to 2.3.1 2024-04-05 14:58:58 +01:00
Alexander Bayandin
e61b2a08b3 CI: set pytest timeout for regression test suite 2024-04-05 12:54:47 +01:00
Arpad Müller
cc89b46ae5 Merge branch 'main' into arpad/less_async_trait 2024-04-04 16:30:09 +02:00
Arpad Müller
d5cbdd2e90 Remove it here as well 2024-04-04 12:36:28 +02:00
Arpad Müller
6ad9c3560e Merge branch 'main' into arpad/less_async_trait 2024-04-04 12:27:38 +02:00
Arpad Müller
9dc3b09e57 Remove async-trait from Cargo.toml of crates it became unused in 2024-04-03 23:25:06 +02:00
Arpad Müller
fe762e35d8 Remove async_trait from Handler trait as well 2024-04-03 23:21:16 +02:00
Arpad Müller
0c4988a92c Remove async_trait from CompactionDeltaLayer 2024-04-03 23:21:16 +02:00
21 changed files with 2011 additions and 1210 deletions

View File

@@ -48,6 +48,10 @@ inputs:
description: 'benchmark durations JSON'
required: false
default: '{}'
session_timeout:
description: 'Session timeout for the test suite'
required: false
default: ''
runs:
using: "composite"
@@ -107,6 +111,7 @@ runs:
ALLOW_FORWARD_COMPATIBILITY_BREAKAGE: contains(github.event.pull_request.labels.*.name, 'forward compatibility breakage')
RERUN_FLAKY: ${{ inputs.rerun_flaky }}
PG_VERSION: ${{ inputs.pg_version }}
SESSION_TIMEOUT: ${{ inputs.session_timeout }}
shell: bash -euxo pipefail {0}
run: |
# PLATFORM will be embedded in the perf test report
@@ -168,6 +173,10 @@ runs:
EXTRA_PARAMS="--durations-path $TEST_OUTPUT/benchmark_durations.json $EXTRA_PARAMS"
fi
if [ -n "${SESSION_TIMEOUT}" ]; then
EXTRA_PARAMS="--session-timeout ${SESSION_TIMEOUT} ${EXTRA_PARAMS}"
fi
if [[ "${{ inputs.build_type }}" == "debug" ]]; then
cov_prefix=(scripts/coverage "--profraw-prefix=$GITHUB_JOB" --dir=/tmp/coverage run)
elif [[ "${{ inputs.build_type }}" == "release" ]]; then

View File

@@ -461,7 +461,8 @@ jobs:
- name: Pytest regression tests
uses: ./.github/actions/run-python-test-set
timeout-minutes: 60
# Hard timeout to prevent hanging tests, we also have set softer pytest timeout (set via `session_timeout`) which is shorter
timeout-minutes: 110
with:
build_type: ${{ matrix.build_type }}
test_selection: regress
@@ -471,6 +472,8 @@ jobs:
real_s3_region: eu-central-1
rerun_flaky: true
pg_version: ${{ matrix.pg_version }}
# Set pytest session timeout to 25 minutes
session_timeout: '1500'
env:
TEST_RESULT_CONNSTR: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }}
CHECK_ONDISK_DATA_COMPATIBILITY: nonempty

2
Cargo.lock generated
View File

@@ -3638,7 +3638,6 @@ dependencies = [
"arc-swap",
"async-compression",
"async-stream",
"async-trait",
"byteorder",
"bytes",
"camino",
@@ -4107,7 +4106,6 @@ name = "postgres_backend"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"futures",
"once_cell",

View File

@@ -837,6 +837,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
.copied()
.unwrap_or(false);
let allow_multiple = sub_args.get_flag("allow-multiple");
let mode = match (lsn, hot_standby) {
(Some(lsn), false) => ComputeMode::Static(lsn),
(None, true) => ComputeMode::Replica,
@@ -854,7 +856,9 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
_ => {}
}
cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
if !allow_multiple {
cplane.check_conflicting_endpoints(mode, tenant_id, timeline_id)?;
}
cplane.new_endpoint(
&endpoint_id,
@@ -883,6 +887,8 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
let remote_ext_config = sub_args.get_one::<String>("remote-ext-config");
let allow_multiple = sub_args.get_flag("allow-multiple");
// If --safekeepers argument is given, use only the listed safekeeper nodes.
let safekeepers =
if let Some(safekeepers_str) = sub_args.get_one::<String>("safekeepers") {
@@ -908,11 +914,13 @@ async fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Re
.cloned()
.unwrap_or_default();
cplane.check_conflicting_endpoints(
endpoint.mode,
endpoint.tenant_id,
endpoint.timeline_id,
)?;
if !allow_multiple {
cplane.check_conflicting_endpoints(
endpoint.mode,
endpoint.tenant_id,
endpoint.timeline_id,
)?;
}
let (pageservers, stripe_size) = if let Some(pageserver_id) = pageserver_id {
let conf = env.get_pageserver_conf(pageserver_id).unwrap();
@@ -1444,6 +1452,12 @@ fn cli() -> Command {
.help("If set, will create test user `user` and `neondb` database. Requires `update-catalog = true`")
.required(false);
let allow_multiple = Arg::new("allow-multiple")
.help("Allow multiple primary endpoints running on the same branch. Shouldn't be used normally, but useful for tests.")
.long("allow-multiple")
.action(ArgAction::SetTrue)
.required(false);
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
@@ -1601,6 +1615,7 @@ fn cli() -> Command {
.arg(pg_version_arg.clone())
.arg(hot_standby_arg.clone())
.arg(update_catalog)
.arg(allow_multiple.clone())
)
.subcommand(Command::new("start")
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
@@ -1609,6 +1624,7 @@ fn cli() -> Command {
.arg(safekeepers_arg)
.arg(remote_ext_config_args)
.arg(create_test_user)
.arg(allow_multiple.clone())
)
.subcommand(Command::new("reconfigure")
.about("Reconfigure the endpoint")

View File

@@ -5,7 +5,6 @@ edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
anyhow.workspace = true
bytes.workspace = true
futures.workspace = true

View File

@@ -78,17 +78,16 @@ pub fn is_expected_io_error(e: &io::Error) -> bool {
)
}
#[async_trait::async_trait]
pub trait Handler<IO> {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
/// might be not what we want after CopyData streaming, but currently we don't
/// care). It will also flush out the output buffer.
async fn process_query(
fn process_query(
&mut self,
pgb: &mut PostgresBackend<IO>,
query_string: &str,
) -> Result<(), QueryError>;
) -> impl Future<Output = Result<(), QueryError>> + Send;
/// Called on startup packet receival, allows to process params.
///

View File

@@ -22,7 +22,6 @@ async fn make_tcp_pair() -> (TcpStream, TcpStream) {
struct TestHandler {}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> Handler<IO> for TestHandler {
// return single col 'hey' for any query
async fn process_query(

View File

@@ -331,7 +331,10 @@ impl CheckPoint {
/// Returns 'true' if the XID was updated.
pub fn update_next_xid(&mut self, xid: u32) -> bool {
// nextXid should be greater than any XID in WAL, so increment provided XID and check for wraparround.
let mut new_xid = std::cmp::max(xid.wrapping_add(1), pg_constants::FIRST_NORMAL_TRANSACTION_ID);
let mut new_xid = std::cmp::max(
xid.wrapping_add(1),
pg_constants::FIRST_NORMAL_TRANSACTION_ID,
);
// To reduce number of metadata checkpoints, we forward align XID on XID_CHECKPOINT_INTERVAL.
// XID_CHECKPOINT_INTERVAL should not be larger than BLCKSZ*CLOG_XACTS_PER_BYTE
new_xid =
@@ -367,8 +370,16 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
let first_page_only = seg_off < XLOG_BLCKSZ;
let (shdr_rem_len, infoflags) = if first_page_only {
(seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
// If first records starts in the middle of the page, pretend in page header
// there is a fake record which ends where first real record starts. This
// makes pg_waldump etc happy.
let (shdr_rem_len, infoflags) = if first_page_only && seg_off > 0 {
assert!(seg_off >= XLOG_SIZE_OF_XLOG_LONG_PHD);
// xlp_rem_len doesn't include page header, hence the subtraction.
(
seg_off - XLOG_SIZE_OF_XLOG_LONG_PHD,
pg_constants::XLP_FIRST_IS_CONTRECORD,
)
} else {
(0, 0)
};
@@ -397,20 +408,22 @@ pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Byte
if !first_page_only {
let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
// see comments above about XLP_FIRST_IS_CONTRECORD and xlp_rem_len.
let (xlp_rem_len, xlp_info) = if page_off > 0 {
assert!(page_off >= XLOG_SIZE_OF_XLOG_SHORT_PHD as u64);
(
(page_off - XLOG_SIZE_OF_XLOG_SHORT_PHD as u64) as u32,
pg_constants::XLP_FIRST_IS_CONTRECORD,
)
} else {
(0, 0)
};
let header = XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
pg_constants::XLP_FIRST_IS_CONTRECORD
} else {
0
},
xlp_info,
xlp_tli: PG_TLI,
xlp_pageaddr: lsn.page_lsn().0,
xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
page_off as u32
} else {
0u32
},
xlp_rem_len,
..Default::default() // Put 0 in padding fields.
};
let hdr_bytes = header.encode()?;

View File

@@ -15,7 +15,6 @@ anyhow.workspace = true
arc-swap.workspace = true
async-compression.workspace = true
async-stream.workspace = true
async-trait.workspace = true
byteorder.workspace = true
bytes.workspace = true
camino.workspace = true

View File

@@ -1384,7 +1384,6 @@ impl PageServerHandler {
}
}
#[async_trait::async_trait]
impl<IO> postgres_backend::Handler<IO> for PageServerHandler
where
IO: AsyncRead + AsyncWrite + Send + Sync + Unpin,

3000
poetry.lock generated

File diff suppressed because one or more lines are too long

View File

@@ -75,7 +75,6 @@ pub type ComputeReady = DatabaseInfo;
// TODO: replace with an http-based protocol.
struct MgmtHandler;
#[async_trait::async_trait]
impl postgres_backend::Handler<tokio::net::TcpStream> for MgmtHandler {
async fn process_query(
&mut self,

View File

@@ -23,7 +23,7 @@ moto = {extras = ["server"], version = "^4.1.2"}
backoff = "^2.2.1"
pytest-lazy-fixture = "^0.6.3"
prometheus-client = "^0.14.1"
pytest-timeout = "^2.1.0"
pytest-timeout = "^2.3.1"
Werkzeug = "^3.0.1"
pytest-order = "^1.1.0"
allure-pytest = "^2.13.2"

View File

@@ -95,7 +95,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
}
}
#[async_trait::async_trait]
impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
for SafekeeperPostgresHandler
{

View File

@@ -725,6 +725,18 @@ where
self.state.inmem.commit_lsn
);
// Before first WAL write initialize its segment. It makes first segment
// pg_waldump'able because stream from compute doesn't include its
// segment and page headers.
//
// If we fail before first WAL write flush this action would be
// repeated, that's ok because it is idempotent.
if self.wal_store.flush_lsn() == Lsn::INVALID {
self.wal_store
.initialize_first_segment(msg.start_streaming_at)
.await?;
}
// TODO: cross check divergence point, check if msg.start_streaming_at corresponds to
// intersection of our history and history from msg
@@ -1007,6 +1019,10 @@ mod tests {
self.lsn
}
async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
Ok(())
}
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
self.lsn = startpos + buf.len() as u64;
Ok(())

View File

@@ -38,6 +38,12 @@ pub trait Storage {
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
/// Initialize segment by creating proper long header at the beginning of
/// the segment and short header at the page of given LSN. This is only used
/// for timeline initialization because compute will stream data only since
/// init_lsn. Other segment headers are included in compute stream.
async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()>;
/// Write piece of WAL from buf to disk, but not necessarily sync it.
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>;
@@ -78,6 +84,8 @@ pub struct PhysicalStorage {
/// Size of WAL segment in bytes.
wal_seg_size: usize,
pg_version: u32,
system_id: u64,
/// Written to disk, but possibly still in the cache and not fully persisted.
/// Also can be ahead of record_lsn, if happen to be in the middle of a WAL record.
@@ -169,6 +177,8 @@ impl PhysicalStorage {
timeline_dir,
conf: conf.clone(),
wal_seg_size,
pg_version: state.server.pg_version,
system_id: state.server.system_id,
write_lsn,
write_record_lsn: write_lsn,
flush_record_lsn: flush_lsn,
@@ -324,6 +334,20 @@ impl Storage for PhysicalStorage {
self.flush_record_lsn
}
async fn initialize_first_segment(&mut self, init_lsn: Lsn) -> Result<()> {
let segno = init_lsn.segment_number(self.wal_seg_size);
let (mut file, _) = self.open_or_create(segno).await?;
let major_pg_version = self.pg_version / 10000;
let wal_seg =
postgres_ffi::generate_wal_segment(segno, self.system_id, major_pg_version, init_lsn)?;
file.seek(SeekFrom::Start(0)).await?;
file.write_all(&wal_seg).await?;
file.flush().await?;
info!("initialized segno {} at lsn {}", segno, init_lsn);
// note: file is *not* fsynced
Ok(())
}
/// Write WAL to disk.
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
// Disallow any non-sequential writes, which can result in gaps or overwrites.

View File

@@ -182,6 +182,10 @@ impl wal_storage::Storage for DiskWALStorage {
self.flush_record_lsn
}
async fn initialize_first_segment(&mut self, _init_lsn: Lsn) -> Result<()> {
Ok(())
}
/// Write piece of WAL from buf to disk, but not necessarily sync it.
async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> {
if self.write_lsn != startpos {

View File

@@ -1801,6 +1801,7 @@ class NeonCli(AbstractNeonCli):
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
pageserver_id: Optional[int] = None,
allow_multiple=False,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
@@ -1824,6 +1825,8 @@ class NeonCli(AbstractNeonCli):
args.extend(["--hot-standby", "true"])
if pageserver_id is not None:
args.extend(["--pageserver-id", str(pageserver_id)])
if allow_multiple:
args.extend(["--allow-multiple"])
res = self.raw_cli(args)
res.check_returncode()
@@ -1835,6 +1838,7 @@ class NeonCli(AbstractNeonCli):
safekeepers: Optional[List[int]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
allow_multiple=False,
) -> "subprocess.CompletedProcess[str]":
args = [
"endpoint",
@@ -1849,6 +1853,8 @@ class NeonCli(AbstractNeonCli):
args.append(endpoint_id)
if pageserver_id is not None:
args.extend(["--pageserver-id", str(pageserver_id)])
if allow_multiple:
args.extend(["--allow-multiple"])
res = self.raw_cli(args)
res.check_returncode()
@@ -3299,6 +3305,7 @@ class Endpoint(PgProtocol):
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
pageserver_id: Optional[int] = None,
allow_multiple: bool = False,
) -> "Endpoint":
"""
Create a new Postgres endpoint.
@@ -3321,6 +3328,7 @@ class Endpoint(PgProtocol):
pg_port=self.pg_port,
http_port=self.http_port,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
self.pgdata_dir = os.path.join(self.env.repo_dir, path)
@@ -3337,7 +3345,10 @@ class Endpoint(PgProtocol):
return self
def start(
self, remote_ext_config: Optional[str] = None, pageserver_id: Optional[int] = None
self,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
allow_multiple: bool = False,
) -> "Endpoint":
"""
Start the Postgres instance.
@@ -3353,6 +3364,7 @@ class Endpoint(PgProtocol):
safekeepers=self.active_safekeepers,
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
)
self.running = True
@@ -3482,6 +3494,7 @@ class Endpoint(PgProtocol):
config_lines: Optional[List[str]] = None,
remote_ext_config: Optional[str] = None,
pageserver_id: Optional[int] = None,
allow_multiple=False,
) -> "Endpoint":
"""
Create an endpoint, apply config, and start Postgres.
@@ -3497,7 +3510,12 @@ class Endpoint(PgProtocol):
hot_standby=hot_standby,
lsn=lsn,
pageserver_id=pageserver_id,
).start(remote_ext_config=remote_ext_config, pageserver_id=pageserver_id)
allow_multiple=allow_multiple,
).start(
remote_ext_config=remote_ext_config,
pageserver_id=pageserver_id,
allow_multiple=allow_multiple,
)
log.info(f"Postgres startup took {time.time() - started_at} seconds")

View File

@@ -19,6 +19,12 @@ from fixtures.neon_fixtures import NeonEnvBuilder, WalCraft
def test_crafted_wal_end(neon_env_builder: NeonEnvBuilder, wal_type: str):
env = neon_env_builder.init_start()
env.neon_cli.create_branch("test_crafted_wal_end")
env.pageserver.allowed_errors.extend(
[
# seems like pageserver stop triggers these
".*initial size calculation failed.*Bad state (not active).*",
]
)
endpoint = env.endpoints.create("test_crafted_wal_end")
wal_craft = WalCraft(env)

View File

@@ -1,13 +1,28 @@
import os
import shutil
from fixtures.neon_fixtures import NeonEnv, PgBin
from fixtures.utils import subprocess_capture
def check_wal_segment(pg_waldump_path: str, segment_path: str, test_output_dir):
# use special --ignore option to ignore the validation checks in pg_waldump
# this is necessary, because neon WAL files contain gap at the beginning
output_path, _, _ = subprocess_capture(
test_output_dir, [pg_waldump_path, "--ignore", segment_path]
)
with open(f"{output_path}.stdout", "r") as f:
stdout = f.read()
assert "ABORT" in stdout
assert "COMMIT" in stdout
# Simple test to check that pg_waldump works with neon WAL files
def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
env = neon_simple_env
env.neon_cli.create_branch("test_pg_waldump", "empty")
tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_pg_waldump", "empty")
endpoint = env.endpoints.create_start("test_pg_waldump")
cur = endpoint.connect().cursor()
@@ -35,12 +50,12 @@ def test_pg_waldump(neon_simple_env: NeonEnv, test_output_dir, pg_bin: PgBin):
assert endpoint.pgdata_dir
wal_path = os.path.join(endpoint.pgdata_dir, "pg_wal/000000010000000000000001")
pg_waldump_path = os.path.join(pg_bin.pg_bin_path, "pg_waldump")
# check segment on compute
check_wal_segment(pg_waldump_path, wal_path, test_output_dir)
# use special --ignore option to ignore the validation checks in pg_waldump
# this is necessary, because neon WAL files contain gap at the beginning
output_path, _, _ = subprocess_capture(test_output_dir, [pg_waldump_path, "--ignore", wal_path])
with open(f"{output_path}.stdout", "r") as f:
stdout = f.read()
assert "ABORT" in stdout
assert "COMMIT" in stdout
# Check file on safekeepers as well. pg_waldump is strict about file naming, so remove .partial suffix.
sk = env.safekeepers[0]
sk_tli_dir = sk.timeline_dir(tenant_id, timeline_id)
non_partial_path = os.path.join(sk_tli_dir, "000000010000000000000001")
shutil.copyfile(os.path.join(sk_tli_dir, "000000010000000000000001.partial"), non_partial_path)
check_wal_segment(pg_waldump_path, non_partial_path, test_output_dir)

View File

@@ -254,7 +254,9 @@ def test_restarts_frequent_checkpoints(neon_env_builder: NeonEnvBuilder):
)
def endpoint_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
def endpoint_create_start(
env: NeonEnv, branch: str, pgdir_name: Optional[str], allow_multiple: bool = False
):
endpoint = Endpoint(
env,
tenant_id=env.initial_tenant,
@@ -268,14 +270,23 @@ def endpoint_create_start(env: NeonEnv, branch: str, pgdir_name: Optional[str]):
# embed current time in endpoint ID
endpoint_id = pgdir_name or f"ep-{time.time()}"
return endpoint.create_start(
branch_name=branch, endpoint_id=endpoint_id, config_lines=["log_statement=all"]
branch_name=branch,
endpoint_id=endpoint_id,
config_lines=["log_statement=all"],
allow_multiple=allow_multiple,
)
async def exec_compute_query(
env: NeonEnv, branch: str, query: str, pgdir_name: Optional[str] = None
env: NeonEnv,
branch: str,
query: str,
pgdir_name: Optional[str] = None,
allow_multiple: bool = False,
):
with endpoint_create_start(env, branch=branch, pgdir_name=pgdir_name) as endpoint:
with endpoint_create_start(
env, branch=branch, pgdir_name=pgdir_name, allow_multiple=allow_multiple
) as endpoint:
before_conn = time.time()
conn = await endpoint.connect_async()
res = await conn.fetch(query)
@@ -347,6 +358,7 @@ class BackgroundCompute(object):
self.branch,
f"INSERT INTO query_log(index, verify_key) VALUES ({self.index}, {verify_key}) RETURNING verify_key",
pgdir_name=f"bgcompute{self.index}_key{verify_key}",
allow_multiple=True,
)
log.info(f"result: {res}")
if len(res) != 1: