"""Pipeline that parallelizes check running."""

import multiprocessing
import os
import signal
import traceback
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
from operator import attrgetter

from . import base
from .checks import init_checks
from .sources import UnversionedSource, VersionedSource


class Pipeline:
    """Check-running pipeline leveraging scope-based parallelism.

    All results are pushed into the results queue as lists of result objects or
    as exception traceback strings. Iterating over the pipeline forces exceptions
    to be handled explicitly: when an exception is raised, its serialized traceback
    is output and the process group is signaled to end.
    """

    def __init__(self, options):
        self.options = options
        # results flagged as errors by the --exit option
        self.errors = []

        # pkgcheck currently requires the fork start method (#254)
        self._mp_ctx = multiprocessing.get_context("fork")
        self._results_q = self._mp_ctx.SimpleQueue()

        # create checkrunners
        self._pipes = self._create_runners()

        # initialize settings used by iterator support
        self._runner = self._mp_ctx.Process(target=self._run)
        signal.signal(signal.SIGINT, self._kill_pipe)
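        # iterate results until the None sentinel pushed by _run() when scanning ends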
        self._results_iter = iter(self._results_q.get, None)
        self._results = deque()

        if self.options.pkg_scan:
            # package level scans sort all returned results
            self._ordered_results = {
                scope: [] for scope in base.scopes.values() if scope >= base.package_scope
            }
        else:
            # scoped mapping for caching repo and location specific results
            self._ordered_results = {
                scope: []
                for scope in reversed(list(base.scopes.values()))
                if scope <= base.repo_scope
            }

    def _filter_checks(self, scope):
        """Verify check scope against given scope to determine activation."""
        for check in sorted(self.options.enabled_checks, key=attrgetter("__name__")):
            if isinstance(check.scope, base.ConditionalScope):
                # conditionally enabled check
                yield check
            elif isinstance(check.scope, base.LocationScope):
                if not self.options.selected_scopes:
                    if scope == base.repo_scope or check.scope in scope:
                        # allow repo scans or cwd scope to trigger location specific checks
                        yield check
                elif check.scope in self.options.selected_scopes:
                    # Allow checks with special scopes to be run when specifically
                    # requested, e.g. eclass-only scanning.
                    yield check
            elif isinstance(scope, base.PackageScope) and check.scope >= scope:
                # Only run pkg-related checks at or below the current scan scope level, if
                # pkg scanning is requested, e.g. skip repo level checks when scanning at
                # package level.
                yield check

    def _create_runners(self):
        """Initialize and categorize checkrunners for results pipeline."""
        pipes = {"async": [], "sync": [], "sequential": []}

        # use addon/source caches to avoid re-initializing objects
        addons_map = {}
        source_map = {}

        for scope, restriction in self.options.restrictions:
            # initialize enabled checks
            addons = list(base.get_addons(self._filter_checks(scope)))
            if not addons:
                raise base.PkgcheckUserException(
                    f"no matching checks available for {scope.desc} scope"
                )
            checks = init_checks(
                addons, self.options, self._results_q, addons_map=addons_map, source_map=source_map
            )

            # Initialize checkrunners per source type, using a separate runner for
            # async checks, and categorize them for parallelization based on the
            # scan and source scopes.
            runners = {
                "async": defaultdict(list),
                "sync": defaultdict(list),
                "sequential": defaultdict(list),
            }
            for (source, runner_cls), check_objs in checks.items():
                runner = runner_cls(self.options, source, check_objs)
                if not self.options.pkg_scan and source.scope >= base.package_scope:
                    runners[runner_cls.type][base.package_scope].append(runner)
                else:
                    runners[runner_cls.type][source.scope].append(runner)

            for exec_type in pipes.keys():
                if runners[exec_type]:
                    pipes[exec_type].append((scope, restriction, runners[exec_type]))

        return pipes

    def _kill_pipe(self, *args, error=None):
        """Handle terminating the pipeline process group."""
        if self._runner.is_alive():
            os.killpg(self._runner.pid, signal.SIGKILL)
        if error is not None:
            # propagate exception raised during parallel scan
            raise base.PkgcheckUserException(error)
        raise KeyboardInterrupt

    def __iter__(self):
        # start running the check pipeline
        self._runner.start()
        return self

    def __next__(self):
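        # Pop cached results first; results flagged as already filtered or whose
        # keyword isn't enabled are dropped and the loop moves on to the next item.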
        while True:
            try:
                result = self._results.popleft()
                if not result._filtered and result.__class__ in self.options.filtered_keywords:
                    if result.__class__ in self.options.exit_keywords:
                        self.errors.append(result)
                    return result
            except IndexError:
                try:
                    results = next(self._results_iter)
                except StopIteration:
                    if self._ordered_results is None:
                        raise
                    self._runner.join()
                    # output cached results in registered order
                    results = chain.from_iterable(map(sorted, self._ordered_results.values()))
                    self._results.extend(results)
                    self._ordered_results = None
                    continue

                # Catch propagated, serialized exceptions, output their
                # traceback, and signal the scanning process to end.
                if isinstance(results, str):
                    self._kill_pipe(error=results.strip())

                # cache results with registered scopes so output can be forcibly ordered
                try:
                    self._ordered_results[results[0].scope].extend(results)
                except KeyError:
                    self._results.extend(results)

    def _queue_work(self, sync_pipes, work_q):
        """Producer that queues scanning tasks against granular scope restrictions."""
        versioned_source = VersionedSource(self.options)
        unversioned_source = UnversionedSource(self.options)

        for i, (scan_scope, restriction, pipes) in enumerate(sync_pipes):
            for scope, runners in pipes.items():
                num_runners = len(runners)
                if base.version_scope in (scope, scan_scope):
                    for restrict in versioned_source.itermatch(restriction):
                        for j in range(num_runners):
                            work_q.put((scope, restrict, i, [j]))
                elif scope == base.package_scope:
                    for restrict in unversioned_source.itermatch(restriction):
                        work_q.put((scope, restrict, i, range(num_runners)))
                else:
                    for j in range(num_runners):
                        work_q.put((scope, restriction, i, [j]))

        # notify consumers that no more work exists
        for i in range(self.options.jobs):
            work_q.put(None)

    def _run_checks(self, pipes, work_q):
        """Consumer that runs scanning tasks, queuing results for output."""
        try:
            for scope, restrict, i, runners in iter(work_q.get, None):
                if results := sorted(
                    chain.from_iterable(pipes[i][-1][scope][j].run(restrict) for j in runners)
                ):
                    self._results_q.put(results)
        except Exception:  # pragma: no cover
            # traceback can't be pickled so serialize it
            tb = traceback.format_exc()
            self._results_q.put(tb)

    def _schedule_async(self, async_pipes):
        """Schedule asynchronous checks."""
        try:
            with ThreadPoolExecutor(max_workers=self.options.tasks) as executor:
                # schedule any existing async checks
                futures = {}
                for _scope, restriction, pipes in async_pipes:
                    for runner in chain.from_iterable(pipes.values()):
                        runner.schedule(executor, futures, restriction)
        except Exception:  # pragma: no cover
            # traceback can't be pickled so serialize it
            tb = traceback.format_exc()
            self._results_q.put(tb)

    def _run(self):
        """Run the scanning pipeline in parallel by check and scanning scope."""
        try:
            signal.signal(signal.SIGINT, signal.SIG_DFL)
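            # start a new process group so _kill_pipe() can SIGKILL the whole group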
            os.setpgrp()

            # schedule asynchronous checks in a separate process
            async_proc = None
            if async_pipes := self._pipes["async"]:
                async_proc = self._mp_ctx.Process(target=self._schedule_async, args=(async_pipes,))
                async_proc.start()

            # run synchronous checks using a process pool
            if sync_pipes := self._pipes["sync"]:
                work_q = self._mp_ctx.SimpleQueue()
                pool = self._mp_ctx.Pool(self.options.jobs, self._run_checks, (sync_pipes, work_q))
                pool.close()
                self._queue_work(sync_pipes, work_q)
                pool.join()

            if sequential_pipes := self._pipes["sequential"]:
                for _scope, restriction, pipes in sequential_pipes:
                    for runner in chain.from_iterable(pipes.values()):
                        if results := tuple(runner.run(restriction)):
                            self._results_q.put(results)

            if async_proc is not None:
                async_proc.join()
            # notify iterator that no more results exist
            self._results_q.put(None)
        except Exception:  # pragma: no cover
            # traceback can't be pickled so serialize it
            tb = traceback.format_exc()
            self._results_q.put(tb)