Files
neon/test_runner/stubs/h2/stream.pyi
Conrad Ludgate 7000aaaf75 chore: fix h2 stubgen (#10491)
## Problem

## Summary of changes

---------

Co-authored-by: Alexander Bayandin <alexander@neon.tech>
2025-01-24 14:55:48 +00:00

123 lines
7.0 KiB
Python

from .config import H2Configuration as H2Configuration
from .errors import ErrorCodes as ErrorCodes
from .events import AlternativeServiceAvailable as AlternativeServiceAvailable, DataReceived as DataReceived, Event as Event, InformationalResponseReceived as InformationalResponseReceived, PushedStreamReceived as PushedStreamReceived, RequestReceived as RequestReceived, ResponseReceived as ResponseReceived, StreamEnded as StreamEnded, StreamReset as StreamReset, TrailersReceived as TrailersReceived, WindowUpdated as WindowUpdated
from .exceptions import FlowControlError as FlowControlError, InvalidBodyLengthError as InvalidBodyLengthError, ProtocolError as ProtocolError, StreamClosedError as StreamClosedError
from .utilities import HeaderValidationFlags as HeaderValidationFlags, authority_from_headers as authority_from_headers, extract_method_header as extract_method_header, guard_increment_window as guard_increment_window, is_informational_response as is_informational_response, normalize_inbound_headers as normalize_inbound_headers, normalize_outbound_headers as normalize_outbound_headers, utf8_encode_headers as utf8_encode_headers, validate_headers as validate_headers, validate_outbound_headers as validate_outbound_headers
from .windows import WindowManager as WindowManager
from _typeshed import Incomplete
from collections.abc import Iterable
from enum import Enum, IntEnum
from hpack.hpack import Encoder as Encoder
from hpack.struct import Header as Header, HeaderWeaklyTyped as HeaderWeaklyTyped
from hyperframe.frame import AltSvcFrame, ContinuationFrame, Frame as Frame, HeadersFrame, PushPromiseFrame, RstStreamFrame
from typing import Any
class StreamState(IntEnum):
    # Integer-valued enumeration of the states a stream can be in.
    # Members are numbered 0-6 in declaration order.
    # NOTE(review): the names look like the HTTP/2 stream lifecycle states
    # (RFC 7540 section 5.1) -- confirm against the h2 runtime module.
    IDLE = 0
    RESERVED_REMOTE = 1
    RESERVED_LOCAL = 2
    OPEN = 3
    HALF_CLOSED_REMOTE = 4
    HALF_CLOSED_LOCAL = 5
    CLOSED = 6
class StreamInputs(Enum):
    # Enumeration of the inputs accepted by
    # H2StreamStateMachine.process_input. Values run 0-18 in declaration
    # order; the comments below only group members by their name prefix.

    # SEND_* inputs (values 0-5).
    SEND_HEADERS = 0
    SEND_PUSH_PROMISE = 1
    SEND_RST_STREAM = 2
    SEND_DATA = 3
    SEND_WINDOW_UPDATE = 4
    SEND_END_STREAM = 5

    # RECV_* inputs (values 6-12).
    RECV_HEADERS = 6
    RECV_PUSH_PROMISE = 7
    RECV_RST_STREAM = 8
    RECV_DATA = 9
    RECV_WINDOW_UPDATE = 10
    RECV_END_STREAM = 11
    RECV_CONTINUATION = 12

    # Informational-header and alternative-service inputs (values 13-16).
    SEND_INFORMATIONAL_HEADERS = 13
    RECV_INFORMATIONAL_HEADERS = 14
    SEND_ALTERNATIVE_SERVICE = 15
    RECV_ALTERNATIVE_SERVICE = 16

    # UPGRADE_* inputs (values 17-18).
    UPGRADE_CLIENT = 17
    UPGRADE_SERVER = 18
class StreamClosedBy(Enum):
    # Enumeration with four members (values 0-3) naming a send/receive
    # END_STREAM or RST_STREAM. H2Stream.closed_by is annotated as
    # returning one of these members or None.
    SEND_END_STREAM = 0
    RECV_END_STREAM = 1
    SEND_RST_STREAM = 2
    RECV_RST_STREAM = 3
# Module-level value whose type the stub generator left as Incomplete.
# NOTE(review): presumably a lookup keyed by StreamState that records which
# states count as "open" -- confirm against the h2 runtime module before
# tightening this annotation.
STREAM_OPEN: Incomplete
class H2StreamStateMachine:
    # Stub for the per-stream state machine. It is constructed from a stream
    # id and fed StreamInputs values through process_input.

    # Instance attributes. The stub generator could not infer their types,
    # so they are left as Incomplete.
    state: Incomplete
    stream_id: Incomplete
    client: Incomplete
    headers_sent: Incomplete
    trailers_sent: Incomplete
    headers_received: Incomplete
    trailers_received: Incomplete
    stream_closed_by: Incomplete

    def __init__(self, stream_id: int) -> None: ...

    # Entry point: takes one StreamInputs member. The return type is
    # declared as Any.
    def process_input(self, input_: StreamInputs) -> Any: ...

    # Every method below takes the StreamState held before the transition
    # (previous_state). This first group is annotated as returning a list
    # of Event objects.
    def request_sent(self, previous_state: StreamState) -> list[Event]: ...
    def response_sent(self, previous_state: StreamState) -> list[Event]: ...
    def request_received(self, previous_state: StreamState) -> list[Event]: ...
    def response_received(self, previous_state: StreamState) -> list[Event]: ...
    def data_received(self, previous_state: StreamState) -> list[Event]: ...
    def window_updated(self, previous_state: StreamState) -> list[Event]: ...
    def stream_half_closed(self, previous_state: StreamState) -> list[Event]: ...
    def stream_ended(self, previous_state: StreamState) -> list[Event]: ...
    def stream_reset(self, previous_state: StreamState) -> list[Event]: ...
    def send_new_pushed_stream(self, previous_state: StreamState) -> list[Event]: ...
    def recv_new_pushed_stream(self, previous_state: StreamState) -> list[Event]: ...
    def send_push_promise(self, previous_state: StreamState) -> list[Event]: ...
    def recv_push_promise(self, previous_state: StreamState) -> list[Event]: ...

    # This group is annotated as returning None.
    # NOTE(review): presumably these only mutate state or raise -- confirm
    # against the h2 runtime module.
    def send_end_stream(self, previous_state: StreamState) -> None: ...
    def send_reset_stream(self, previous_state: StreamState) -> None: ...
    def reset_stream_on_error(self, previous_state: StreamState) -> None: ...
    def recv_on_closed_stream(self, previous_state: StreamState) -> None: ...
    def send_on_closed_stream(self, previous_state: StreamState) -> None: ...
    def recv_push_on_closed_stream(self, previous_state: StreamState) -> None: ...
    def send_push_on_closed_stream(self, previous_state: StreamState) -> None: ...

    # Informational-response and alternative-service methods. Note that
    # send_alt_svc is the only one of the four annotated as returning None.
    def send_informational_response(self, previous_state: StreamState) -> list[Event]: ...
    def recv_informational_response(self, previous_state: StreamState) -> list[Event]: ...
    def recv_alt_svc(self, previous_state: StreamState) -> list[Event]: ...
    def send_alt_svc(self, previous_state: StreamState) -> None: ...
class H2Stream:
    # Stub for a single stream object. It is built from a stream id, an
    # H2Configuration and the initial inbound/outbound window sizes.

    # Instance attributes. The stub generator could not infer their types,
    # so they are left as Incomplete.
    state_machine: Incomplete
    stream_id: Incomplete
    max_outbound_frame_size: Incomplete
    request_method: Incomplete
    outbound_flow_control_window: Incomplete
    config: Incomplete

    def __init__(self, stream_id: int, config: H2Configuration, inbound_window_size: int, outbound_window_size: int) -> None: ...

    # Read-only properties.
    @property
    def inbound_flow_control_window(self) -> int: ...
    @property
    def open(self) -> bool: ...
    @property
    def closed(self) -> bool: ...
    @property
    def closed_by(self) -> StreamClosedBy | None: ...

    def upgrade(self, client_side: bool) -> None: ...

    # Methods annotated as returning only a list of frames.
    def send_headers(self, headers: Iterable[HeaderWeaklyTyped], encoder: Encoder, end_stream: bool = False) -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]: ...
    def push_stream_in_band(self, related_stream_id: int, headers: Iterable[HeaderWeaklyTyped], encoder: Encoder) -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]: ...
    def locally_pushed(self) -> list[Frame]: ...
    def send_data(self, data: bytes | memoryview, end_stream: bool = False, pad_length: int | None = None) -> list[Frame]: ...
    def end_stream(self) -> list[Frame]: ...
    def advertise_alternative_service(self, field_value: bytes) -> list[Frame]: ...
    def increase_flow_control_window(self, increment: int) -> list[Frame]: ...

    # Methods annotated as returning a (frames, events) pair. The exception
    # is receive_continuation, which is annotated as returning None.
    def receive_push_promise_in_band(self, promised_stream_id: int, headers: Iterable[Header], header_encoding: bool | str | None) -> tuple[list[Frame], list[Event]]: ...
    def remotely_pushed(self, pushed_headers: Iterable[Header]) -> tuple[list[Frame], list[Event]]: ...
    def receive_headers(self, headers: Iterable[Header], end_stream: bool, header_encoding: bool | str | None) -> tuple[list[Frame], list[Event]]: ...
    def receive_data(self, data: bytes, end_stream: bool, flow_control_len: int) -> tuple[list[Frame], list[Event]]: ...
    def receive_window_update(self, increment: int) -> tuple[list[Frame], list[Event]]: ...
    def receive_continuation(self) -> None: ...
    def receive_alt_svc(self, frame: AltSvcFrame) -> tuple[list[Frame], list[Event]]: ...

    # Reset and acknowledgement. reset_stream takes an ErrorCodes member or
    # a plain int and defaults to 0; stream_reset takes a RstStreamFrame.
    def reset_stream(self, error_code: ErrorCodes | int = 0) -> list[Frame]: ...
    def stream_reset(self, frame: RstStreamFrame) -> tuple[list[Frame], list[Event]]: ...
    def acknowledge_received_data(self, acknowledged_size: int) -> list[Frame]: ...