Files
neon/test_runner/logical_repl/test_debezium.py
Alexander Bayandin 30a7dd630c ruff: enable TC — flake8-type-checking (#11368)
## Problem

`TYPE_CHECKING` is used inconsistently across Python tests.

## Summary of changes
- Update `ruff`: 0.7.0 -> 0.11.2
- Enable TC (flake8-type-checking):
https://docs.astral.sh/ruff/rules/#flake8-type-checking-tc
- (auto)fix all new issues
2025-03-30 18:58:33 +00:00

190 lines
5.9 KiB
Python

"""
Test the logical replication in Neon with Debezium as a consumer
"""
from __future__ import annotations
import json
import os
import time
from typing import TYPE_CHECKING
import psycopg2
import pytest
import requests
from fixtures.log_helper import log
from fixtures.utils import wait_until
if TYPE_CHECKING:
from fixtures.neon_fixtures import RemotePostgres
class DebeziumAPI:
"""
The class for Debezium API calls
"""
def __init__(self):
self.__host = "debezium" if ("CI" in os.environ) else "127.0.0.1"
self.__base_url = f"http://{self.__host}:8083"
self.__connectors_url = f"{self.__base_url}/connectors"
def __request(self, method, addurl="", **kwargs):
return requests.request(
method,
self.__connectors_url + addurl,
headers={"Accept": "application/json", "Content-type": "application/json"},
timeout=60,
**kwargs,
)
def create_pg_connector(self, remote_pg: RemotePostgres, dbz_conn_name: str):
"""
Create a Postgres connector in debezium
"""
conn_options = remote_pg.conn_options()
payload = {
"name": dbz_conn_name,
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": conn_options["host"],
"database.port": "5432",
"database.user": conn_options["user"],
"database.password": conn_options["password"],
"database.dbname": conn_options["dbname"],
"plugin.name": "pgoutput",
"topic.prefix": "dbserver1",
"schema.include.list": "inventory",
},
}
return self.__request("POST", json=payload)
def list_connectors(self):
"""
Returns a list of all connectors existent in Debezium.
"""
resp = self.__request("GET")
assert resp.ok
return json.loads(resp.text)
def del_connector(self, connector):
"""
Deletes the specified connector
"""
return self.__request("DELETE", f"/{connector}")
@pytest.fixture(scope="function")
def debezium(remote_pg: RemotePostgres):
"""
Prepare the Debezium API handler, connection
"""
conn = psycopg2.connect(remote_pg.connstr())
cur = conn.cursor()
cur.execute("DROP SCHEMA IF EXISTS inventory CASCADE")
cur.execute("CREATE SCHEMA inventory")
cur.execute(
"CREATE TABLE inventory.customers ("
"id SERIAL NOT NULL PRIMARY KEY,"
"first_name character varying(255) NOT NULL,"
"last_name character varying(255) NOT NULL,"
"email character varying(255) NOT NULL)"
)
conn.commit()
dbz = DebeziumAPI()
assert len(dbz.list_connectors()) == 0
dbz_conn_name = "inventory-connector"
resp = dbz.create_pg_connector(remote_pg, dbz_conn_name)
log.debug("%s %s %s", resp.status_code, resp.ok, resp.text)
assert resp.status_code == 201
assert len(dbz.list_connectors()) == 1
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"dbserver1.inventory.customers",
bootstrap_servers=["kafka:9092"],
auto_offset_reset="earliest",
enable_auto_commit=False,
)
yield conn, consumer
resp = dbz.del_connector(dbz_conn_name)
assert resp.status_code == 204
def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
"""
Gets the message from Kafka and checks its validity
Arguments:
consumer: the consumer object
ts_ms: timestamp in milliseconds of the change of db, the corresponding message must have
the later timestamp
before: a dictionary, if not None, the before field from the kafka message must
have the same values for the same keys
after: a dictionary, if not None, the after field from the kafka message must
have the same values for the same keys
"""
msg = consumer.poll()
assert msg, "Empty message"
for val in msg.values():
r = json.loads(val[-1].value)
log.info(r["payload"])
assert ts_ms < r["payload"]["ts_ms"], "Incorrect timestamp"
for param, pname in ((before, "before"), (after, "after")):
if param is not None:
for k, v in param.items():
assert r["payload"][pname][k] == v, f"{pname} mismatches"
@pytest.mark.remote_cluster
def test_debezium(debezium):
"""
Test the logical replication having Debezium as a subscriber
"""
conn, consumer = debezium
cur = conn.cursor()
ts_ms = time.time() * 1000
log.info("Insert 1 ts_ms: %s", ts_ms)
cur.execute(
"insert into inventory.customers (first_name, last_name, email) "
"values ('John', 'Dow','johndow@example.com')"
)
conn.commit()
wait_until(
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "John", "last_name": "Dow", "email": "johndow@example.com"},
),
timeout=60,
)
ts_ms = time.time() * 1000
log.info("Insert 2 ts_ms: %s", ts_ms)
cur.execute(
"insert into inventory.customers (first_name, last_name, email) "
"values ('Alex', 'Row','alexrow@example.com')"
)
conn.commit()
wait_until(
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "Alex", "last_name": "Row", "email": "alexrow@example.com"},
),
timeout=60,
)
ts_ms = time.time() * 1000
log.info("Update ts_ms: %s", ts_ms)
cur.execute("update inventory.customers set first_name = 'Alexander' where id = 2")
conn.commit()
wait_until(
lambda: get_kafka_msg(
consumer,
ts_ms,
after={"first_name": "Alexander"},
),
timeout=60,
)
time.sleep(3)
cur.execute("select 1")