Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Status LED on failure #1136

Open
sibertdeclercq opened this issue Feb 16, 2024 · 1 comment
Open

Status LED on failure #1136

sibertdeclercq opened this issue Feb 16, 2024 · 1 comment

Comments

@sibertdeclercq
Copy link

One of the plugs I have written is an RGB LED that indicates different phases and statuses. The problem I'm running into is that, as far as I can see, the test result is only available after test.execute has returned.

This means I don't have my plugs available to set the LED to red. I'm now abusing teardown to read the test record outcome to:

  1. Set the LED
  2. Wait for my DUT to detach to start the next cycle

Is there a better way to achieve this? I've looked into output callbacks, but I couldn't find out whether it's possible to pass plugs to the callbacks.

Some pointers would be appreciated!

@mbenabda
Copy link

mbenabda commented Mar 27, 2024

You could write a client for the websocket server that the station and dashboard servers expose.
Here is a quick and dirty implementation that I have lying around. You can use it as a base to show test phase execution instead of the test and station status it currently shows. It has the following dependencies:

gpiozero
websockets
backoff
import collections
import itertools
import json
import logging
import asyncio
import secrets
import string
import time
from typing import Union
import websockets
import random
from gpiozero import RGBLED
import backoff
import argparse

# LED colours as (red, green, blue) component tuples.
RED    = (1, 0, 0)
GREEN  = (0, 1, 0)
BLUE   = (0, 0, 1)
PURPLE = (1, 0, 1)


def KEEP_LED_IN_CURRENT_STATE():
  """No-op LED handler: calling it leaves the LED untouched."""
  return None


# Root logger; the UIs derive child loggers from it.
logger = logging.getLogger()

# Used as the backoff `max_time` budget (presumably seconds -- confirm against backoff docs).
ONE_MINUTE = 60
def unwrap(payload):
  """Strip the first character from the stringified payload.

  Returns the remaining text, or None when the stringified payload is at
  most one character long (i.e. there is nothing after the first character).
  """
  # NOTE(review): the leading character is presumably a frame-type prefix
  # added by the websocket server -- confirm against the server protocol.
  text = str(payload)
  if len(text) <= 1:
    return None
  return text[1:]

@backoff.on_exception(backoff.expo, ConnectionRefusedError, max_time=ONE_MINUTE)
@backoff.on_exception(backoff.expo, websockets.ConnectionClosed, max_time=1)
async def test_status_ui(logger, uri, station_id, led):
  """Mirror the latest test execution of a station onto an LED.

  Connects to the websocket at `uri`, and for every received payload decodes
  it, picks the latest test execution that references `station_id` and shows
  its status on `led`. A payload that cannot be handled is logged and
  skipped, so one malformed message does not stop the UI.

  Args:
    logger: logger used for progress and error messages.
    uri: websocket URL to subscribe to.
    station_id: id of the station whose tests are displayed.
    led: LED object handed to `show_test_status`.

  Connection errors are retried by the `backoff` decorators above.
  """
  async with websockets.connect(uri) as websocket:
    logger.info("Started test status UI")

    while True:
      payload = await websocket.recv()
      if payload is None:
        break # TODO continue instead ?
      try:
        msg = unwrap(payload)

        if msg is None:
          continue

        latest_test = latest_test_execution(json.loads(msg), station_id)
        show_test_status(logger, latest_test, led)
      except Exception:
        # Only swallow ordinary errors: a bare `except:` would also trap
        # KeyboardInterrupt/SystemExit and make the process hard to stop.
        logger.exception(f"unable to handle payload: {payload}")

def _find_station(event, station_id):
  """Return the first station in `event` whose "station_id" matches, else None.

  `event` is iterated as a collection of mappings; the values of each mapping
  are the candidate station dicts.
  """
  stations = itertools.chain.from_iterable(host.values() for host in event)
  return next(
    (station for station in stations if station.get("station_id", None) == station_id),
    None,
  )

@backoff.on_exception(backoff.expo, ConnectionRefusedError, max_time=ONE_MINUTE)
@backoff.on_exception(backoff.expo, websockets.ConnectionClosed, max_time=1)
async def station_status_ui(logger, uri, station_id, led):
  """Show on an LED whether a station is online.

  Connects to the websocket at `uri` and, for every received payload that
  mentions `station_id`, sets `led` to GREEN when the station's "status" is
  "ONLINE" and to RED otherwise. A payload that cannot be handled is logged
  and skipped.

  Args:
    logger: logger used for progress and error messages.
    uri: websocket URL to subscribe to.
    station_id: id of the station to watch.
    led: LED object; its `color` is set and `off()` is called on disconnect.

  Raises:
    websockets.ConnectionClosed, ConnectionRefusedError: re-raised after the
      LED has been switched off, so the `backoff` decorators can retry.
  """
  was_connected = False
  try:
    async with websockets.connect(uri) as websocket:
      logger.info("Started station UI")

      was_connected = True
      while True:
        payload = await websocket.recv()
        if payload is None:
          break # TODO continue instead ?
        try:
          msg = unwrap(payload)

          if msg is None:
            continue

          station = _find_station(json.loads(msg), station_id)

          if station is None:
            continue

          station_status = station.get("status", "maybe offline")

          logger.info(f"station state is: {station_status}")
          set_led_color(led, GREEN if station_status == "ONLINE" else RED)()
        except Exception:
          # Only swallow ordinary errors: a bare `except:` would also trap
          # KeyboardInterrupt/SystemExit and make the process hard to stop.
          logger.exception(f"unable to handle payload: {payload}")
  except (websockets.ConnectionClosed, ConnectionRefusedError):
    # Only report the outage if a connection had been established; failed
    # connection attempts would otherwise log on every retry.
    if was_connected:
      logger.info("unknown station status: dashboard is unreachable")
      led.off()
    raise

def set_led_color(led, color):
  """Build a zero-argument callable that assigns `color` to `led.color`.

  Nothing is changed on the LED until the returned callable is invoked.
  """
  def apply_color():
    setattr(led, "color", color)
  return apply_color

def blink(led, color):
  """Build a zero-argument callable that starts `led` blinking in `color`.

  The returned callable invokes `led.blink` with an on/off time of 0.2 each;
  nothing happens on the LED until it is called.
  """
  blink_options = {"on_color": color, "on_time": .2, "off_time": .2}

  def start_blinking():
    led.blink(**blink_options)

  return start_blinking

# Flat snapshot of one test execution, as extracted by as_test_execution().
TestExecutionProgress = collections.namedtuple(
  "TestExecutionProgress",
  ["status", "outcome", "execution", "test", "start_time_millis", "end_time_millis"],
)

def as_test_execution(event):
  """Convert a raw event dict into a TestExecutionProgress.

  Reads "test_uid" from the event and "execution_uid", "status" and
  "test_record" from its "state" entry. Returns None when either the
  execution uid or the test uid is missing or falsy. A missing status or
  outcome falls back to "UNKNOWN"; missing timestamps fall back to None.
  """
  state = event.get("state", {})
  record = state.get("test_record", {})
  execution_uid = state.get("execution_uid", None)
  test_uid = event.get("test_uid", None)

  if not (execution_uid and test_uid):
    return None

  # status enum values at https://github.com/google/openhtf/blob/c85fb069a1ce407e82bb47a8fb1b64220e974c5f/openhtf/core/test_state.py#L152
  status = state.get("status", "UNKNOWN")
  # outcome enum values at https://github.com/google/openhtf/blob/c85fb069a1ce407e82bb47a8fb1b64220e974c5f/openhtf/core/test_record.py#L54
  outcome = record.get("outcome", "UNKNOWN")

  return TestExecutionProgress(
    status = status,
    outcome = outcome,
    execution = execution_uid,
    test = test_uid,
    start_time_millis = record.get("start_time_millis", None),
    end_time_millis = record.get("end_time_millis", None),
  )

def latest_test_execution(events, station_id):
  """Pick the most recently started test execution of a station.

  Keeps only the events whose state.test_record.station_id equals
  `station_id`, converts them with `as_test_execution` (dropping those that
  convert to None) and returns the one with the greatest
  `start_time_millis`. An execution without a start time is ranked as if it
  started now. Returns None when no execution qualifies.
  """
  candidates = []
  for event in events:
    recorded_station = event.get("state", {}).get("test_record", {}).get("station_id", None)
    if recorded_station == station_id:
      execution = as_test_execution(event)
      if execution is not None:
        candidates.append(execution)

  # Current wall-clock time in milliseconds, used when a start time is missing.
  fallback_millis = int(round(time.time() * 1000))

  def started_at(execution):
    return execution.start_time_millis or fallback_millis

  return max(candidates, key = started_at, default = None)

def show_test_status(logger, test: Union[TestExecutionProgress, None], led):
  """Reflect the status (and, once completed, the outcome) of a test on `led`.

  Does nothing when `test` is None. Otherwise logs the status and dispatches:
  a waiting test leaves the LED as is, a running test blinks green, a
  completed test is shown according to its outcome (unrecognised outcomes
  are shown as red), and any other status switches the LED off.
  """
  if test is None:
    return

  logger.info(f"test status is {test.status} with outcome {test.outcome}")
  logger.debug(f"showing status of test {json.dumps(test, indent=2)}")

  def show_outcome():
    # Resolved lazily: the outcome table is only built for completed tests.
    by_outcome = {
      "PASS": set_led_color(led, GREEN),
      "FAIL": set_led_color(led, RED),
      "ERROR": set_led_color(led, PURPLE),
      "TIMEOUT": blink(led, PURPLE),
      "ABORTED": led.off,
    }
    outcome_handler = by_outcome.get(test.outcome, set_led_color(led, RED))
    return outcome_handler()

  by_status = {
    "WAITING_FOR_TEST_START": KEEP_LED_IN_CURRENT_STATE,
    "RUNNING": blink(led, GREEN),
    "COMPLETED": show_outcome,
  }
  status_handler = by_status.get(test.status, led.off)

  status_handler()

if __name__ == '__main__':
  # Script entry point: parse the CLI options, create one RGB LED per UI and
  # run both websocket UIs until the first one stops.
  import sys

  logging.basicConfig(level=logging.INFO)

  parser = argparse.ArgumentParser(prog='ledsui', conflict_handler="error")
  parser.add_argument('--log-level', choices=logging._nameToLevel.keys(), default='INFO', help="Set the log level")

  station_opts = parser.add_argument_group("Station")
  station_opts.add_argument('--station-id', type=str, default="Skipper sensor provisioning", help="id of the test station for which to display statuses", metavar="ID")
  station_opts.add_argument('--dashboard-server-host', type=str, default="127.0.0.1", help="Host of the dashboard server", metavar="HOST")
  station_opts.add_argument('--dashboard-server-port', type=str, default="4444", help="Port of the dashboard server", metavar="PORT")

  station_status_ui_opts = parser.add_argument_group("station status LED")
  station_status_ui_opts.add_argument('--station-status-red-led-gpio', type=int, default=17, help="GPIO number of the red component for the station status LED", metavar="GPIO_NUM")
  station_status_ui_opts.add_argument('--station-status-green-led-gpio', type=int, default=27, help="GPIO number of the green component for the station status LED", metavar="GPIO_NUM")
  station_status_ui_opts.add_argument('--station-status-blue-led-gpio', type=int, default=22, help="GPIO number of the blue component for the station status LED", metavar="GPIO_NUM")

  test_status_ui_opts = parser.add_argument_group("test status LED")
  test_status_ui_opts.add_argument('--test-status-red-led-gpio', type=int, default=25, help="GPIO number of the red component for the test status LED", metavar="GPIO_NUM")
  test_status_ui_opts.add_argument('--test-status-green-led-gpio', type=int, default=24, help="GPIO number of the green component for the test status LED", metavar="GPIO_NUM")
  test_status_ui_opts.add_argument('--test-status-blue-led-gpio', type=int, default=23, help="GPIO number of the blue component for the test status LED", metavar="GPIO_NUM")

  # Unknown command-line arguments are ignored rather than rejected.
  config = parser.parse_known_args()[0]

  logger.setLevel(config.log_level)
  logging.getLogger('backoff').setLevel(logging.ERROR)

  # Random identifiers that make the two subscription URLs unique per run.
  client_id = str(random.randint(0, 1000))
  subscription_id_prefix = ''.join(secrets.choice(string.ascii_lowercase + string.digits) for _ in range(7))

  ws_url = lambda topic: f"ws://{config.dashboard_server_host}:{config.dashboard_server_port}/sub/{topic}/websocket"

  uis = [
    station_status_ui(
      logger.getChild("station_status_ui"),
      ws_url(f"dashboard/{client_id}/{subscription_id_prefix}s"),
      station_id = config.station_id,
      led = RGBLED(
        red           = config.station_status_red_led_gpio,
        green         = config.station_status_green_led_gpio,
        blue          = config.station_status_blue_led_gpio,
        initial_value = (0, 0, 0),
        active_high   = True,
        pwm           = False
      )
    ),
    test_status_ui(
      logger.getChild("test_status_ui"),
      ws_url(f"station/{client_id}/{subscription_id_prefix}t"),
      station_id = config.station_id,
      led = RGBLED(
        red           = config.test_status_red_led_gpio,
        green         = config.test_status_green_led_gpio,
        blue          = config.test_status_blue_led_gpio,
        initial_value = (0, 0, 0),
        active_high   = True,
        pwm           = False
      )
    )
  ]

  logger.info(f"Starting UI for station of id '{config.station_id}' at {config.dashboard_server_host}:{config.dashboard_server_port}")

  async def run_until_first_exit(coroutines):
    """Run all UIs, stop the others once one finishes, return how many were stopped."""
    tasks = [asyncio.ensure_future(coroutine) for coroutine in coroutines]
    done, pending = await asyncio.wait(tasks, return_when = asyncio.FIRST_COMPLETED)

    # Surface why a UI stopped; otherwise its exception is never retrieved.
    for task in done:
      if not task.cancelled() and task.exception() is not None:
        logger.error("UI task stopped with an error", exc_info = task.exception())

    for task in pending:
      task.cancel()

    # Wait for the cancelled tasks so their cleanup (closing the websocket)
    # runs before the event loop is shut down.
    await asyncio.gather(*pending, return_exceptions = True)

    return len(pending)

  # The exit code is the number of UIs that were still running when the first one stopped.
  sys.exit(asyncio.run(run_until_first_exit(uis)))

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants