Stations
Stations in Pax25 represent, at least notionally, a single packet radio station. The station object is responsible for managing the applications that are running on the station, and for managing the interfaces that the station is connected to. The station also instantiates the Frame Router and the various included services.
Setting up a station involves setting up the station object, registering applications with the station, and bringing up the interfaces that the station is connected to.
Initializing the Station
For an in-context view of initializing your station, we recommend reading the tutorial first. Station intialization involves specifying the station's callsign, and some optional, tunable parameters.
The configuration dictionary
The station is initialized using a dictionary with the following keys and values:
| Key |
Value |
"name" |
The callsign of the station, and default name SSIDs are created under. |
#!monitor |
A dictionary of interfaces that the station is connected to. Each key is a str, with the value particular to the interface. See the interfaces documentation for more details. |
Here is an example configuration dictionary:
config = {
# This is the only required configuration key-- your station's name.
# Your station's name should be your callsign, unless you have a very
# specific reason why it cannot be. If it cannot be, you should customize
# the beacon text (detailed below) to contain your own custom beacon
# message with your registered callsign and claiming the name.
"name": "N0CALL",
# The following services keys are all optional, and come with these
# defaults:
"monitor": {
# How many frames will be kept in the frame log at a time. The frame
# log can be consumed by applications as desired (such as the
# contributed command line app).
#
# If this setting is none, the log will be allowed to grow infinitely
# in size.
"max_frame_log_size": 256,
# The monitor also tracks for recently heard stations. The number of
# stations recently heard can be enumerated by applications as
# desired, as is done in the command line app.
#
# If this setting is None, never clean up old heard stations.
"max_stations_tracked": 30,
},
"digipeater": {
# Whether the digipeater is enabled.
"enabled": True,
},
"beacon": {
# Whether the standard station ID beacon should be enabled. If this
# station runs over the air, you should set this to True for legal
# compliance.
"id_beacon_enabled": True,
# How frequently, when transmitting, to send the ID beacon.
# For most jurisdictions, this should be 600 (10 mins) or lower.
"id_beacon_interval": 600,
# The destination address for the beacon. All packets must have a
# destination address, so we use "BEACON" here as the default.
"id_beacon_destination": "BEACON",
# A list of digipeater addresses to run the beacon through.
# Empty by default.
"id_beacon_digipeaters": {"interface_name": ["DIGI", "DIGI-2"]},
# The contents of the beacon. If left None, auto-generates them.
"id_beacon_content": None,
},
}
Bringing up the Station
Once your station has been instantiated and your applications have been registered, you can bring up the station using the bring_up_interfaces method. This will ready all the necessary event loop tasks. Then, we can start the event loop.
from asyncio import get_event_loop
...
station.run()
loop = get_event_loop()
loop.run_forever()
Bringing down the station
Finally, when you're ready to close the station down, await its shutdown method from within your code somewhere. This will close all the interfaces, and optionally stop the event loop as well.
# Must be called within an async function.
await station.shutdown(close_loop=True)
pax25.station.Station
The main station class for Pax25. This class is intended to manage a physical
station's digital affairs.
Source code in pax25/station.py
| class Station:
"""
The main station class for Pax25. This class is intended to manage a physical
station's digital affairs.
"""
def __init__(
self,
*,
config: StationConfig | None = None,
config_file_path: str = "",
):
"""
Initializes a station. Specifying a config dictionary configures the station
with the config dictionary's values.
Specifying a config_file_path will not load the config_file_path's values UNLESS
the config parameter is set to None.
The config_file_path is where configuration should be saved by administrative
utilities (such as the contributed save command in the CommandLine app.) Can be
left as an empty string to specify no intended save file.
"""
# Timers may get cancelled and the station may shut down before they would
# be cleaned up normally. We track these tasks so if we shut down while any
# are still active, we can reap them manually.
self.config_file_path = config_file_path
if (config is None) and self.config_file_path:
with open(config_file_path) as config_file:
config = json.load(config_file)
if config is None:
raise ConfigurationError(
"No configuration supplied. Please either set the config argument "
"or provide a config_file_path."
)
self._active_future: Future[None] | None = None
address = Address.from_string(config["name"])
assert str(address) == config["name"], (
"Station names must be all caps and have no SSID suffix."
)
self._settings = config
self.interfaces: dict[str, Interface[Any]] = {}
self.frame_router = FrameRouter(station=self)
self.digipeater = Digipeater(station=self, settings=config.get("digipeater"))
self.connection = ConnectionService(
station=self,
settings=config.get("connection"),
)
self.monitor = Monitor(station=self, settings=config.get("monitor"))
self.beacon = BeaconService(station=self, settings=config.get("beacon"))
self.clear_subsettings()
self._cleanup_lock: Future[None] | None = None
self._cleanup_task: Future[None] | None = None
self._to_clean: list[Task[None]] = []
self.closing = False
@property
def running(self) -> bool:
"""
Returns if the station is currently running.
"""
if self._active_future is None:
return False
return not self._active_future.done()
@property
def settings(self) -> StationConfig:
"""
Returns the current configuration of the station.
"""
settings = smart_clone(self._settings)
# These are all smart cloned.
settings["monitor"] = self.monitor.settings
settings["beacon"] = self.beacon.settings
settings["digipeater"] = self.digipeater.settings
settings["connection"] = self.connection.settings
return settings
@property
def name(self) -> str:
return self._settings["name"].upper()
def clear_subsettings(self) -> None:
"""
Settings are set on the station object, but the authoritative location of the
settings for each service are on that service. Remove these keys so that if we
erroneously rely on the internal representation of the settings for a service,
we raise instead of using data which may be out of date, which could cause a
very subtle bug.
"""
keys: tuple[Literal["monitor", "beacon", "digipeater", "connection"], ...] = (
"monitor",
"beacon",
"digipeater",
"connection",
)
for key in keys:
if key in self._settings:
del self._settings[key]
def collect_task(self, task: Task[None]) -> None:
"""
Tasks which are 'fire and forget' can be sent here to be cleaned up later
as needed. This prevents the garbage collector from freaking out and sending
tracebacks as tasks from functions like those in the timer module get reaped.
"""
self._to_clean.append(task)
async def reload_settings(self, settings: StationConfig) -> None:
"""
Reloads station configuration, and all components involved.
"""
address = Address.from_string(settings["name"])
assert str(address) == settings["name"], (
"Station names must be all caps and have no SSID suffix."
)
self._settings = settings
digipeater_settings = maybe_update(
self.digipeater.settings,
settings.get("digipeater"),
)
monitor_settings = maybe_update(
self.monitor.settings,
settings.get("monitor"),
)
beacon_settings = maybe_update(
self.beacon.settings,
settings.get("beacon"),
)
connection_settings = maybe_update(
self.connection.settings,
settings.get("connection"),
)
self.clear_subsettings()
await asyncio.gather(
self.digipeater.reload_settings(digipeater_settings),
self.monitor.reload_settings(monitor_settings),
self.beacon.reload_settings(beacon_settings),
self.connection.reload_settings(connection_settings),
)
async def finished(self) -> None:
"""
Await this to wait until the server is closed down programmatically.
"""
if self._active_future is None:
raise RuntimeError(
"Station has not run. We cannot wait for it to finish if "
"it never began!"
)
await self._active_future
async def _cleanup_loop(self) -> None:
"""
Cleanup loop.
"""
sleep = Task(asyncio.sleep(0.500))
while self._cleanup_lock:
await first(sleep, self._cleanup_lock)
self._to_clean = [task for task in self._to_clean if not task.done()]
await cancel_all(self._to_clean)
await cancel(sleep)
self._cleanup_task = None
def run(self) -> Future[None]:
"""
Starts the station. Tasks will not begin unless/until the asyncio loop is
running.
"""
if self._active_future and not self._active_future.done():
raise RuntimeError("Station is already running!")
self.monitor.run()
self.bring_up_interfaces()
self.beacon.run()
self._cleanup_lock = Future()
self._cleanup_task = asyncio.ensure_future(self._cleanup_loop())
self._active_future = Future()
return self._active_future
async def start(self) -> None:
"""
Starts the station and awaits its completion. Useful for projects that just
have one station and want to use it as the event loop entry point, which is
most projects, or anything which might need to wait on an entire station's
lifecycle.
"""
await self.run()
def bring_up_interfaces(self) -> None:
"""
Attempts to bring up all interfaces and queue them into the event loop.
"""
for interface in self.interfaces.values():
interface.start()
def add_interface(self, name: str, interface: Interface) -> None:
"""
Adds an interface to the station.
"""
if interface.station != self:
raise ConfigurationError(
"You cannot assign an interface to an irrelevant station."
)
if not name:
raise ConfigurationError("Interface name may not be blank.")
if name.split() != [name]:
raise ConfigurationError(
f"Interface names may not have spaces. Got: {repr(name)}",
)
try:
name.encode("ascii")
except ValueError as err:
raise ConfigurationError(
f"Interface names MUST be ASCII. Got: {repr(name)}",
) from err
if name in self.interfaces:
raise ConfigurationError(f"Interface {repr(name)} already exists!")
self.interfaces[name] = interface
self.monitor.refresh_ports_cache()
if self.running and not self.closing:
interface.start()
async def remove_interface(self, name: str) -> None:
"""
Removes an interface from the station.
"""
if name not in self.interfaces:
raise KeyError(f"Interface {repr(name)} does not exist.")
interface = self.interfaces[name]
del self.interfaces[name]
self.monitor.refresh_ports_cache()
await asyncio.ensure_future(interface.shutdown())
def get_nth_gateway(self, index: int) -> "Interface":
"""
Get the nth gateway or raise an IndexError if not present.
"""
if index <= 0:
raise IndexError("Port numbers start at 1.")
current = 0
for interface in self.interfaces.values():
if interface.gateway:
current += 1
if current == index:
return interface
if not current:
raise IndexError("No gateways available.")
raise IndexError(f"Port {index} not found.")
async def shutdown(self) -> None:
"""
Shut down all interfaces, cutting off the station.
"""
if not self.running:
# Already shut down.
return
if self.closing and self._active_future: # pragma: no cover
logger.warning(
"Shutdown called while shutdown already called. "
"This may cause a deadlock.",
)
await self._active_future
return
self.closing = True
await self.connection.shutdown()
await self.beacon.shutdown()
for interface in self.interfaces.values():
await interface.shutdown()
await self.monitor.shutdown()
if self._active_future:
self._active_future.set_result(None)
if self._cleanup_task and self._cleanup_lock:
self._cleanup_lock.cancel()
self._cleanup_lock = None
await self._cleanup_task
self.closing = False
|
running: bool
property
Returns if the station is currently running.
settings: StationConfig
property
Returns the current configuration of the station.
__init__(*, config: StationConfig | None = None, config_file_path: str = '')
Initializes a station. Specifying a config dictionary configures the station
with the config dictionary's values.
Specifying a config_file_path will not load the config_file_path's values UNLESS
the config parameter is set to None.
The config_file_path is where configuration should be saved by administrative
utilities (such as the contributed save command in the CommandLine app.) Can be
left as an empty string to specify no intended save file.
Source code in pax25/station.py
| def __init__(
self,
*,
config: StationConfig | None = None,
config_file_path: str = "",
):
"""
Initializes a station. Specifying a config dictionary configures the station
with the config dictionary's values.
Specifying a config_file_path will not load the config_file_path's values UNLESS
the config parameter is set to None.
The config_file_path is where configuration should be saved by administrative
utilities (such as the contributed save command in the CommandLine app.) Can be
left as an empty string to specify no intended save file.
"""
# Timers may get cancelled and the station may shut down before they would
# be cleaned up normally. We track these tasks so if we shut down while any
# are still active, we can reap them manually.
self.config_file_path = config_file_path
if (config is None) and self.config_file_path:
with open(config_file_path) as config_file:
config = json.load(config_file)
if config is None:
raise ConfigurationError(
"No configuration supplied. Please either set the config argument "
"or provide a config_file_path."
)
self._active_future: Future[None] | None = None
address = Address.from_string(config["name"])
assert str(address) == config["name"], (
"Station names must be all caps and have no SSID suffix."
)
self._settings = config
self.interfaces: dict[str, Interface[Any]] = {}
self.frame_router = FrameRouter(station=self)
self.digipeater = Digipeater(station=self, settings=config.get("digipeater"))
self.connection = ConnectionService(
station=self,
settings=config.get("connection"),
)
self.monitor = Monitor(station=self, settings=config.get("monitor"))
self.beacon = BeaconService(station=self, settings=config.get("beacon"))
self.clear_subsettings()
self._cleanup_lock: Future[None] | None = None
self._cleanup_task: Future[None] | None = None
self._to_clean: list[Task[None]] = []
self.closing = False
|
add_interface(name: str, interface: Interface) -> None
Adds an interface to the station.
Source code in pax25/station.py
| def add_interface(self, name: str, interface: Interface) -> None:
"""
Adds an interface to the station.
"""
if interface.station != self:
raise ConfigurationError(
"You cannot assign an interface to an irrelevant station."
)
if not name:
raise ConfigurationError("Interface name may not be blank.")
if name.split() != [name]:
raise ConfigurationError(
f"Interface names may not have spaces. Got: {repr(name)}",
)
try:
name.encode("ascii")
except ValueError as err:
raise ConfigurationError(
f"Interface names MUST be ASCII. Got: {repr(name)}",
) from err
if name in self.interfaces:
raise ConfigurationError(f"Interface {repr(name)} already exists!")
self.interfaces[name] = interface
self.monitor.refresh_ports_cache()
if self.running and not self.closing:
interface.start()
|
bring_up_interfaces() -> None
Attempts to bring up all interfaces and queue them into the event loop.
Source code in pax25/station.py
| def bring_up_interfaces(self) -> None:
"""
Attempts to bring up all interfaces and queue them into the event loop.
"""
for interface in self.interfaces.values():
interface.start()
|
clear_subsettings() -> None
Settings are set on the station object, but the authoritative location of the
settings for each service are on that service. Remove these keys so that if we
erroneously rely on the internal representation of the settings for a service,
we raise instead of using data which may be out of date, which could cause a
very subtle bug.
Source code in pax25/station.py
| def clear_subsettings(self) -> None:
"""
Settings are set on the station object, but the authoritative location of the
settings for each service are on that service. Remove these keys so that if we
erroneously rely on the internal representation of the settings for a service,
we raise instead of using data which may be out of date, which could cause a
very subtle bug.
"""
keys: tuple[Literal["monitor", "beacon", "digipeater", "connection"], ...] = (
"monitor",
"beacon",
"digipeater",
"connection",
)
for key in keys:
if key in self._settings:
del self._settings[key]
|
collect_task(task: Task[None]) -> None
Tasks which are 'fire and forget' can be sent here to be cleaned up later
as needed. This prevents the garbage collector from freaking out and sending
tracebacks as tasks from functions like those in the timer module get reaped.
Source code in pax25/station.py
| def collect_task(self, task: Task[None]) -> None:
"""
Tasks which are 'fire and forget' can be sent here to be cleaned up later
as needed. This prevents the garbage collector from freaking out and sending
tracebacks as tasks from functions like those in the timer module get reaped.
"""
self._to_clean.append(task)
|
finished() -> None
async
Await this to wait until the server is closed down programmatically.
Source code in pax25/station.py
| async def finished(self) -> None:
"""
Await this to wait until the server is closed down programmatically.
"""
if self._active_future is None:
raise RuntimeError(
"Station has not run. We cannot wait for it to finish if "
"it never began!"
)
await self._active_future
|
get_nth_gateway(index: int) -> Interface
Get the nth gateway or raise an IndexError if not present.
Source code in pax25/station.py
| def get_nth_gateway(self, index: int) -> "Interface":
"""
Get the nth gateway or raise an IndexError if not present.
"""
if index <= 0:
raise IndexError("Port numbers start at 1.")
current = 0
for interface in self.interfaces.values():
if interface.gateway:
current += 1
if current == index:
return interface
if not current:
raise IndexError("No gateways available.")
raise IndexError(f"Port {index} not found.")
|
reload_settings(settings: StationConfig) -> None
async
Reloads station configuration, and all components involved.
Source code in pax25/station.py
| async def reload_settings(self, settings: StationConfig) -> None:
"""
Reloads station configuration, and all components involved.
"""
address = Address.from_string(settings["name"])
assert str(address) == settings["name"], (
"Station names must be all caps and have no SSID suffix."
)
self._settings = settings
digipeater_settings = maybe_update(
self.digipeater.settings,
settings.get("digipeater"),
)
monitor_settings = maybe_update(
self.monitor.settings,
settings.get("monitor"),
)
beacon_settings = maybe_update(
self.beacon.settings,
settings.get("beacon"),
)
connection_settings = maybe_update(
self.connection.settings,
settings.get("connection"),
)
self.clear_subsettings()
await asyncio.gather(
self.digipeater.reload_settings(digipeater_settings),
self.monitor.reload_settings(monitor_settings),
self.beacon.reload_settings(beacon_settings),
self.connection.reload_settings(connection_settings),
)
|
remove_interface(name: str) -> None
async
Removes an interface from the station.
Source code in pax25/station.py
| async def remove_interface(self, name: str) -> None:
"""
Removes an interface from the station.
"""
if name not in self.interfaces:
raise KeyError(f"Interface {repr(name)} does not exist.")
interface = self.interfaces[name]
del self.interfaces[name]
self.monitor.refresh_ports_cache()
await asyncio.ensure_future(interface.shutdown())
|
run() -> Future[None]
Starts the station. Tasks will not begin unless/until the asyncio loop is
running.
Source code in pax25/station.py
| def run(self) -> Future[None]:
"""
Starts the station. Tasks will not begin unless/until the asyncio loop is
running.
"""
if self._active_future and not self._active_future.done():
raise RuntimeError("Station is already running!")
self.monitor.run()
self.bring_up_interfaces()
self.beacon.run()
self._cleanup_lock = Future()
self._cleanup_task = asyncio.ensure_future(self._cleanup_loop())
self._active_future = Future()
return self._active_future
|
shutdown() -> None
async
Shut down all interfaces, cutting off the station.
Source code in pax25/station.py
| async def shutdown(self) -> None:
"""
Shut down all interfaces, cutting off the station.
"""
if not self.running:
# Already shut down.
return
if self.closing and self._active_future: # pragma: no cover
logger.warning(
"Shutdown called while shutdown already called. "
"This may cause a deadlock.",
)
await self._active_future
return
self.closing = True
await self.connection.shutdown()
await self.beacon.shutdown()
for interface in self.interfaces.values():
await interface.shutdown()
await self.monitor.shutdown()
if self._active_future:
self._active_future.set_result(None)
if self._cleanup_task and self._cleanup_lock:
self._cleanup_lock.cancel()
self._cleanup_lock = None
await self._cleanup_task
self.closing = False
|
start() -> None
async
Starts the station and awaits its completion. Useful for projects that just
have one station and want to use it as the event loop entry point, which is
most projects, or anything which might need to wait on an entire station's
lifecycle.
Source code in pax25/station.py
| async def start(self) -> None:
"""
Starts the station and awaits its completion. Useful for projects that just
have one station and want to use it as the event loop entry point, which is
most projects, or anything which might need to wait on an entire station's
lifecycle.
"""
await self.run()
|