summaryrefslogtreecommitdiff
path: root/lib/mesa/.gitlab-ci/lava/utils/log_follower.py
blob: 1fdf490bcb8d7f245da445e2263820d2a7d6957f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
#!/usr/bin/env python3
#
# Copyright (C) 2022 Collabora Limited
# Author: Guilherme Gallo <guilherme.gallo@collabora.com>
#
# SPDX-License-Identifier: MIT

"""
Some utilities to analyse logs, create gitlab sections and other quality of life
improvements
"""

import logging
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, Union

from lava.exceptions import MesaCITimeoutError
from lava.utils.console_format import CONSOLE_LOG
from lava.utils.gitlab_section import GitlabSection
from lava.utils.lava_farm import LavaFarm, get_lava_farm
from lava.utils.lava_log_hints import LAVALogHints
from lava.utils.log_section import (
    DEFAULT_GITLAB_SECTION_TIMEOUTS,
    FALLBACK_GITLAB_SECTION_TIMEOUT,
    LOG_SECTIONS,
    LogSectionType,
)


@dataclass
class LogFollower:
    starting_section: Optional[GitlabSection] = None
    _current_section: Optional[GitlabSection] = None
    section_history: list[GitlabSection] = field(default_factory=list, init=False)
    timeout_durations: dict[LogSectionType, timedelta] = field(
        default_factory=lambda: DEFAULT_GITLAB_SECTION_TIMEOUTS,
    )
    fallback_timeout: timedelta = FALLBACK_GITLAB_SECTION_TIMEOUT
    _buffer: list[str] = field(default_factory=list, init=False)
    log_hints: LAVALogHints = field(init=False)
    lava_farm: LavaFarm = field(init=False, default=get_lava_farm())
    _merge_next_line: str = field(default_factory=str, init=False)

    def __post_init__(self):
        # Make it trigger current_section setter to populate section history
        self.current_section = self.starting_section
        section_is_created = bool(self._current_section)
        section_has_started = bool(
            self._current_section and self._current_section.has_started
        )
        self.log_hints = LAVALogHints(self)
        assert (
            section_is_created == section_has_started
        ), "Can't follow logs beginning from uninitialized GitLab sections."

        # Initialize fix_lava_gitlab_section_log generator
        self.gl_section_fix_gen = fix_lava_gitlab_section_log()
        next(self.gl_section_fix_gen)

    @property
    def current_section(self):
        return self._current_section

    @current_section.setter
    def current_section(self, new_section: GitlabSection) -> None:
        if old_section := self._current_section:
            self.section_history.append(old_section)
        self._current_section = new_section

    @property
    def phase(self) -> LogSectionType:
        return (
            self._current_section.type
            if self._current_section
            else LogSectionType.UNKNOWN
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup existing buffer if this object gets out from the context"""
        self.clear_current_section()
        last_lines = self.flush()
        for line in last_lines:
            print(line)

    def watchdog(self):
        if not self._current_section:
            return

        timeout_duration = self.timeout_durations.get(
            self._current_section.type, self.fallback_timeout
        )

        if self._current_section.delta_time() > timeout_duration:
            raise MesaCITimeoutError(
                f"Gitlab Section {self._current_section} has timed out",
                timeout_duration=timeout_duration,
            )

    def clear_current_section(self):
        if self._current_section and not self._current_section.has_finished:
            self._buffer.append(self._current_section.end())
            self.current_section = None

    def update_section(self, new_section: GitlabSection):
        # Sections can have redundant regex to find them to mitigate LAVA
        # interleaving kmsg and stderr/stdout issue.
        if self.current_section and self.current_section.id == new_section.id:
            return
        self.clear_current_section()
        self.current_section = new_section
        self._buffer.append(new_section.start())

    def manage_gl_sections(self, line):
        if isinstance(line["msg"], list):
            logging.debug("Ignoring messages as list. Kernel dumps.")
            return

        for log_section in LOG_SECTIONS:
            if new_section := log_section.from_log_line_to_section(line):
                self.update_section(new_section)
                break

    def detect_kernel_dump_line(self, line: dict[str, Union[str, list]]) -> bool:
        # line["msg"] can be a list[str] when there is a kernel dump
        if isinstance(line["msg"], list):
            return line["lvl"] == "debug"

        # result level has dict line["msg"]
        if not isinstance(line["msg"], str):
            return False

        # we have a line, check if it is a kernel message
        if re.search(r"\[[\d\s]{5}\.[\d\s]{6}\] +\S{2,}", line["msg"]):
            print_log(f"{CONSOLE_LOG['BOLD']}{line['msg']}{CONSOLE_LOG['RESET']}")
            return True

        return False

    def remove_trailing_whitespace(self, line: dict[str, str]) -> None:
        """
        Removes trailing whitespace from the end of the `msg` value in the log line dictionary.

        Args:
            line: A dictionary representing a single log line.

        Note:
            LAVA treats carriage return characters as a line break, so each carriage return in an output console
            is mapped to a console line in LAVA. This method removes trailing `\r\n` characters from log lines.
        """
        msg: Optional[str] = line.get("msg")
        if not msg:
            return False

        messages = [msg] if isinstance(msg, str) else msg

        for message in messages:
            # LAVA logs brings raw messages, which includes newlines characters as \r\n.
            line["msg"]: str = re.sub(r"\r\n$", "", message)

    def merge_carriage_return_lines(self, line: dict[str, str]) -> bool:
        """
        Merges lines that end with a carriage return character into a single line.

        Args:
            line: A dictionary representing a single log line.

        Returns:
            A boolean indicating whether the current line has been merged with the next line.

        Note:
            LAVA treats carriage return characters as a line break, so each carriage return in an output console
            is mapped to a console line in LAVA.
        """
        if line["msg"].endswith("\r"):
            self._merge_next_line += line["msg"]
            return True

        if self._merge_next_line:
            line["msg"] = self._merge_next_line + line["msg"]
            self._merge_next_line = ""

        return False


    def feed(self, new_lines: list[dict[str, str]]) -> bool:
        """Input data to be processed by LogFollower instance
        Returns true if the DUT (device under test) seems to be alive.
        """

        self.watchdog()

        # No signal of job health in the log
        is_job_healthy = False

        for line in new_lines:
            self.remove_trailing_whitespace(line)

            if self.detect_kernel_dump_line(line):
                continue

            if self.merge_carriage_return_lines(line):
                continue

            # At least we are fed with a non-kernel dump log, it seems that the
            # job is progressing
            is_job_healthy = True
            self.manage_gl_sections(line)
            if parsed_line := self.parse_lava_line(line):
                self._buffer.append(parsed_line)

        self.log_hints.detect_failure(new_lines)

        return is_job_healthy

    def flush(self) -> list[str]:
        buffer = self._buffer
        self._buffer = []
        return buffer

    def parse_lava_line(self, line) -> Optional[str]:
        prefix = ""
        suffix = ""

        if line["lvl"] in ["results", "feedback", "debug"]:
            return
        elif line["lvl"] in ["warning", "error"]:
            prefix = CONSOLE_LOG["FG_RED"]
            suffix = CONSOLE_LOG["RESET"]
        elif line["lvl"] == "input":
            prefix = "$ "
            suffix = ""
        elif line["lvl"] == "target" and self.lava_farm != LavaFarm.COLLABORA:
            # gl_section_fix_gen will output the stored line if it can't find a
            # match for the first split line
            # So we can recover it and put it back to the buffer
            if recovered_first_line := self.gl_section_fix_gen.send(line):
                self._buffer.append(recovered_first_line)

        return f'{prefix}{line["msg"]}{suffix}'

def fix_lava_gitlab_section_log():
    """This function is a temporary solution for the Gitlab section markers
    splitting problem. Gitlab parses the following lines to define a collapsible
    gitlab section in their log:
    - \x1b[0Ksection_start:timestamp:section_id[collapsible=true/false]\r\x1b[0Ksection_header
    - \x1b[0Ksection_end:timestamp:section_id\r\x1b[0K
    There is some problem in message passing between the LAVA dispatcher and the
    device under test (DUT), that replaces \r control characters into \n. When
    this problem is fixed on the LAVA side, one should remove this function.
    """
    while True:
        line = yield False
        first_line = None
        split_line_pattern = re.compile(r"\x1b\[0K(section_\w+):(\d+):([^\s\r]+)$")
        second_line_pattern = re.compile(r"\x1b\[0K([\S ]+)?")

        if not re.search(split_line_pattern, line["msg"]):
            continue

        first_line = line["msg"]
        # Delete the current line and hold this log line stream to be able to
        # possibly merge it with the next line.
        line["msg"] = ""
        line = yield False

        # This code reached when we detect a possible first split line
        if re.search(second_line_pattern, line["msg"]):
            assert first_line
            line["msg"] = f"{first_line}\r{line['msg']}"
        else:
            # The current line doesn't match with the previous one, send back the
            # latter to give the user the chance to recover it.
            yield first_line



def print_log(msg: str, *args) -> None:
    # Reset color from timestamp, since `msg` can tint the terminal color
    print(f"{CONSOLE_LOG['RESET']}{datetime.now()}: {msg}", *args)


def fatal_err(msg, exception=None):
    colored_msg = f"{CONSOLE_LOG['FG_RED']}"
    print_log(colored_msg, f"{msg}", f"{CONSOLE_LOG['RESET']}")
    if exception:
        raise exception
    sys.exit(1)


def hide_sensitive_data(yaml_data: str, start_hide: str = "HIDE_START", end_hide: str = "HIDE_END") -> str:
    skip_line = False
    dump_data: list[str] = []
    for line in yaml_data.splitlines(True):
        if start_hide in line:
            skip_line = True
        elif end_hide in line:
            skip_line = False

        if skip_line:
            continue

        dump_data.append(line)

    return "".join(dump_data)