ci/lava: Extract LAVA proxy and LAVAJob abstractions

Let's make lava_job_submitter.py cleaner, leaving it with only the parsing
and retry-mechanism responsibilities.

Moved out from the submitter script:

1. proxy functions
  - moved to lava.utils.lava_proxy.py
2. LAVAJob class definition
  - moved to lava.utils.lava_job.py
  - added structured logging capabilities to LAVAJob
  - Implemented properties for job_id, is_finished, and status, with
    corresponding setter methods that update the log dictionary.
  - Added new methods show, get_lava_time, and refresh_log for improved
    log handling and data retrieval.

Signed-off-by: Guilherme Gallo <guilherme.gallo@collabora.com>
Part-of: <https://gitlab.freedesktop.org/mesa/mesa/-/merge_requests/22500>
This commit is contained in:
Guilherme Gallo 2023-04-04 07:47:09 -03:00 committed by Marge Bot
parent 6f6b892dca
commit c03f7233ca
6 changed files with 328 additions and 174 deletions

View file

@ -13,21 +13,15 @@
import argparse
import contextlib
import pathlib
import re
import sys
import time
import traceback
import urllib.parse
import xmlrpc.client
from datetime import datetime, timedelta
from io import StringIO
from os import getenv
from typing import Optional
from typing import Any, Optional
import lavacli
from lava.exceptions import (
MesaCIException,
MesaCIKnownIssueException,
MesaCIParseException,
MesaCIRetryError,
MesaCITimeoutError,
@ -36,12 +30,15 @@ from lava.utils import CONSOLE_LOG
from lava.utils import DEFAULT_GITLAB_SECTION_TIMEOUTS as GL_SECTION_TIMEOUTS
from lava.utils import (
GitlabSection,
LAVAJob,
LogFollower,
LogSectionType,
call_proxy,
fatal_err,
generate_lava_yaml_payload,
hide_sensitive_data,
print_log,
setup_lava_proxy,
)
from lavacli.utils import flow_yaml as lava_yaml
@ -51,139 +48,24 @@ DEVICE_HANGING_TIMEOUT_SEC = int(getenv("LAVA_DEVICE_HANGING_TIMEOUT_SEC", 5*60
# How many seconds the script should wait before try a new polling iteration to
# check if the dispatched LAVA job is running or waiting in the job queue.
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 10))
WAIT_FOR_DEVICE_POLLING_TIME_SEC = int(
getenv("LAVA_WAIT_FOR_DEVICE_POLLING_TIME_SEC", 1)
)
# How many seconds the script will wait to let LAVA finalize the job and give
# the final details.
WAIT_FOR_LAVA_POST_PROCESSING_SEC = int(getenv("LAVA_WAIT_LAVA_POST_PROCESSING_SEC", 5))
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES = int(
getenv("LAVA_WAIT_LAVA_POST_PROCESSING_RETRIES", 3)
)
# How many seconds to wait between log output LAVA RPC calls.
LOG_POLLING_TIME_SEC = int(getenv("LAVA_LOG_POLLING_TIME_SEC", 5))
# How many retries should be made when a timeout happen.
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2))
def setup_lava_proxy():
    """Build an XML-RPC proxy to the LAVA server.

    Reads the lavacli "default" configuration for the server URI,
    credentials and transport options, and returns a ready-to-use
    xmlrpc.client.ServerProxy.
    """
    cfg = lavacli.load_config("default")
    uri, usr, tok = (cfg.get(key) for key in ("uri", "username", "token"))
    parsed = urllib.parse.urlparse(uri)
    # Embed the credentials directly in the URL handed to the XML-RPC client.
    auth_uri = "{}://{}:{}@{}{}".format(parsed.scheme, usr, tok, parsed.netloc, parsed.path)
    transport = lavacli.RequestsTransport(
        parsed.scheme,
        cfg.get("proxy"),
        cfg.get("timeout", 120.0),
        cfg.get("verify_ssl_cert", True),
    )
    proxy = xmlrpc.client.ServerProxy(auth_uri, allow_none=True, transport=transport)
    print_log("Proxy for {} created.".format(cfg['uri']))
    return proxy
def _call_proxy(fn, *args):
    """Invoke an XML-RPC call, retrying transient protocol errors.

    Protocol errors are retried up to 60 times, 15 seconds apart, before
    being treated as fatal; XML-RPC faults are fatal immediately.
    """
    max_attempts = 60
    attempt = 0
    while True:
        attempt += 1
        try:
            return fn(*args)
        except xmlrpc.client.ProtocolError as err:
            if attempt == max_attempts:
                traceback.print_exc()
                fatal_err("A protocol error occurred (Err {} {})".format(err.errcode, err.errmsg))
            else:
                time.sleep(15)
        except xmlrpc.client.Fault as err:
            traceback.print_exc()
            fatal_err("FATAL: Fault: {} (code: {})".format(err.faultString, err.faultCode))
class LAVAJob:
    """Wrapper around a single LAVA job: submission, cancellation, state
    polling and paginated log retrieval through the XML-RPC proxy."""

    # Final job status -> console color used when reporting the result.
    COLOR_STATUS_MAP = {
        "pass": CONSOLE_LOG["FG_GREEN"],
        "hung": CONSOLE_LOG["FG_YELLOW"],
        "fail": CONSOLE_LOG["FG_RED"],
        "canceled": CONSOLE_LOG["FG_MAGENTA"],
    }

    def __init__(self, proxy, definition):
        # LAVA-assigned job id; set by submit().
        self.job_id = None
        # XML-RPC proxy used for every scheduler call.
        self.proxy = proxy
        # YAML job definition to submit.
        self.definition = definition
        # Pagination cursor: index of the last log line already fetched.
        self.last_log_line = 0
        # Timestamp of the last sign of life; used for hang detection.
        self.last_log_time = None
        self.is_finished = False
        self.status = "created"

    def heartbeat(self):
        # Record that the job produced output just now.
        self.last_log_time = datetime.now()
        self.status = "running"

    def validate(self) -> Optional[dict]:
        """Returns a dict with errors, if the validation fails.

        Returns:
            Optional[dict]: a dict with the validation errors, if any
        """
        return _call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)

    def submit(self):
        # Returns True on success, False when the proxy call raised a
        # MesaCIException (failure is reported, not propagated).
        try:
            self.job_id = _call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
        except MesaCIException:
            return False
        return True

    def cancel(self):
        # No-op when the job was never successfully submitted.
        if self.job_id:
            self.proxy.scheduler.jobs.cancel(self.job_id)

    def is_started(self) -> bool:
        # The job counts as started once it left the scheduler queue.
        waiting_states = ["Submitted", "Scheduling", "Scheduled"]
        job_state: dict[str, str] = _call_proxy(
            self.proxy.scheduler.job_state, self.job_id
        )
        return job_state["job_state"] not in waiting_states

    def _load_log_from_data(self, data) -> list[str]:
        lines = []
        if isinstance(data, xmlrpc.client.Binary):
            # We are dealing with xmlrpc.client.Binary
            # Let's extract the data
            data = data.data
        # When there is no new log data, the YAML is empty
        if loaded_lines := lava_yaml.load(data):
            lines = loaded_lines
            self.last_log_line += len(lines)
        return lines

    def get_logs(self) -> list[str]:
        # Fetch log lines newer than last_log_line; also updates
        # is_finished from LAVA's end-of-log flag.
        try:
            (finished, data) = _call_proxy(
                self.proxy.scheduler.jobs.logs, self.job_id, self.last_log_line
            )
            self.is_finished = finished
            return self._load_log_from_data(data)
        except Exception as mesa_ci_err:
            raise MesaCIParseException(
                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
            ) from mesa_ci_err

    def parse_job_result_from_log(
        self, lava_lines: list[dict[str, str]]
    ) -> list[dict[str, str]]:
        """Use the console log to catch if the job has completed successfully or
        not. Returns the list of log lines until the result line."""
        last_line = None  # Print all lines. lines[:None] == lines[:]
        for idx, line in enumerate(lava_lines):
            if result := re.search(r"hwci: mesa: (pass|fail)", line):
                self.is_finished = True
                self.status = result.group(1)
                last_line = idx + 1
                # We reached the log end here. hwci script has finished.
                break
        return lava_lines[:last_line]
NUMBER_OF_RETRIES_TIMEOUT_DETECTION = int(
getenv("LAVA_NUMBER_OF_RETRIES_TIMEOUT_DETECTION", 2)
)
def find_exception_from_metadata(metadata, job_id):
if "result" not in metadata or metadata["result"] != "fail":
@ -212,7 +94,7 @@ def find_exception_from_metadata(metadata, job_id):
def find_lava_error(job) -> None:
# Look for infrastructure errors and retry if we see them.
results_yaml = _call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
results_yaml = call_proxy(job.proxy.results.get_testjob_results_yaml, job.job_id)
results = lava_yaml.load(results_yaml)
for res in results:
metadata = res["metadata"]
@ -231,12 +113,40 @@ def show_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{CONSOLE_LOG['FG_GREEN']}")
start_collapsed=True,
colour=colour,
):
show = _call_proxy(job.proxy.scheduler.jobs.show, job.job_id)
for field, value in show.items():
wait_post_processing_retries: int = WAIT_FOR_LAVA_POST_PROCESSING_RETRIES
while not job.is_post_processed() and wait_post_processing_retries > 0:
# Wait a little until LAVA finishes processing metadata
time.sleep(WAIT_FOR_LAVA_POST_PROCESSING_SEC)
wait_post_processing_retries -= 1
if not job.is_post_processed():
waited_for_sec: int = (
WAIT_FOR_LAVA_POST_PROCESSING_RETRIES * WAIT_FOR_DEVICE_POLLING_TIME_SEC
)
print_log(
f"Waited for {waited_for_sec} seconds"
"for LAVA to post-process the job, it haven't finished yet. "
"Dumping it's info anyway"
)
details: dict[str, str] = job.show()
for field, value in details.items():
print(f"{field:<15}: {value}")
job.refresh_log()
def fetch_logs(job, max_idle_time, log_follower) -> None:
    """Run one log-polling iteration: check for device hangs, wait the
    polling interval, pull fresh log lines, parse and print them."""
    is_job_hanging(job, max_idle_time)
    time.sleep(LOG_POLLING_TIME_SEC)
    fresh_lines = fetch_new_log_lines(job)
    for parsed in parse_log_lines(job, log_follower, fresh_lines):
        print_log(parsed)
def is_job_hanging(job, max_idle_time):
# Poll to check for new logs, assuming that a prolonged period of
# silence means that the device has died and we should try it again
if datetime.now() - job.last_log_time > max_idle_time:
@ -251,17 +161,8 @@ def fetch_logs(job, max_idle_time, log_follower) -> None:
timeout_duration=max_idle_time,
)
time.sleep(LOG_POLLING_TIME_SEC)
# The XMLRPC binary packet may be corrupted, causing a YAML scanner error.
# Retry the log fetching several times before exposing the error.
for _ in range(5):
with contextlib.suppress(MesaCIParseException):
new_log_lines = job.get_logs()
break
else:
raise MesaCIParseException
def parse_log_lines(job, log_follower, new_log_lines):
if log_follower.feed(new_log_lines):
# If we had non-empty log data, we can assure that the device is alive.
job.heartbeat()
@ -275,12 +176,22 @@ def fetch_logs(job, max_idle_time, log_follower) -> None:
LogSectionType.LAVA_POST_PROCESSING,
):
parsed_lines = job.parse_job_result_from_log(parsed_lines)
for line in parsed_lines:
print_log(line)
return parsed_lines
def follow_job_execution(job):
def fetch_new_log_lines(job):
    """Fetch the next batch of log lines for the job.

    The XMLRPC binary packet may be corrupted, causing a YAML scanner
    error, so the fetch is retried a few times before the error is
    surfaced to the caller.
    """
    attempts_left = 5
    while attempts_left:
        with contextlib.suppress(MesaCIParseException):
            return job.get_logs()
        attempts_left -= 1
    raise MesaCIParseException
def submit_job(job):
try:
job.submit()
except Exception as mesa_ci_err:
@ -288,11 +199,16 @@ def follow_job_execution(job):
f"Could not submit LAVA job. Reason: {mesa_ci_err}"
) from mesa_ci_err
def wait_for_job_get_started(job):
    """Block until LAVA reports the job left the scheduler queue."""
    print_log(f"Waiting for job {job.job_id} to start.")
    started = job.is_started()
    while not started:
        time.sleep(WAIT_FOR_DEVICE_POLLING_TIME_SEC)
        started = job.is_started()
    # Capture submit/start timestamps in the structured log now that the
    # device picked the job up.
    job.refresh_log()
    print_log(f"Job {job.job_id} started.")
def bootstrap_log_follower() -> LogFollower:
gl = GitlabSection(
id="lava_boot",
header="LAVA boot",
@ -300,14 +216,16 @@ def follow_job_execution(job):
start_collapsed=True,
)
print(gl.start())
max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
with LogFollower(current_section=gl) as lf:
return LogFollower(current_section=gl)
def follow_job_execution(job, log_follower):
    """Stream the job's console log through ``log_follower`` until LAVA
    reports the job finished.

    Note: the diff residue contained both the old call
    ``fetch_logs(job, max_idle_time, lf)`` and the new one; only the
    new-signature call with the explicit ``log_follower`` is kept.
    """
    with log_follower:
        max_idle_time = timedelta(seconds=DEVICE_HANGING_TIMEOUT_SEC)
        # Start to check job's health
        job.heartbeat()
        # fetch_logs() flips job.is_finished once the final result line (or
        # LAVA's own end-of-log flag) is seen.
        while not job.is_finished:
            fetch_logs(job, max_idle_time, log_follower)
# Mesa Developers expect to have a simple pass/fail job result.
# If this does not happen, it probably means a LAVA infrastructure error
@ -327,41 +245,52 @@ def print_job_final_status(job):
f"{CONSOLE_LOG['RESET']}"
)
job.refresh_log()
job.log["status"] = job.status
show_job_data(job, colour=f"{CONSOLE_LOG['BOLD']}{color}")
def retriable_follow_job(proxy, job_definition) -> LAVAJob:
retry_count = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
def execute_job_with_retries(proxy, job_definition, retry_count) -> Optional[LAVAJob]:
    """Submit and follow a LAVA job, retrying on CI exceptions.

    Makes up to ``retry_count + 1`` attempts. Returns the finished LAVAJob
    on the first successful attempt, or None when every attempt failed.
    (Reconstructed from diff residue: the removed old-signature call and
    the old per-exception handlers are dropped in favor of the new
    ``job.handle_exception()`` path.)
    """
    for attempt_no in range(1, retry_count + 2):
        # Need to get the logger value from its object to enable autosave
        # features, if AutoSaveDict is enabled from StructuredLogging module
        job = LAVAJob(proxy, job_definition)
        try:
            submit_job(job)
            wait_for_job_get_started(job)
            log_follower: LogFollower = bootstrap_log_follower()
            follow_job_execution(job, log_follower)
            return job
        except (MesaCIException, KeyboardInterrupt) as exception:
            # handle_exception() maps the exception to a job status, cancels
            # the job, and re-raises KeyboardInterrupt so a user abort stops
            # the retry loop.
            job.handle_exception(exception)
            print_log(
                f"{CONSOLE_LOG['BOLD']}"
                f"Finished executing LAVA job in the attempt #{attempt_no}"
                f"{CONSOLE_LOG['RESET']}"
            )
        finally:
            print_job_final_status(job)
def retriable_follow_job(proxy, job_definition) -> LAVAJob:
    """Run a LAVA job with the configured number of retries.

    Returns the finished LAVAJob, or raises MesaCIRetryError when every
    attempt failed. (Reconstructed from diff residue: the removed
    ``{retry_count}``-based message lines are dropped in favor of the
    ``number_of_retries`` ones.)
    """
    number_of_retries = NUMBER_OF_RETRIES_TIMEOUT_DETECTION
    if finished_job := execute_job_with_retries(
        proxy, job_definition, number_of_retries
    ):
        return finished_job
    # Job failed in all attempts
    raise MesaCIRetryError(
        f"{CONSOLE_LOG['BOLD']}"
        f"{CONSOLE_LOG['FG_RED']}"
        "Job failed after it exceeded the number of "
        f"{number_of_retries} retries."
        f"{CONSOLE_LOG['RESET']}",
        retry_count=number_of_retries,
    )

View file

@ -1,6 +1,8 @@
from .console_format import CONSOLE_LOG
from .gitlab_section import GitlabSection
from .lava_job import LAVAJob
from .lava_job_definition import generate_lava_yaml_payload
from .lava_proxy import call_proxy, setup_lava_proxy
from .log_follower import (
LogFollower,
fatal_err,

View file

@ -0,0 +1,175 @@
import re
import xmlrpc
from collections import defaultdict
from datetime import datetime
from typing import Any, Optional
from lava.exceptions import (
MesaCIException,
MesaCIKnownIssueException,
MesaCIParseException,
MesaCITimeoutError,
)
from lava.utils import CONSOLE_LOG
from lava.utils.log_follower import print_log
from lavacli.utils import flow_yaml as lava_yaml
from .lava_proxy import call_proxy
class LAVAJob:
    """A LAVA job handle: submission, cancellation, state polling, log
    retrieval and structured-log (``self.log``) bookkeeping."""

    # Final job status -> console color used when reporting the result.
    COLOR_STATUS_MAP: dict[str, str] = {
        "pass": CONSOLE_LOG["FG_GREEN"],
        "hung": CONSOLE_LOG["FG_YELLOW"],
        "fail": CONSOLE_LOG["FG_RED"],
        "canceled": CONSOLE_LOG["FG_MAGENTA"],
    }

    def __init__(self, proxy, definition, log=None) -> None:
        """
        Args:
            proxy: XML-RPC proxy to the LAVA scheduler.
            definition: YAML job definition to submit.
            log: optional structured-log mapping; a fresh ``defaultdict(str)``
                is created per instance when omitted.
        """
        self._job_id = None
        self.proxy = proxy
        self.definition = definition
        # Pagination cursor: index of the last log line already fetched.
        self.last_log_line = 0
        self.last_log_time = None
        self._is_finished = False
        # Bug fix: the previous ``log=defaultdict(str)`` default argument was
        # a single dict shared by every LAVAJob instance (mutable default
        # argument pitfall) — build a fresh one per job instead.
        self.log: dict[str, Any] = defaultdict(str) if log is None else log
        self.status = "not_submitted"

    def heartbeat(self) -> None:
        """Record that the job showed signs of life just now."""
        self.last_log_time: datetime = datetime.now()
        self.status = "running"

    @property
    def status(self) -> str:
        return self._status

    @status.setter
    def status(self, new_status: str) -> None:
        # Mirror every status change into the structured log.
        self._status = new_status
        self.log["status"] = self._status

    @property
    def job_id(self) -> int:
        return self._job_id

    @job_id.setter
    def job_id(self, new_id: int) -> None:
        # Mirror the LAVA-assigned id into the structured log.
        self._job_id = new_id
        self.log["lava_job_id"] = self._job_id

    @property
    def is_finished(self) -> bool:
        return self._is_finished

    def validate(self) -> Optional[dict]:
        """Returns a dict with errors, if the validation fails.

        Returns:
            Optional[dict]: a dict with the validation errors, if any
        """
        return call_proxy(self.proxy.scheduler.jobs.validate, self.definition, True)

    def show(self) -> dict[str, str]:
        """Fetch the job's details (timestamps, device, state, ...) from LAVA."""
        return call_proxy(self.proxy.scheduler.jobs.show, self._job_id)

    def get_lava_time(self, key, data) -> Optional[str]:
        # NOTE(review): assumes `key` is always present in `data` and that a
        # truthy entry exposes `.value` (xmlrpc DateTime) — confirm against
        # the scheduler.jobs.show payload.
        return data[key].value if data[key] else None

    def refresh_log(self) -> None:
        """Refresh the structured log with the job's current LAVA details."""
        details = self.show()
        self.log["dut_start_time"] = self.get_lava_time("start_time", details)
        self.log["dut_submit_time"] = self.get_lava_time("submit_time", details)
        self.log["dut_end_time"] = self.get_lava_time("end_time", details)
        self.log["dut_name"] = details.get("device")
        self.log["dut_state"] = details.get("state")

    def submit(self) -> bool:
        """Submit the job definition; return True on success, False when the
        proxy call raised a MesaCIException."""
        try:
            self.job_id = call_proxy(self.proxy.scheduler.jobs.submit, self.definition)
            self.status = "submitted"
            self.refresh_log()
        except MesaCIException:
            return False
        return True

    def lava_state(self) -> str:
        """Return the scheduler's current state string for this job."""
        job_state: dict[str, str] = call_proxy(
            self.proxy.scheduler.job_state, self._job_id
        )
        return job_state["job_state"]

    def cancel(self):
        """Cancel the job on LAVA (no-op when it was never submitted)."""
        if self._job_id:
            self.proxy.scheduler.jobs.cancel(self._job_id)
            # If we don't have yet set another job's status, let's update it
            # with canceled one
            if self.status == "running":
                self.status = "canceled"

    def is_started(self) -> bool:
        # The job counts as started once it left the scheduler queue.
        waiting_states = ("Submitted", "Scheduling", "Scheduled")
        return self.lava_state() not in waiting_states

    def is_post_processed(self) -> bool:
        # Once LAVA stops reporting "Running", post-processing has begun.
        return self.lava_state() != "Running"

    def _load_log_from_data(self, data) -> list[str]:
        lines = []
        if isinstance(data, xmlrpc.client.Binary):
            # We are dealing with xmlrpc.client.Binary
            # Let's extract the data
            data = data.data
        # When there is no new log data, the YAML is empty
        if loaded_lines := lava_yaml.load(data):
            lines: list[str] = loaded_lines
            self.last_log_line += len(lines)
        return lines

    def get_logs(self) -> list[str]:
        """Fetch log lines newer than ``last_log_line``; also updates the
        finished flag from LAVA's end-of-log indicator."""
        try:
            (finished, data) = call_proxy(
                self.proxy.scheduler.jobs.logs, self._job_id, self.last_log_line
            )
            self._is_finished = finished
            return self._load_log_from_data(data)
        except Exception as mesa_ci_err:
            raise MesaCIParseException(
                f"Could not get LAVA job logs. Reason: {mesa_ci_err}"
            ) from mesa_ci_err

    def parse_job_result_from_log(
        self, lava_lines: list[dict[str, str]]
    ) -> list[dict[str, str]]:
        """Use the console log to catch if the job has completed successfully or
        not. Returns the list of log lines until the result line."""
        last_line = None  # Print all lines. lines[:None] == lines[:]
        for idx, line in enumerate(lava_lines):
            if result := re.search(r"hwci: mesa: (pass|fail)", line):
                self._is_finished = True
                self.status = result[1]
                last_line = idx + 1
                # We reached the log end here. hwci script has finished.
                break
        return lava_lines[:last_line]

    def handle_exception(self, exception: Exception):
        """Map a raised exception to a final job status, cancel the job and
        record the failure reason in the structured log."""
        print_log(exception)
        if isinstance(exception, MesaCIKnownIssueException):
            self.status = "canceled"
        elif isinstance(exception, MesaCITimeoutError):
            self.status = "hung"
        elif isinstance(exception, MesaCIException):
            self.status = "failed"
        elif isinstance(exception, KeyboardInterrupt):
            self.status = "canceled_by_user"
            print_log("LAVA job submitter was interrupted. Cancelling the job.")
            # NOTE(review): re-raising here skips the cancel() and
            # fail-reason bookkeeping below for user interrupts — confirm the
            # job is cancelled elsewhere on this path.
            raise exception
        else:
            self.status = "job_submitter_error"
        self.cancel()
        self.log["dut_job_fail_reason"] = str(exception)

View file

@ -0,0 +1,44 @@
import time
import traceback
import urllib
import urllib.parse
import xmlrpc
import xmlrpc.client
import lavacli
from .log_follower import fatal_err, print_log
def setup_lava_proxy():
    """Create an XML-RPC proxy to the LAVA server from the lavacli
    "default" configuration (URI, credentials, transport options)."""
    config = lavacli.load_config("default")
    usr = config.get("username")
    tok = config.get("token")
    uri_obj = urllib.parse.urlparse(config.get("uri"))
    # Credentials are embedded in the URL the XML-RPC client connects to.
    uri_str = f"{uri_obj.scheme}://{usr}:{tok}@{uri_obj.netloc}{uri_obj.path}"
    transport = lavacli.RequestsTransport(
        uri_obj.scheme,
        config.get("proxy"),
        config.get("timeout", 120.0),
        config.get("verify_ssl_cert", True),
    )
    proxy = xmlrpc.client.ServerProxy(uri_str, allow_none=True, transport=transport)
    print_log(f'Proxy for {config["uri"]} created.')
    return proxy
def call_proxy(fn, *args):
    """Invoke an XML-RPC call, retrying transient protocol errors.

    Protocol errors are retried up to 60 times, 15 seconds apart, before
    being treated as fatal; XML-RPC faults are fatal immediately (the
    fault is handed to fatal_err so it can be re-raised).
    """
    max_attempts = 60
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(*args)
        except xmlrpc.client.ProtocolError as err:
            if attempt < max_attempts:
                time.sleep(15)
            else:
                traceback.print_exc()
                fatal_err(f"A protocol error occurred (Err {err.errcode} {err.errmsg})")
        except xmlrpc.client.Fault as err:
            traceback.print_exc()
            fatal_err(f"FATAL: Fault: {err.faultString} (code: {err.faultCode})", err)

View file

@ -270,11 +270,13 @@ def print_log(msg: str) -> None:
print(f"{CONSOLE_LOG['RESET']}{datetime.now()}: {msg}")
def fatal_err(msg):
def fatal_err(msg, exception=None):
    """Print `msg` highlighted in red, then abort.

    If `exception` is given it is re-raised (so callers like the retry
    machinery can catch it); otherwise the process exits with status 1.
    """
    # Bug fix: the three f-strings were previously separate statements, so
    # colored_msg held only the red escape code and `msg` was never printed.
    # Adjacent string literals only concatenate within one expression, so
    # wrap them in parentheses.
    colored_msg = (
        f"{CONSOLE_LOG['FG_RED']}"
        f"{msg}"
        f"{CONSOLE_LOG['RESET']}"
    )
    print_log(colored_msg)
    if exception:
        raise exception
    sys.exit(1)

View file

@ -16,6 +16,7 @@ from lava.lava_job_submitter import (
DEVICE_HANGING_TIMEOUT_SEC,
NUMBER_OF_RETRIES_TIMEOUT_DETECTION,
LAVAJob,
bootstrap_log_follower,
follow_job_execution,
retriable_follow_job,
)
@ -52,7 +53,8 @@ def test_submit_and_follow_respects_exceptions(mock_sleep, mock_proxy, exception
with pytest.raises(MesaCIException):
proxy = mock_proxy(side_effect=exception)
job = LAVAJob(proxy, '')
follow_job_execution(job)
log_follower = bootstrap_log_follower()
follow_job_execution(job, log_follower)
NETWORK_EXCEPTION = xmlrpc.client.ProtocolError("", 0, "test", {})
@ -179,7 +181,7 @@ PROXY_SCENARIOS = {
"fail",
{},
),
"XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(SystemExit, match="1"), False, {}),
"XMLRPC Fault": ([XMLRPC_FAULT], pytest.raises(MesaCIRetryError), False, {}),
}