SPEC — Event System¶
- Estado: congelado en Fase 0
- ADR principal: ADR-0006
1. Responsabilidades¶
- Transportar mensajes discretos semánticos entre módulos.
- Garantizar orden total y entrega determinista.
- Soportar prioridades (severidades) y correlación de eventos.
- Persistir todos los eventos en el canal
/eventsdel MCAP.
No es responsabilidad del Event System:
- Transportar streams periódicos de sensor o estado (eso son channels del bus de telemetría con su política propia).
- Implementar reintentos, ack o lógica de transporte de red.
2. Distinción: eventos vs streams¶
| Característica | Eventos | Streams |
|---|---|---|
| Frecuencia | Baja (≤ 1 Hz típico) | Alta (10–1000 Hz) |
| Semántica | Discreta, con significado | Continua, datos |
| Prioridad | Tienen severity | No |
| Ejemplos | takeoff, sensor_fault, mission_start | /sensors/imu0, /state/nav |
| Drop | Nunca dropea (excepto BACKPRESSURE explícito) | Drop policy por sink |
3. Schema¶
class EventSeverity(IntEnum):
DEBUG = 10
INFO = 20
WARN = 30
ERROR = 40
CRITICAL = 50
class EventType(StrEnum):
# Lifecycle
ARMED = "armed"
DISARMED = "disarmed"
TAKEOFF = "takeoff"
LANDED = "landed"
KILL = "kill"
# Mission
MISSION_START = "mission_start"
MISSION_END = "mission_end"
WAYPOINT_REACHED = "waypoint_reached"
GOAL_UPDATED = "goal_updated"
# Safety
SAFETY_VIOLATION = "safety_violation"
GEOFENCE_BREACH = "geofence_breach"
COLLISION_WARNING = "collision_warning"
COLLISION = "collision"
RECOVERY_TRIGGERED = "recovery_triggered"
# Sensors / system
SENSOR_FAULT = "sensor_fault"
SENSOR_RECOVERED = "sensor_recovered"
BATTERY_LOW = "battery_low"
# Infra
TELEMETRY_BACKPRESSURE = "telemetry_backpressure"
SCHEDULER_CALLBACK_FAILED = "scheduler_callback_failed"
@dataclass(frozen=True)
class Event:
type: EventType
severity: EventSeverity
source: str # p.ej. "control.attitude", "sim.pybullet"
stamp_sim_ns: int
stamp_wall_ns: int
sequence: int # global, monotónico, asignado por el bus
payload: Mapping[str, Any] # JSON-serializable
correlation_id: str | None
schema_version: int = 1
4. Bus de eventos¶
Implementación en ghost.events:
class EventBus:
def publish(self, ev: Event) -> None: ...
def subscribe(self, types: Iterable[EventType],
cb: Callable[[Event], None],
min_severity: EventSeverity = EventSeverity.DEBUG) -> Subscription: ...
def subscribe_all(self, cb: Callable[[Event], None],
min_severity: EventSeverity = EventSeverity.DEBUG) -> Subscription: ...
Reglas:
- Cada
publish()asignasequenceglobal atómico y entrega a subscribers en orden total. CRITICAL: entrega sincrónica al subscriber registrado comosafety_handlerantes de continuar.- Resto: entrega asíncrona desde un dispatcher en thread propio.
- Subscribers que toman > N ms de wall (configurable) emiten
TELEMETRY_BACKPRESSURE. - Un subscriber no puede publicar eventos
CRITICALdesde otroCRITICAL(evita cadenas).
5. Convención de correlation_id¶
- Cadenas de eventos relacionados comparten
correlation_id(string opaco). - Generador estándar:
f"mission-{uuid7()}". - Permite reconstruir secuencias:
MISSION_START → WAYPOINT_REACHED × n → MISSION_ENDcon mismo id.
6. Persistencia¶
Todos los eventos van al canal /events del MCAP via el TelemetryBus. La persistencia es responsabilidad de telemetría; el event bus solo coordina entrega in-process.
7. Replay¶
EventReplay:
- Lee
/eventsdel MCAP. - Reinjecta cada evento en el
EventBuscon sustamp_sim_nsysequenceoriginal. - Tests pueden comparar cadenas de eventos esperadas vs producidas.
8. Casos de uso¶
8.1 Publicación¶
events.publish(Event(
type=EventType.SAFETY_VIOLATION,
severity=EventSeverity.WARN,
source="actuators.sink",
stamp_sim_ns=clock.now_ns(),
stamp_wall_ns=time.monotonic_ns(),
sequence=0, # asignado por el bus
payload={"reason": "max_tilt_exceeded", "value_rad": 0.85},
correlation_id=None,
))
8.2 Suscripción del safety handler¶
def on_critical(ev: Event) -> None:
if ev.type == EventType.KILL:
actuators.send(emergency_stop(), ev.stamp_sim_ns)
events.subscribe_all(on_critical, min_severity=EventSeverity.CRITICAL)
8.3 Correlación de misión¶
mid = f"mission-{uuid7()}"
events.publish(Event(MISSION_START, INFO, "mission.fsm", ...,
correlation_id=mid))
# ...
events.publish(Event(WAYPOINT_REACHED, INFO, "mission.fsm", ...,
correlation_id=mid, payload={"idx": 3}))
9. Prioridades operativas¶
| Severity | Política |
|---|---|
DEBUG |
Logueado a MCAP, ignorado por consola |
INFO |
Logueado a MCAP, no consola |
WARN |
Logueado a MCAP + consola |
ERROR |
MCAP + consola + se llama a on_error handlers |
CRITICAL |
MCAP + consola + entrega sincrónica al safety handler antes de seguir |
10. Errores comunes a evitar¶
- Usar eventos para datos periódicos. Saturan el canal y rompen la semántica.
- Subscribers que bloquean. El dispatcher mide tiempos; bloqueos largos producen backpressure.
- Crear nuevos
EventTypead-hoc. El enum es centralizado; añadir un tipo es decisión consciente y va con changelog. - Olvidar
correlation_iden cadenas largas (misión, recovery). Dificulta análisis post-mortem. - Publicar
CRITICALpara condiciones recuperables.CRITICALdebe disparar respuesta inmediata; abusar lo desensibiliza.
11. Evolución futura¶
- Persistencia separada para eventos críticos en archivo aparte (más fácil de auditar).
- Filtros configurables por suscriptor (regex en
source). - Integración con sistema de notificaciones externo en HW (Telegram/email).