mirror of
https://github.com/neondatabase/neon.git
synced 2026-05-31 20:10:38 +00:00
Add a test using Debezium as a client for the logical replication (#8568)
## Problem We need to test the logical replication with some external consumers. ## Summary of changes A test of the logical replication with Debezium as a consumer was added. --------- Co-authored-by: Alexander Bayandin <alexander@neon.tech>
This commit is contained in:
26
.github/workflows/pg-clients.yml
vendored
26
.github/workflows/pg-clients.yml
vendored
@@ -66,7 +66,31 @@ jobs:
|
||||
ports:
|
||||
- 9000:9000
|
||||
- 8123:8123
|
||||
|
||||
zookeeper:
|
||||
image: quay.io/debezium/zookeeper:2.7
|
||||
ports:
|
||||
- 2181:2181
|
||||
kafka:
|
||||
image: quay.io/debezium/kafka:2.7
|
||||
env:
|
||||
ZOOKEEPER_CONNECT: "zookeeper:2181"
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_JMX_PORT: 9991
|
||||
ports:
|
||||
- 9092:9092
|
||||
debezium:
|
||||
image: quay.io/debezium/connect:2.7
|
||||
env:
|
||||
BOOTSTRAP_SERVERS: kafka:9092
|
||||
GROUP_ID: 1
|
||||
CONFIG_STORAGE_TOPIC: debezium-config
|
||||
OFFSET_STORAGE_TOPIC: debezium-offset
|
||||
STATUS_STORAGE_TOPIC: debezium-status
|
||||
DEBEZIUM_CONFIG_CONNECTOR_CLASS: io.debezium.connector.postgresql.PostgresConnector
|
||||
ports:
|
||||
- 8083:8083
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
|
||||
16
poetry.lock
generated
16
poetry.lock
generated
@@ -1514,6 +1514,20 @@ files = [
|
||||
[package.dependencies]
|
||||
six = "*"
|
||||
|
||||
[[package]]
|
||||
name = "kafka-python"
|
||||
version = "2.0.2"
|
||||
description = "Pure Python client for Apache Kafka"
|
||||
optional = false
|
||||
python-versions = "*"
|
||||
files = [
|
||||
{file = "kafka-python-2.0.2.tar.gz", hash = "sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3"},
|
||||
{file = "kafka_python-2.0.2-py2.py3-none-any.whl", hash = "sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
crc32c = ["crc32c"]
|
||||
|
||||
[[package]]
|
||||
name = "lazy-object-proxy"
|
||||
version = "1.10.0"
|
||||
@@ -3357,4 +3371,4 @@ cffi = ["cffi (>=1.11)"]
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.9"
|
||||
content-hash = "7cee6a8c30bc7f4bfb0a87c6bad3952dfb4da127fad853d2710a93ac3eab8a00"
|
||||
content-hash = "d569a3593b98baceb0a88e176bdad63cae99d6bfc2a81bf6741663a4abcafd72"
|
||||
|
||||
@@ -41,6 +41,7 @@ httpx = {extras = ["http2"], version = "^0.26.0"}
|
||||
pytest-repeat = "^0.9.3"
|
||||
websockets = "^12.0"
|
||||
clickhouse-connect = "^0.7.16"
|
||||
kafka-python = "^2.0.2"
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
mypy = "==1.3.0"
|
||||
@@ -74,6 +75,7 @@ module = [
|
||||
"allure.*",
|
||||
"allure_commons.*",
|
||||
"allure_pytest.*",
|
||||
"kafka.*",
|
||||
]
|
||||
ignore_missing_imports = true
|
||||
|
||||
|
||||
@@ -389,7 +389,10 @@ WaitUntilRet = TypeVar("WaitUntilRet")
|
||||
|
||||
|
||||
def wait_until(
|
||||
number_of_iterations: int, interval: float, func: Callable[[], WaitUntilRet]
|
||||
number_of_iterations: int,
|
||||
interval: float,
|
||||
func: Callable[[], WaitUntilRet],
|
||||
show_intermediate_error=False,
|
||||
) -> WaitUntilRet:
|
||||
"""
|
||||
Wait until 'func' returns successfully, without exception. Returns the
|
||||
@@ -402,6 +405,8 @@ def wait_until(
|
||||
except Exception as e:
|
||||
log.info("waiting for %s iteration %s failed", func, i + 1)
|
||||
last_exception = e
|
||||
if show_intermediate_error:
|
||||
log.info(e)
|
||||
time.sleep(interval)
|
||||
continue
|
||||
return res
|
||||
|
||||
22
test_runner/logical_repl/README.md
Normal file
22
test_runner/logical_repl/README.md
Normal file
@@ -0,0 +1,22 @@
|
||||
# Logical replication tests
|
||||
|
||||
## Clickhouse
|
||||
|
||||
```bash
|
||||
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
|
||||
|
||||
docker compose -f clickhouse/docker-compose.yml up -d
|
||||
pytest -m remote_cluster -k test_clickhouse
|
||||
docker compose -f clickhouse/docker-compose.yml down
|
||||
```
|
||||
|
||||
## Debezium
|
||||
|
||||
```bash
|
||||
export BENCHMARK_CONNSTR=postgres://user:pass@ep-abc-xyz-123.us-east-2.aws.neon.build/neondb
|
||||
|
||||
docker compose -f debezium/docker-compose.yml up -d
|
||||
pytest -m remote_cluster -k test_debezium
|
||||
docker compose -f debezium/docker-compose.yml down
|
||||
|
||||
```
|
||||
9
test_runner/logical_repl/clickhouse/docker-compose.yml
Normal file
9
test_runner/logical_repl/clickhouse/docker-compose.yml
Normal file
@@ -0,0 +1,9 @@
|
||||
services:
|
||||
clickhouse:
|
||||
image: clickhouse/clickhouse-server
|
||||
user: "101:101"
|
||||
container_name: clickhouse
|
||||
hostname: clickhouse
|
||||
ports:
|
||||
- 127.0.0.1:8123:8123
|
||||
- 127.0.0.1:9000:9000
|
||||
24
test_runner/logical_repl/debezium/docker-compose.yml
Normal file
24
test_runner/logical_repl/debezium/docker-compose.yml
Normal file
@@ -0,0 +1,24 @@
|
||||
services:
|
||||
zookeeper:
|
||||
image: quay.io/debezium/zookeeper:2.7
|
||||
kafka:
|
||||
image: quay.io/debezium/kafka:2.7
|
||||
environment:
|
||||
ZOOKEEPER_CONNECT: "zookeeper:2181"
|
||||
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
|
||||
KAFKA_BROKER_ID: 1
|
||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||
KAFKA_JMX_PORT: 9991
|
||||
ports:
|
||||
- 127.0.0.1:9092:9092
|
||||
debezium:
|
||||
image: quay.io/debezium/connect:2.7
|
||||
environment:
|
||||
BOOTSTRAP_SERVERS: kafka:9092
|
||||
GROUP_ID: 1
|
||||
CONFIG_STORAGE_TOPIC: debezium-config
|
||||
OFFSET_STORAGE_TOPIC: debezium-offset
|
||||
STATUS_STORAGE_TOPIC: debezium-status
|
||||
DEBEZIUM_CONFIG_CONNECTOR_CLASS: io.debezium.connector.postgresql.PostgresConnector
|
||||
ports:
|
||||
- 127.0.0.1:8083:8083
|
||||
@@ -1,8 +1,9 @@
|
||||
"""
|
||||
Test the logical replication in Neon with the different consumers
|
||||
Test the logical replication in Neon with ClickHouse as a consumer
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import os
|
||||
import time
|
||||
|
||||
import clickhouse_connect
|
||||
@@ -39,22 +40,15 @@ def test_clickhouse(remote_pg: RemotePostgres):
|
||||
"""
|
||||
Test the logical replication having ClickHouse as a client
|
||||
"""
|
||||
clickhouse_host = "clickhouse" if ("CI" in os.environ) else "127.0.0.1"
|
||||
conn_options = remote_pg.conn_options()
|
||||
for _ in range(5):
|
||||
try:
|
||||
conn = psycopg2.connect(remote_pg.connstr())
|
||||
except psycopg2.OperationalError as perr:
|
||||
log.debug(perr)
|
||||
time.sleep(1)
|
||||
else:
|
||||
break
|
||||
raise TimeoutError
|
||||
conn = psycopg2.connect(remote_pg.connstr())
|
||||
cur = conn.cursor()
|
||||
cur.execute("DROP TABLE IF EXISTS table1")
|
||||
cur.execute("CREATE TABLE table1 (id integer primary key, column1 varchar(10));")
|
||||
cur.execute("INSERT INTO table1 (id, column1) VALUES (1, 'abc'), (2, 'def');")
|
||||
conn.commit()
|
||||
client = clickhouse_connect.get_client(host="clickhouse")
|
||||
client = clickhouse_connect.get_client(host=clickhouse_host)
|
||||
client.command("SET allow_experimental_database_materialized_postgresql=1")
|
||||
client.command(
|
||||
"CREATE DATABASE db1_postgres ENGINE = "
|
||||
189
test_runner/logical_repl/test_debezium.py
Normal file
189
test_runner/logical_repl/test_debezium.py
Normal file
@@ -0,0 +1,189 @@
|
||||
"""
|
||||
Test the logical replication in Neon with Debezium as a consumer
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
import psycopg2
|
||||
import pytest
|
||||
import requests
|
||||
from fixtures.log_helper import log
|
||||
from fixtures.neon_fixtures import RemotePostgres
|
||||
from fixtures.utils import wait_until
|
||||
from kafka import KafkaConsumer
|
||||
|
||||
|
||||
class DebeziumAPI:
|
||||
"""
|
||||
The class for Debezium API calls
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.__host = "debezium" if ("CI" in os.environ) else "127.0.0.1"
|
||||
self.__base_url = f"http://{self.__host}:8083"
|
||||
self.__connectors_url = f"{self.__base_url}/connectors"
|
||||
|
||||
def __request(self, method, addurl="", **kwargs):
|
||||
return requests.request(
|
||||
method,
|
||||
self.__connectors_url + addurl,
|
||||
headers={"Accept": "application/json", "Content-type": "application/json"},
|
||||
timeout=60,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
def create_pg_connector(self, remote_pg: RemotePostgres, dbz_conn_name: str):
|
||||
"""
|
||||
Create a Postgres connector in debezium
|
||||
"""
|
||||
conn_options = remote_pg.conn_options()
|
||||
payload = {
|
||||
"name": dbz_conn_name,
|
||||
"config": {
|
||||
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
|
||||
"tasks.max": "1",
|
||||
"database.hostname": conn_options["host"],
|
||||
"database.port": "5432",
|
||||
"database.user": conn_options["user"],
|
||||
"database.password": conn_options["password"],
|
||||
"database.dbname": conn_options["dbname"],
|
||||
"plugin.name": "pgoutput",
|
||||
"topic.prefix": "dbserver1",
|
||||
"schema.include.list": "inventory",
|
||||
},
|
||||
}
|
||||
return self.__request("POST", json=payload)
|
||||
|
||||
def list_connectors(self):
|
||||
"""
|
||||
Returns a list of all connectors existent in Debezium.
|
||||
"""
|
||||
resp = self.__request("GET")
|
||||
assert resp.ok
|
||||
return json.loads(resp.text)
|
||||
|
||||
def del_connector(self, connector):
|
||||
"""
|
||||
Deletes the specified connector
|
||||
"""
|
||||
return self.__request("DELETE", f"/{connector}")
|
||||
|
||||
|
||||
@pytest.fixture(scope="function")
|
||||
def debezium(remote_pg: RemotePostgres):
|
||||
"""
|
||||
Prepare the Debezium API handler, connection
|
||||
"""
|
||||
conn = psycopg2.connect(remote_pg.connstr())
|
||||
cur = conn.cursor()
|
||||
cur.execute("DROP SCHEMA IF EXISTS inventory CASCADE")
|
||||
cur.execute("CREATE SCHEMA inventory")
|
||||
cur.execute(
|
||||
"CREATE TABLE inventory.customers ("
|
||||
"id SERIAL NOT NULL PRIMARY KEY,"
|
||||
"first_name character varying(255) NOT NULL,"
|
||||
"last_name character varying(255) NOT NULL,"
|
||||
"email character varying(255) NOT NULL)"
|
||||
)
|
||||
conn.commit()
|
||||
dbz = DebeziumAPI()
|
||||
assert len(dbz.list_connectors()) == 0
|
||||
dbz_conn_name = "inventory-connector"
|
||||
resp = dbz.create_pg_connector(remote_pg, dbz_conn_name)
|
||||
log.debug("%s %s %s", resp.status_code, resp.ok, resp.text)
|
||||
assert resp.status_code == 201
|
||||
assert len(dbz.list_connectors()) == 1
|
||||
consumer = KafkaConsumer(
|
||||
"dbserver1.inventory.customers",
|
||||
bootstrap_servers=["kafka:9092"],
|
||||
auto_offset_reset="earliest",
|
||||
enable_auto_commit=False,
|
||||
)
|
||||
yield conn, consumer
|
||||
resp = dbz.del_connector(dbz_conn_name)
|
||||
assert resp.status_code == 204
|
||||
|
||||
|
||||
def get_kafka_msg(consumer, ts_ms, before=None, after=None) -> None:
|
||||
"""
|
||||
Gets the message from Kafka and checks its validity
|
||||
Arguments:
|
||||
consumer: the consumer object
|
||||
ts_ms: timestamp in milliseconds of the change of db, the corresponding message must have
|
||||
the later timestamp
|
||||
before: a dictionary, if not None, the before field from the kafka message must
|
||||
have the same values for the same keys
|
||||
after: a dictionary, if not None, the after field from the kafka message must
|
||||
have the same values for the same keys
|
||||
"""
|
||||
msg = consumer.poll()
|
||||
assert msg, "Empty message"
|
||||
for val in msg.values():
|
||||
r = json.loads(val[-1].value)
|
||||
log.info(r["payload"])
|
||||
assert ts_ms < r["payload"]["ts_ms"], "Incorrect timestamp"
|
||||
for param, pname in ((before, "before"), (after, "after")):
|
||||
if param is not None:
|
||||
for k, v in param.items():
|
||||
assert r["payload"][pname][k] == v, f"{pname} mismatches"
|
||||
|
||||
|
||||
@pytest.mark.remote_cluster
|
||||
def test_debezium(debezium):
|
||||
"""
|
||||
Test the logical replication having Debezium as a subscriber
|
||||
"""
|
||||
conn, consumer = debezium
|
||||
cur = conn.cursor()
|
||||
ts_ms = time.time() * 1000
|
||||
log.info("Insert 1 ts_ms: %s", ts_ms)
|
||||
cur.execute(
|
||||
"insert into inventory.customers (first_name, last_name, email) "
|
||||
"values ('John', 'Dow','johndow@example.com')"
|
||||
)
|
||||
conn.commit()
|
||||
wait_until(
|
||||
100,
|
||||
0.5,
|
||||
lambda: get_kafka_msg(
|
||||
consumer,
|
||||
ts_ms,
|
||||
after={"first_name": "John", "last_name": "Dow", "email": "johndow@example.com"},
|
||||
),
|
||||
show_intermediate_error=True,
|
||||
)
|
||||
ts_ms = time.time() * 1000
|
||||
log.info("Insert 2 ts_ms: %s", ts_ms)
|
||||
cur.execute(
|
||||
"insert into inventory.customers (first_name, last_name, email) "
|
||||
"values ('Alex', 'Row','alexrow@example.com')"
|
||||
)
|
||||
conn.commit()
|
||||
wait_until(
|
||||
100,
|
||||
0.5,
|
||||
lambda: get_kafka_msg(
|
||||
consumer,
|
||||
ts_ms,
|
||||
after={"first_name": "Alex", "last_name": "Row", "email": "alexrow@example.com"},
|
||||
),
|
||||
show_intermediate_error=True,
|
||||
)
|
||||
ts_ms = time.time() * 1000
|
||||
log.info("Update ts_ms: %s", ts_ms)
|
||||
cur.execute("update inventory.customers set first_name = 'Alexander' where id = 2")
|
||||
conn.commit()
|
||||
wait_until(
|
||||
100,
|
||||
0.5,
|
||||
lambda: get_kafka_msg(
|
||||
consumer,
|
||||
ts_ms,
|
||||
after={"first_name": "Alexander"},
|
||||
),
|
||||
show_intermediate_error=True,
|
||||
)
|
||||
time.sleep(3)
|
||||
cur.execute("select 1")
|
||||
Reference in New Issue
Block a user