mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-15 01:12:56 +00:00
python based regression test setup for auth_broker. This uses a http mock for cplane as well as the JWKs url. complications: 1. We cannot just use local_proxy binary, as that requires the pg_session_jwt extension which we don't have available in the current test suite 2. We cannot use just any old http mock for local_proxy, as auth_broker requires http2 to local_proxy as such, I used the h2 library to implement an echo server - copied from the examples in the h2 docs.
62 lines
2.0 KiB
Python
62 lines
2.0 KiB
Python
import enum
|
|
from collections.abc import MutableMapping
|
|
from typing import Any
|
|
|
|
from _typeshed import Incomplete
|
|
from h2.errors import ErrorCodes as ErrorCodes
|
|
from h2.exceptions import InvalidSettingsValueError as InvalidSettingsValueError
|
|
|
|
class SettingCodes(enum.IntEnum):
|
|
HEADER_TABLE_SIZE: Incomplete
|
|
ENABLE_PUSH: Incomplete
|
|
MAX_CONCURRENT_STREAMS: Incomplete
|
|
INITIAL_WINDOW_SIZE: Incomplete
|
|
MAX_FRAME_SIZE: Incomplete
|
|
MAX_HEADER_LIST_SIZE: Incomplete
|
|
ENABLE_CONNECT_PROTOCOL: Incomplete
|
|
|
|
class ChangedSetting:
|
|
setting: Incomplete
|
|
original_value: Incomplete
|
|
new_value: Incomplete
|
|
def __init__(self, setting, original_value, new_value) -> None: ...
|
|
|
|
class Settings(MutableMapping[str, Any]):
|
|
def __init__(self, client: bool = ..., initial_values: Incomplete | None = ...) -> None: ...
|
|
def acknowledge(self): ...
|
|
@property
|
|
def header_table_size(self): ...
|
|
@header_table_size.setter
|
|
def header_table_size(self, value) -> None: ...
|
|
@property
|
|
def enable_push(self): ...
|
|
@enable_push.setter
|
|
def enable_push(self, value) -> None: ...
|
|
@property
|
|
def initial_window_size(self): ...
|
|
@initial_window_size.setter
|
|
def initial_window_size(self, value) -> None: ...
|
|
@property
|
|
def max_frame_size(self): ...
|
|
@max_frame_size.setter
|
|
def max_frame_size(self, value) -> None: ...
|
|
@property
|
|
def max_concurrent_streams(self): ...
|
|
@max_concurrent_streams.setter
|
|
def max_concurrent_streams(self, value) -> None: ...
|
|
@property
|
|
def max_header_list_size(self): ...
|
|
@max_header_list_size.setter
|
|
def max_header_list_size(self, value) -> None: ...
|
|
@property
|
|
def enable_connect_protocol(self): ...
|
|
@enable_connect_protocol.setter
|
|
def enable_connect_protocol(self, value) -> None: ...
|
|
def __getitem__(self, key): ...
|
|
def __setitem__(self, key, value) -> None: ...
|
|
def __delitem__(self, key) -> None: ...
|
|
def __iter__(self): ...
|
|
def __len__(self) -> int: ...
|
|
def __eq__(self, other): ...
|
|
def __ne__(self, other): ...
|