Skip to content

Stations

Stations in Pax25 represent, at least notionally, a single packet radio station. The station object is responsible for managing the applications that are running on the station, and for managing the interfaces that the station is connected to. The station also instantiates the Frame Router and the various included services.

Setting up a station involves setting up the station object, registering applications with the station, and bringing up the interfaces that the station is connected to.

Initializing the Station

For an in-context view of initializing your station, we recommend reading the tutorial first. Station intialization involves specifying the station's callsign, and some optional, tunable parameters.

The configuration dictionary

The station is initialized using a dictionary with the following keys and values:

Key Value
"name" The callsign of the station, and default name SSIDs are created under.
#!monitor A dictionary of interfaces that the station is connected to. Each key is a str, with the value particular to the interface. See the interfaces documentation for more details.

Here is an example configuration dictionary:

config = {
    # This is the only required configuration key-- your station's name.
    # Your station's name should be your callsign, unless you have a very
    # specific reason why it cannot be. If it cannot be, you should customize
    # the beacon text (detailed below) to contain your own custom beacon
    # message with your registered callsign and claiming the name.
    "name": "N0CALL",
    # The following services keys are all optional, and come with these
    # defaults:
    "monitor": {
        # How many frames will be kept in the frame log at a time. The frame
        # log can be consumed by applications as desired (such as the
        # contributed command line app).
        #
        # If this setting is none, the log will be allowed to grow infinitely
        # in size.
        "max_frame_log_size": 256,
        # The monitor also tracks for recently heard stations. The number of
        # stations recently heard can be enumerated by applications as
        # desired, as is done in the command line app.
        #
        # If this setting is None, never clean up old heard stations.
        "max_stations_tracked": 30,
    },
    "digipeater": {
        # Whether the digipeater is enabled.
        "enabled": True,
    },
    "beacon": {
        # Whether the standard station ID beacon should be enabled. If this
        # station runs over the air, you should set this to True for legal
        # compliance.
        "id_beacon_enabled": True,
        # How frequently, when transmitting, to send the ID beacon.
        # For most jurisdictions, this should be 600 (10 mins) or lower.
        "id_beacon_interval": 600,
        # The destination address for the beacon. All packets must have a
        # destination address, so we use "BEACON" here as the default.
        "id_beacon_destination": "BEACON",
        # A list of digipeater addresses to run the beacon through.
        # Empty by default.
        "id_beacon_digipeaters": {"interface_name": ["DIGI", "DIGI-2"]},
        # The contents of the beacon. If left None, auto-generates them.
        "id_beacon_content": None,
    },
}

Bringing up the Station

Once your station has been instantiated and your applications have been registered, you can bring up the station using the bring_up_interfaces method. This will ready all the necessary event loop tasks. Then, we can start the event loop.

from asyncio import get_event_loop

...

station.run()
loop = get_event_loop()
loop.run_forever()

Bringing down the station

Finally, when you're ready to close the station down, await its shutdown method from within your code somewhere. This will close all the interfaces, and optionally stop the event loop as well.

# Must be called within an async function.
await station.shutdown(close_loop=True)

pax25.station.Station

The main station class for Pax25. This class is intended to manage a physical station's digital affairs.

Source code in pax25/station.py
class Station:
    """
    The main station class for Pax25. This class is intended to manage a physical
    station's digital affairs.
    """

    def __init__(
        self,
        *,
        config: StationConfig | None = None,
        config_file_path: str = "",
    ):
        """
        Initializes a station. Specifying a config dictionary configures the station
        with the config dictionary's values.

        Specifying a config_file_path will not load the config_file_path's values UNLESS
        the config parameter is set to None.

        The config_file_path is where configuration should be saved by administrative
        utilities (such as the contributed save command in the CommandLine app.) Can be
        left as an empty string to specify no intended save file.
        """
        # Timers may get cancelled and the station may shut down before they would
        # be cleaned up normally. We track these tasks so if we shut down while any
        # are still active, we can reap them manually.
        self.config_file_path = config_file_path
        if (config is None) and self.config_file_path:
            with open(config_file_path) as config_file:
                config = json.load(config_file)
        if config is None:
            raise ConfigurationError(
                "No configuration supplied. Please either set the config argument "
                "or provide a config_file_path."
            )
        self._active_future: Future[None] | None = None
        address = Address.from_string(config["name"])
        assert str(address) == config["name"], (
            "Station names must be all caps and have no SSID suffix."
        )
        self._settings = config
        self.interfaces: dict[str, Interface[Any]] = {}
        self.frame_router = FrameRouter(station=self)
        self.digipeater = Digipeater(station=self, settings=config.get("digipeater"))
        self.connection = ConnectionService(
            station=self,
            settings=config.get("connection"),
        )
        self.monitor = Monitor(station=self, settings=config.get("monitor"))
        self.beacon = BeaconService(station=self, settings=config.get("beacon"))
        self.clear_subsettings()
        self._cleanup_lock: Future[None] | None = None
        self._cleanup_task: Future[None] | None = None
        self._to_clean: list[Task[None]] = []
        self.closing = False

    @property
    def running(self) -> bool:
        """
        Returns if the station is currently running.
        """
        if self._active_future is None:
            return False
        return not self._active_future.done()

    @property
    def settings(self) -> StationConfig:
        """
        Returns the current configuration of the station.
        """
        settings = smart_clone(self._settings)
        # These are all smart cloned.
        settings["monitor"] = self.monitor.settings
        settings["beacon"] = self.beacon.settings
        settings["digipeater"] = self.digipeater.settings
        settings["connection"] = self.connection.settings
        return settings

    @property
    def name(self) -> str:
        return self._settings["name"].upper()

    def clear_subsettings(self) -> None:
        """
        Settings are set on the station object, but the authoritative location of the
        settings for each service are on that service. Remove these keys so that if we
        erroneously rely on the internal representation of the settings for a service,
        we raise instead of using data which may be out of date, which could cause a
        very subtle bug.
        """
        keys: tuple[Literal["monitor", "beacon", "digipeater", "connection"], ...] = (
            "monitor",
            "beacon",
            "digipeater",
            "connection",
        )
        for key in keys:
            if key in self._settings:
                del self._settings[key]

    def collect_task(self, task: Task[None]) -> None:
        """
        Tasks which are 'fire and forget' can be sent here to be cleaned up later
        as needed. This prevents the garbage collector from freaking out and sending
        tracebacks as tasks from functions like those in the timer module get reaped.
        """
        self._to_clean.append(task)

    async def reload_settings(self, settings: StationConfig) -> None:
        """
        Reloads station configuration, and all components involved.
        """
        address = Address.from_string(settings["name"])
        assert str(address) == settings["name"], (
            "Station names must be all caps and have no SSID suffix."
        )
        self._settings = settings
        digipeater_settings = maybe_update(
            self.digipeater.settings,
            settings.get("digipeater"),
        )
        monitor_settings = maybe_update(
            self.monitor.settings,
            settings.get("monitor"),
        )
        beacon_settings = maybe_update(
            self.beacon.settings,
            settings.get("beacon"),
        )
        connection_settings = maybe_update(
            self.connection.settings,
            settings.get("connection"),
        )
        self.clear_subsettings()
        await asyncio.gather(
            self.digipeater.reload_settings(digipeater_settings),
            self.monitor.reload_settings(monitor_settings),
            self.beacon.reload_settings(beacon_settings),
            self.connection.reload_settings(connection_settings),
        )

    async def finished(self) -> None:
        """
        Await this to wait until the server is closed down programmatically.
        """
        if self._active_future is None:
            raise RuntimeError(
                "Station has not run. We cannot wait for it to finish if "
                "it never began!"
            )
        await self._active_future

    async def _cleanup_loop(self) -> None:
        """
        Cleanup loop.
        """
        sleep = Task(asyncio.sleep(0.500))
        while self._cleanup_lock:
            await first(sleep, self._cleanup_lock)
            self._to_clean = [task for task in self._to_clean if not task.done()]
        await cancel_all(self._to_clean)
        await cancel(sleep)
        self._cleanup_task = None

    def run(self) -> Future[None]:
        """
        Starts the station. Tasks will not begin unless/until the asyncio loop is
        running.
        """
        if self._active_future and not self._active_future.done():
            raise RuntimeError("Station is already running!")
        self.monitor.run()
        self.bring_up_interfaces()
        self.beacon.run()
        self._cleanup_lock = Future()
        self._cleanup_task = asyncio.ensure_future(self._cleanup_loop())
        self._active_future = Future()
        return self._active_future

    async def start(self) -> None:
        """
        Starts the station and awaits its completion. Useful for projects that just
        have one station and want to use it as the event loop entry point, which is
        most projects, or anything which might need to wait on an entire station's
        lifecycle.
        """
        await self.run()

    def bring_up_interfaces(self) -> None:
        """
        Attempts to bring up all interfaces and queue them into the event loop.
        """
        for interface in self.interfaces.values():
            interface.start()

    def add_interface(self, name: str, interface: Interface) -> None:
        """
        Adds an interface to the station.
        """
        if interface.station != self:
            raise ConfigurationError(
                "You cannot assign an interface to an irrelevant station."
            )
        if not name:
            raise ConfigurationError("Interface name may not be blank.")
        if name.split() != [name]:
            raise ConfigurationError(
                f"Interface names may not have spaces. Got: {repr(name)}",
            )
        try:
            name.encode("ascii")
        except ValueError as err:
            raise ConfigurationError(
                f"Interface names MUST be ASCII. Got: {repr(name)}",
            ) from err
        if name in self.interfaces:
            raise ConfigurationError(f"Interface {repr(name)} already exists!")
        self.interfaces[name] = interface
        self.monitor.refresh_ports_cache()
        if self.running and not self.closing:
            interface.start()

    async def remove_interface(self, name: str) -> None:
        """
        Removes an interface from the station.
        """
        if name not in self.interfaces:
            raise KeyError(f"Interface {repr(name)} does not exist.")
        interface = self.interfaces[name]
        del self.interfaces[name]
        self.monitor.refresh_ports_cache()
        await asyncio.ensure_future(interface.shutdown())

    def get_nth_gateway(self, index: int) -> "Interface":
        """
        Get the nth gateway or raise an IndexError if not present.
        """
        if index <= 0:
            raise IndexError("Port numbers start at 1.")
        current = 0
        for interface in self.interfaces.values():
            if interface.gateway:
                current += 1
                if current == index:
                    return interface
        if not current:
            raise IndexError("No gateways available.")
        raise IndexError(f"Port {index} not found.")

    async def shutdown(self) -> None:
        """
        Shut down all interfaces, cutting off the station.
        """
        if not self.running:
            # Already shut down.
            return
        if self.closing and self._active_future:  # pragma: no cover
            logger.warning(
                "Shutdown called while shutdown already called. "
                "This may cause a deadlock.",
            )
            await self._active_future
            return
        self.closing = True
        await self.connection.shutdown()
        await self.beacon.shutdown()
        for interface in self.interfaces.values():
            await interface.shutdown()
        await self.monitor.shutdown()
        if self._active_future:
            self._active_future.set_result(None)
        if self._cleanup_task and self._cleanup_lock:
            self._cleanup_lock.cancel()
            self._cleanup_lock = None
            await self._cleanup_task
        self.closing = False

running: bool property

Returns if the station is currently running.

settings: StationConfig property

Returns the current configuration of the station.

__init__(*, config: StationConfig | None = None, config_file_path: str = '')

Initializes a station. Specifying a config dictionary configures the station with the config dictionary's values.

Specifying a config_file_path will not load the config_file_path's values UNLESS the config parameter is set to None.

The config_file_path is where configuration should be saved by administrative utilities (such as the contributed save command in the CommandLine app.) Can be left as an empty string to specify no intended save file.

Source code in pax25/station.py
def __init__(
    self,
    *,
    config: StationConfig | None = None,
    config_file_path: str = "",
):
    """
    Initializes a station. Specifying a config dictionary configures the station
    with the config dictionary's values.

    Specifying a config_file_path will not load the config_file_path's values UNLESS
    the config parameter is set to None.

    The config_file_path is where configuration should be saved by administrative
    utilities (such as the contributed save command in the CommandLine app.) Can be
    left as an empty string to specify no intended save file.
    """
    # Timers may get cancelled and the station may shut down before they would
    # be cleaned up normally. We track these tasks so if we shut down while any
    # are still active, we can reap them manually.
    self.config_file_path = config_file_path
    if (config is None) and self.config_file_path:
        with open(config_file_path) as config_file:
            config = json.load(config_file)
    if config is None:
        raise ConfigurationError(
            "No configuration supplied. Please either set the config argument "
            "or provide a config_file_path."
        )
    self._active_future: Future[None] | None = None
    address = Address.from_string(config["name"])
    assert str(address) == config["name"], (
        "Station names must be all caps and have no SSID suffix."
    )
    self._settings = config
    self.interfaces: dict[str, Interface[Any]] = {}
    self.frame_router = FrameRouter(station=self)
    self.digipeater = Digipeater(station=self, settings=config.get("digipeater"))
    self.connection = ConnectionService(
        station=self,
        settings=config.get("connection"),
    )
    self.monitor = Monitor(station=self, settings=config.get("monitor"))
    self.beacon = BeaconService(station=self, settings=config.get("beacon"))
    self.clear_subsettings()
    self._cleanup_lock: Future[None] | None = None
    self._cleanup_task: Future[None] | None = None
    self._to_clean: list[Task[None]] = []
    self.closing = False

add_interface(name: str, interface: Interface) -> None

Adds an interface to the station.

Source code in pax25/station.py
def add_interface(self, name: str, interface: Interface) -> None:
    """
    Adds an interface to the station.
    """
    if interface.station != self:
        raise ConfigurationError(
            "You cannot assign an interface to an irrelevant station."
        )
    if not name:
        raise ConfigurationError("Interface name may not be blank.")
    if name.split() != [name]:
        raise ConfigurationError(
            f"Interface names may not have spaces. Got: {repr(name)}",
        )
    try:
        name.encode("ascii")
    except ValueError as err:
        raise ConfigurationError(
            f"Interface names MUST be ASCII. Got: {repr(name)}",
        ) from err
    if name in self.interfaces:
        raise ConfigurationError(f"Interface {repr(name)} already exists!")
    self.interfaces[name] = interface
    self.monitor.refresh_ports_cache()
    if self.running and not self.closing:
        interface.start()

bring_up_interfaces() -> None

Attempts to bring up all interfaces and queue them into the event loop.

Source code in pax25/station.py
def bring_up_interfaces(self) -> None:
    """
    Attempts to bring up all interfaces and queue them into the event loop.
    """
    for interface in self.interfaces.values():
        interface.start()

clear_subsettings() -> None

Settings are set on the station object, but the authoritative location of the settings for each service are on that service. Remove these keys so that if we erroneously rely on the internal representation of the settings for a service, we raise instead of using data which may be out of date, which could cause a very subtle bug.

Source code in pax25/station.py
def clear_subsettings(self) -> None:
    """
    Settings are set on the station object, but the authoritative location of the
    settings for each service are on that service. Remove these keys so that if we
    erroneously rely on the internal representation of the settings for a service,
    we raise instead of using data which may be out of date, which could cause a
    very subtle bug.
    """
    keys: tuple[Literal["monitor", "beacon", "digipeater", "connection"], ...] = (
        "monitor",
        "beacon",
        "digipeater",
        "connection",
    )
    for key in keys:
        if key in self._settings:
            del self._settings[key]

collect_task(task: Task[None]) -> None

Tasks which are 'fire and forget' can be sent here to be cleaned up later as needed. This prevents the garbage collector from freaking out and sending tracebacks as tasks from functions like those in the timer module get reaped.

Source code in pax25/station.py
def collect_task(self, task: Task[None]) -> None:
    """
    Tasks which are 'fire and forget' can be sent here to be cleaned up later
    as needed. This prevents the garbage collector from freaking out and sending
    tracebacks as tasks from functions like those in the timer module get reaped.
    """
    self._to_clean.append(task)

finished() -> None async

Await this to wait until the server is closed down programmatically.

Source code in pax25/station.py
async def finished(self) -> None:
    """
    Await this to wait until the server is closed down programmatically.
    """
    if self._active_future is None:
        raise RuntimeError(
            "Station has not run. We cannot wait for it to finish if "
            "it never began!"
        )
    await self._active_future

get_nth_gateway(index: int) -> Interface

Get the nth gateway or raise an IndexError if not present.

Source code in pax25/station.py
def get_nth_gateway(self, index: int) -> "Interface":
    """
    Get the nth gateway or raise an IndexError if not present.
    """
    if index <= 0:
        raise IndexError("Port numbers start at 1.")
    current = 0
    for interface in self.interfaces.values():
        if interface.gateway:
            current += 1
            if current == index:
                return interface
    if not current:
        raise IndexError("No gateways available.")
    raise IndexError(f"Port {index} not found.")

reload_settings(settings: StationConfig) -> None async

Reloads station configuration, and all components involved.

Source code in pax25/station.py
async def reload_settings(self, settings: StationConfig) -> None:
    """
    Reloads station configuration, and all components involved.
    """
    address = Address.from_string(settings["name"])
    assert str(address) == settings["name"], (
        "Station names must be all caps and have no SSID suffix."
    )
    self._settings = settings
    digipeater_settings = maybe_update(
        self.digipeater.settings,
        settings.get("digipeater"),
    )
    monitor_settings = maybe_update(
        self.monitor.settings,
        settings.get("monitor"),
    )
    beacon_settings = maybe_update(
        self.beacon.settings,
        settings.get("beacon"),
    )
    connection_settings = maybe_update(
        self.connection.settings,
        settings.get("connection"),
    )
    self.clear_subsettings()
    await asyncio.gather(
        self.digipeater.reload_settings(digipeater_settings),
        self.monitor.reload_settings(monitor_settings),
        self.beacon.reload_settings(beacon_settings),
        self.connection.reload_settings(connection_settings),
    )

remove_interface(name: str) -> None async

Removes an interface from the station.

Source code in pax25/station.py
async def remove_interface(self, name: str) -> None:
    """
    Removes an interface from the station.
    """
    if name not in self.interfaces:
        raise KeyError(f"Interface {repr(name)} does not exist.")
    interface = self.interfaces[name]
    del self.interfaces[name]
    self.monitor.refresh_ports_cache()
    await asyncio.ensure_future(interface.shutdown())

run() -> Future[None]

Starts the station. Tasks will not begin unless/until the asyncio loop is running.

Source code in pax25/station.py
def run(self) -> Future[None]:
    """
    Starts the station. Tasks will not begin unless/until the asyncio loop is
    running.
    """
    if self._active_future and not self._active_future.done():
        raise RuntimeError("Station is already running!")
    self.monitor.run()
    self.bring_up_interfaces()
    self.beacon.run()
    self._cleanup_lock = Future()
    self._cleanup_task = asyncio.ensure_future(self._cleanup_loop())
    self._active_future = Future()
    return self._active_future

shutdown() -> None async

Shut down all interfaces, cutting off the station.

Source code in pax25/station.py
async def shutdown(self) -> None:
    """
    Shut down all interfaces, cutting off the station.
    """
    if not self.running:
        # Already shut down.
        return
    if self.closing and self._active_future:  # pragma: no cover
        logger.warning(
            "Shutdown called while shutdown already called. "
            "This may cause a deadlock.",
        )
        await self._active_future
        return
    self.closing = True
    await self.connection.shutdown()
    await self.beacon.shutdown()
    for interface in self.interfaces.values():
        await interface.shutdown()
    await self.monitor.shutdown()
    if self._active_future:
        self._active_future.set_result(None)
    if self._cleanup_task and self._cleanup_lock:
        self._cleanup_lock.cancel()
        self._cleanup_lock = None
        await self._cleanup_task
    self.closing = False

start() -> None async

Starts the station and awaits its completion. Useful for projects that just have one station and want to use it as the event loop entry point, which is most projects, or anything which might need to wait on an entire station's lifecycle.

Source code in pax25/station.py
async def start(self) -> None:
    """
    Starts the station and awaits its completion. Useful for projects that just
    have one station and want to use it as the event loop entry point, which is
    most projects, or anything which might need to wait on an entire station's
    lifecycle.
    """
    await self.run()