Skip to content

TCP Interface

Info

You must have your APRS password in order to connect to a TCP-enabled station. Search online for how to get yours, or check pax25's code to figure out how to generate one.

Warning

Please be mindful of your network configuration, especially your firewall. Anyone with access to the system and port running pax25 will be able to access your station, and potentially any station that station is able to access. This danger is elevated for Internet protocols, which have a much lower barrier to entry than radio stations. The APRS password is not real security. It's there as a minimum effort barrier. This interface does not currently support encryption, either, just like most ham radio technologies.

The TCPInterface allows you to connect to other stations over the Internet or your local area network. The interface is unique in that any packet sent to it will be sent to all connected stations. This is noteworthy for two reasons:

  1. As the addressing schemes of IP and AX.25 are incompatible, determining which AX.25 addresses 'live' on the other side of any IP connection would require either extensive sniffing or extended manual assignment, both of which are error-prone and difficult to maintain.
  2. By forwarding packets to every connection, we are able to emulate an RF broadcast domain. When transmitting packets over the air, anyone who is in range can hear those packets and know what stations are available. This makes discovering new nodes to play with easy and fun.

You can see a working example of the TCP Interface configuration in the tutorial.

pax25.interfaces.tcp.TCPInterface

Interface for linking pax25 stations over TCP.

Source code in pax25/interfaces/tcp.py
class TCPInterface(Interface[TCPSettings]):
    """
    Interface for linking pax25 stations over TCP.

    Every frame handed to the interface is forwarded to all of its
    connections, whether they were accepted from a remote client or
    dialed out from the configured connection list.
    """

    type = "TCP"

    def __init__(self, name: str, settings: TCPSettings, station: "Station") -> None:
        """
        Initialize the TCP Interface.

        Only stores state; no sockets are opened until start() is called.
        """
        self.name = name
        self._settings = settings
        self.station = station
        # Every live connection, inbound and outbound alike.
        self.connections: set[TCPConnection] = set()
        # Set while shutdown() runs so close callbacks don't touch the set.
        self.shutting_down = False
        # Task wrapping start_server(); created by start().
        self.server_task: Task[None] | None = None
        # The asyncio server, present only while inbound listening is active.
        self.server: None | Server = None

    @property
    def listening(self) -> bool:
        """
        Returns True if the server is up or any connection's writer is still open.
        """
        return bool(
            any(
                connection
                for connection in self.connections
                if (connection.writer and not connection.writer.is_closing())
            )
            or self.server
        )

    @property
    def gateway(self) -> bool:
        """
        Returns True if this interface can be used to make outbound connections.

        Defaults to True when the "gateway" setting is absent.
        """
        return self._settings.get("gateway", True)

    @property
    def sudo(self) -> bool:
        """
        Returns True if connections on this interface should be considered privileged.

        Defaults to False when the "sudo" setting is absent.
        """
        return self._settings.get("sudo", False)

    async def reload_settings(self, settings: TCPSettings) -> None:
        """
        Reload TCP interface settings.

        The interface is shut down completely, the new settings are stored,
        and then it is started again.
        """
        await self.shutdown()
        self._settings = settings
        self.shutting_down = False
        self.start()

    def connection_closed(self, connection: "TCPConnection") -> None:
        """
        Callback for closing a TCP connection.

        Inbound or force-closed connections are forgotten; any other
        (outbound) connection is asked to reconnect after a delay.
        """
        if self.shutting_down:
            # Do nothing, shutdown() takes care of the bookkeeping.
            return
        if connection.inbound or connection.force_close:
            # discard() rather than remove(): a repeated close callback for a
            # connection that is already gone must not raise KeyError.
            self.connections.discard(connection)
            return
        # Need to retry.
        connection.initialize_connection(delay=5)

    def handle_client(self, reader: StreamReader, writer: StreamWriter) -> None:
        """
        Set up a connection for an inbound client.

        If the transport cannot report the peer address, the writer is closed
        and no connection is registered.
        """
        result = writer.transport.get_extra_info("peername")
        if result is None:  # pragma: no cover
            # Something went wrong. Underlying C library should have provided this
            # metadata.
            writer.close()
            # Without a peer address there is nothing to register.
            return
        # IPv4 peers report (host, port); IPv6 peers report
        # (host, port, flowinfo, scope_id). Only the first two are needed.
        address, port = result[:2]
        self.connections.add(
            TCPConnection(
                remote_station=ResolvedConnectionSpec(
                    host=address,
                    port=port,
                ),
                reader=reader,
                writer=writer,
                close_callback=self.connection_closed,
                interface=self,
                inbound=True,
            ),
        )

    async def start_server(self) -> None:
        """
        Start listening for inbound connections.

        Returns immediately unless the "allow_inbound" setting is truthy.
        Otherwise serves until cancelled or closed, and always resets
        self.server to None on the way out.
        """
        if not self._settings.get("allow_inbound"):
            return
        try:
            self.server = await asyncio.start_server(
                self.handle_client,
                self._settings.get("listening_address", DEFAULT_ADDRESS),
                self._settings.get("listening_port", DEFAULT_PORT),
            )
            async with self.server:
                await self.server.serve_forever()
        finally:
            self.server = None

    def build_connections(self) -> None:
        """
        Create all the outbound connections for the TCP Interface.

        Does nothing when no connections are configured. Raises
        ConfigurationError when connections are configured but call_sign or
        password is missing.
        """
        connections = self._settings.get("connections", [])
        if not connections:
            return
        call_sign = self._settings.get("call_sign", "")
        password = self._settings.get("password", 0)
        if not (call_sign and password):
            raise ConfigurationError(
                "TCP settings must include both call_sign and password.",
            )
        # Reuse the list fetched above instead of reading the setting twice.
        for value in connections:
            self.connections.add(
                TCPConnection(
                    remote_station=ResolvedConnectionSpec(
                        host=value["host"], port=value["port"]
                    ),
                    close_callback=self.connection_closed,
                    credentials=APRSPasswordFrame(
                        call_sign=call_sign,
                        password=password,
                    ),
                    interface=self,
                    inbound=False,
                )
            )

    def send_frame(self, frame: Frame) -> None:
        """
        Send a frame out to every connection held by the interface.
        """
        for connection in self.connections:
            connection.send_frame(frame)

    def start(self) -> None:
        """
        Set up the loops.

        Builds the outbound connections and schedules start_server() as a task.
        """
        self.shutting_down = False
        self.build_connections()
        self.server_task = asyncio.ensure_future(self.start_server())

    async def shutdown(self) -> None:
        """
        Close out all connections and stop the server.
        """
        self.shutting_down = True
        if self.server:
            # Must explicitly close clients so read/write file handles raise exceptions.
            # Otherwise, serve_forever could get deadlocked when cancelling if we're
            # mid-read. See https://github.com/python/cpython/issues/123720
            self.server.close_clients()
            self.server.close()
        await cancel_all([self.server_task])
        # Work on a snapshot: the set must not be iterated directly across
        # await points, where other callbacks could resize it.
        closing = tuple(self.connections)
        for connection in closing:
            await connection.close()
        # connection_closed() skips its bookkeeping while shutting down, so the
        # closed connections are dropped here. Otherwise they would linger and
        # keep receiving frames after a restart.
        self.connections.difference_update(closing)
        self.server = None

gateway: bool property

Returns True if this interface can be used to make outbound connections.

listening: bool property

Returns True if the server is running or any connection's writer is still open.

sudo: bool property

Returns True if connections on this interface should be considered privileged.

__init__(name: str, settings: TCPSettings, station: Station) -> None

Initialize the TCP Interface.

Source code in pax25/interfaces/tcp.py
def __init__(self, name: str, settings: TCPSettings, station: "Station") -> None:
    """
    Prepare a TCP interface in its idle state.

    Only attributes are assigned here; nothing is opened or scheduled.
    """
    # Identity and configuration supplied by the caller.
    self.name = name
    self.station = station
    self._settings = settings
    # Runtime state: no connections, no server, not shutting down.
    self.shutting_down = False
    self.connections: set[TCPConnection] = set()
    self.server: None | Server = None
    self.server_task: Task[None] | None = None

build_connections() -> None

Create all the connections for the TCP Interface.

Source code in pax25/interfaces/tcp.py
def build_connections(self) -> None:
    """
    Register one outbound connection per configured remote station.

    Returns without doing anything when the "connections" setting is empty
    or absent. Raises ConfigurationError when connections are configured but
    either "call_sign" or "password" is missing or falsy.
    """
    targets = self._settings.get("connections", [])
    if not targets:
        return
    call_sign = self._settings.get("call_sign", "")
    password = self._settings.get("password", 0)
    if not call_sign or not password:
        raise ConfigurationError(
            "TCP settings must include both call_sign and password.",
        )
    for target in targets:
        remote = ResolvedConnectionSpec(host=target["host"], port=target["port"])
        on_close = self.connection_closed
        # Each connection gets its own credentials frame.
        login = APRSPasswordFrame(call_sign=call_sign, password=password)
        outbound = TCPConnection(
            remote_station=remote,
            close_callback=on_close,
            credentials=login,
            interface=self,
            inbound=False,
        )
        self.connections.add(outbound)

connection_closed(connection: TCPConnection) -> None

Callback for closing a TCP connection.

Source code in pax25/interfaces/tcp.py
def connection_closed(self, connection: "TCPConnection") -> None:
    """
    Callback for closing a TCP connection.

    While the interface is shutting down nothing is done. Otherwise an
    inbound or force-closed connection is forgotten, and any other
    connection is asked to reconnect after a five second delay.
    """
    if self.shutting_down:
        # Do nothing, this will be cleaned up elsewhere.
        return
    if connection.inbound or connection.force_close:
        # discard() rather than remove(): a repeated close callback for a
        # connection that is already gone must not raise KeyError.
        self.connections.discard(connection)
        return
    # Need to retry.
    connection.initialize_connection(delay=5)

handle_client(reader: StreamReader, writer: StreamWriter) -> None

Set up a connection for an inbound client.

Source code in pax25/interfaces/tcp.py
def handle_client(self, reader: StreamReader, writer: StreamWriter) -> None:
    """
    Set up a connection for an inbound client.

    If the transport cannot report the peer address, the writer is closed
    and no connection is registered.
    """
    result = writer.transport.get_extra_info("peername")
    if result is None:  # pragma: no cover
        # Something went wrong. Underlying C library should have provided this
        # metadata.
        writer.close()
        # Without a peer address there is nothing to register; falling
        # through would fail trying to unpack None.
        return
    # IPv4 peers report (host, port); IPv6 peers report
    # (host, port, flowinfo, scope_id). Only the first two are needed.
    address, port = result[:2]
    self.connections.add(
        TCPConnection(
            remote_station=ResolvedConnectionSpec(
                host=address,
                port=port,
            ),
            reader=reader,
            writer=writer,
            close_callback=self.connection_closed,
            interface=self,
            inbound=True,
        ),
    )

reload_settings(settings: TCPSettings) -> None async

Reload TCP interface settings.

Source code in pax25/interfaces/tcp.py
async def reload_settings(self, settings: TCPSettings) -> None:
    """
    Apply new settings by restarting the interface.

    The interface is shut down first, the new settings are stored, and
    start() is then called again.
    """
    # Tear down everything that was running before storing the new settings.
    await self.shutdown()
    self.shutting_down = False
    self._settings = settings
    # Bring the interface back up with the new configuration.
    self.start()

send_frame(frame: Frame) -> None

Send a frame out to all the connections which are listening.

Source code in pax25/interfaces/tcp.py
def send_frame(self, frame: Frame) -> None:
    """
    Forward a frame to every connection held by the interface.
    """
    # No filtering happens here: each connection gets the same frame.
    for peer in self.connections:
        peer.send_frame(frame)

shutdown() -> None async

Close out all connections.

Source code in pax25/interfaces/tcp.py
async def shutdown(self) -> None:
    """
    Close out all connections and stop the server.

    Marks the interface as shutting down, closes the server and its
    clients if one is running, cancels the server task, then closes each
    connection and drops it from the connection set.
    """
    self.shutting_down = True
    if self.server:
        # Must explicitly close clients so read/write file handles raise exceptions.
        # Otherwise, serve_forever could get deadlocked when cancelling if we're
        # mid-read. See https://github.com/python/cpython/issues/123720
        self.server.close_clients()
        self.server.close()
    await cancel_all([self.server_task])
    # Work on a snapshot: the set must not be iterated directly across await
    # points, where other callbacks could resize it.
    closing = tuple(self.connections)
    for connection in closing:
        await connection.close()
    # The close callback skips its bookkeeping while shutting down, so the
    # closed connections are dropped here. Otherwise they would linger and
    # keep receiving frames after a restart.
    self.connections.difference_update(closing)
    self.server = None

start() -> None

Set up the loops.

Source code in pax25/interfaces/tcp.py
def start(self) -> None:
    """
    Bring the interface up.

    Clears the shutdown flag, builds the configured connections, and
    schedules start_server() as a task stored in self.server_task.
    """
    self.shutting_down = False
    self.build_connections()
    # The coroutine is created only after the connections are built.
    listener = self.start_server()
    self.server_task = asyncio.ensure_future(listener)

start_server() -> None async

Start listening for inbound connections.

Source code in pax25/interfaces/tcp.py
async def start_server(self) -> None:
    """
    Accept inbound connections until the server stops.

    Returns immediately unless the "allow_inbound" setting is truthy.
    Otherwise a server is opened on the configured address and port, falling
    back to DEFAULT_ADDRESS and DEFAULT_PORT. self.server is reset to None
    on exit, whether serving ended normally or through an exception.
    """
    inbound_allowed = self._settings.get("allow_inbound")
    if not inbound_allowed:
        return
    try:
        on_client = self.handle_client
        host = self._settings.get("listening_address", DEFAULT_ADDRESS)
        port = self._settings.get("listening_port", DEFAULT_PORT)
        listener = await asyncio.start_server(on_client, host, port)
        # Publish the server so the rest of the interface can see and close it.
        self.server = listener
        async with listener:
            await listener.serve_forever()
    finally:
        self.server = None