diff options
author | Zac Medico <zmedico@gentoo.org> | 2024-02-12 20:35:02 -0800 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2024-02-21 07:27:31 -0800 |
commit | 389bb304abf5705b9f0e9e75982a682f193af985 (patch) | |
tree | 6a426da2e19aff99520d104e4d2ed62ef0331668 | |
parent | asyncio: Wrap asyncio.Lock for python 3.9 compat (diff) | |
download | portage-389bb304abf5705b9f0e9e75982a682f193af985.tar.gz portage-389bb304abf5705b9f0e9e75982a682f193af985.tar.bz2 portage-389bb304abf5705b9f0e9e75982a682f193af985.zip |
async_aux_get: Use EbuildMetadataPhase deallocate_config future
For the portdbapi async_aux_get method, there is not a very
good place to store a config pool, so instead use asyncio.Lock
to manage access to the portdbapi doebuild_settings attribute
when using the main event loop in the main thread. For other
threads, clone a config instance since we do not have a
thread-safe config pool. This cloning is expensive, but since
portage internals do not trigger this case, it suffices for now
(an AssertionError ensures that internals do not trigger it).
For the main event loop running in the main thread, performance
with the asyncio.Lock should not be significantly different to
performance prior to commit c95fc64abf96, since check_locale
results are typically cached and before there was only a single
shared doebuild_settings instance with access serialized via
the EbuildMetadataPhase _start method.
Update async_aux_get callers to use asyncio.ensure_future on
the returned coroutine when needed, since it used to return
a future instead of a coroutine, and sometimes a future is
needed for add_done_callback usage.
In the portdbapi async_fetch_map method, fix a broken reference
to "future" which should have been "aux_get_future", an error
discovered while testing this patch.
Bug: https://bugs.gentoo.org/924319
Fixes: c95fc64abf96 ("EbuildPhase: async_check_locale")
Signed-off-by: Zac Medico <zmedico@gentoo.org>
-rw-r--r-- | lib/portage/_emirrordist/FetchIterator.py | 10 | ||||
-rw-r--r-- | lib/portage/dbapi/porttree.py | 129 | ||||
-rw-r--r-- | lib/portage/tests/update/test_move_ent.py | 3 |
3 files changed, 97 insertions, 45 deletions
diff --git a/lib/portage/_emirrordist/FetchIterator.py b/lib/portage/_emirrordist/FetchIterator.py index eaf3e5359..e4fdd092a 100644 --- a/lib/portage/_emirrordist/FetchIterator.py +++ b/lib/portage/_emirrordist/FetchIterator.py @@ -1,4 +1,4 @@ -# Copyright 2013-2018 Gentoo Foundation +# Copyright 2013-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import threading @@ -14,6 +14,7 @@ from portage.exception import PortageException, PortageKeyError from portage.package.ebuild.fetch import DistfileName from portage.util._async.AsyncTaskFuture import AsyncTaskFuture from portage.util._async.TaskScheduler import TaskScheduler +from portage.util.futures import asyncio from portage.util.futures.iter_completed import iter_gather from .FetchTask import FetchTask from _emerge.CompositeTask import CompositeTask @@ -276,8 +277,11 @@ def _async_fetch_tasks(config, hash_filter, repo_config, digests_future, cpv, lo result.set_result(fetch_tasks) def future_generator(): - yield config.portdb.async_aux_get( - cpv, ("RESTRICT",), myrepo=repo_config.name, loop=loop + yield asyncio.ensure_future( + config.portdb.async_aux_get( + cpv, ("RESTRICT",), myrepo=repo_config.name, loop=loop + ), + loop, ) yield config.portdb.async_fetch_map(cpv, mytree=repo_config.location, loop=loop) diff --git a/lib/portage/dbapi/porttree.py b/lib/portage/dbapi/porttree.py index 61d431f91..4eebe1183 100644 --- a/lib/portage/dbapi/porttree.py +++ b/lib/portage/dbapi/porttree.py @@ -1,4 +1,4 @@ -# Copyright 1998-2021 Gentoo Authors +# Copyright 1998-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 __all__ = ["close_portdbapi_caches", "FetchlistDict", "portagetree", "portdbapi"] @@ -41,7 +41,9 @@ from portage.util.futures import asyncio from portage.util.futures.iter_completed import iter_gather from _emerge.EbuildMetadataPhase import EbuildMetadataPhase +import contextlib import os as _os +import threading import traceback import warnings import errno @@ -239,6 +241,7 @@ class portdbapi(dbapi): # this purpose because doebuild makes many changes to the config # instance that is passed in. self.doebuild_settings = config(clone=self.settings) + self._doebuild_settings_lock = asyncio.Lock() self.depcachedir = os.path.realpath(self.settings.depcachedir) if os.environ.get("SANDBOX_ON") == "1": @@ -356,6 +359,17 @@ class portdbapi(dbapi): self._better_cache = None self._broken_ebuilds = set() + def __getstate__(self): + state = self.__dict__.copy() + # These attributes are not picklable, so they are automatically + # regenerated after unpickling. + state["_doebuild_settings_lock"] = None + return state + + def __setstate__(self, state): + self.__dict__.update(state) + self._doebuild_settings_lock = asyncio.Lock() + def _set_porttrees(self, porttrees): """ Consumers, such as emirrordist, may modify the porttrees attribute in @@ -669,7 +683,7 @@ class portdbapi(dbapi): self.async_aux_get(mycpv, mylist, mytree=mytree, myrepo=myrepo, loop=loop) ) - def async_aux_get(self, mycpv, mylist, mytree=None, myrepo=None, loop=None): + async def async_aux_get(self, mycpv, mylist, mytree=None, myrepo=None, loop=None): """ Asynchronous form form of aux_get. @@ -694,13 +708,11 @@ class portdbapi(dbapi): # Callers of this method certainly want the same event loop to # be used for all calls. loop = asyncio._wrap_loop(loop) - future = loop.create_future() cache_me = False if myrepo is not None: mytree = self.treemap.get(myrepo) if mytree is None: - future.set_exception(PortageKeyError(myrepo)) - return future + raise PortageKeyError(myrepo) if ( mytree is not None @@ -719,16 +731,14 @@ class portdbapi(dbapi): ): aux_cache = self._aux_cache.get(mycpv) if aux_cache is not None: - future.set_result([aux_cache.get(x, "") for x in mylist]) - return future + return [aux_cache.get(x, "") for x in mylist] cache_me = True try: cat, pkg = mycpv.split("/", 1) except ValueError: # Missing slash. Can't find ebuild so raise PortageKeyError. - future.set_exception(PortageKeyError(mycpv)) - return future + raise PortageKeyError(mycpv) myebuild, mylocation = self.findname2(mycpv, mytree) @@ -737,12 +747,12 @@ class portdbapi(dbapi): "!!! aux_get(): %s\n" % _("ebuild not found for '%s'") % mycpv, noiselevel=1, ) - future.set_exception(PortageKeyError(mycpv)) - return future + raise PortageKeyError(mycpv) mydata, ebuild_hash = self._pull_valid_cache(mycpv, myebuild, mylocation) if mydata is not None: + future = loop.create_future() self._aux_get_return( future, mycpv, @@ -754,37 +764,71 @@ class portdbapi(dbapi): cache_me, None, ) - return future + return future.result() if myebuild in self._broken_ebuilds: - future.set_exception(PortageKeyError(mycpv)) - return future - - proc = EbuildMetadataPhase( - cpv=mycpv, - ebuild_hash=ebuild_hash, - portdb=self, - repo_path=mylocation, - scheduler=loop, - settings=self.doebuild_settings, - ) + raise PortageKeyError(mycpv) - proc.addExitListener( - functools.partial( - self._aux_get_return, - future, - mycpv, - mylist, - myebuild, - ebuild_hash, - mydata, - mylocation, - cache_me, - ) - ) - future.add_done_callback(functools.partial(self._aux_get_cancel, proc)) - proc.start() - return future + proc = None + deallocate_config = None + async with contextlib.AsyncExitStack() as stack: + try: + if ( + threading.current_thread() is threading.main_thread() + and loop is asyncio._safe_loop() + ): + # In this case use self._doebuild_settings_lock to manage concurrency. + deallocate_config = loop.create_future() + await stack.enter_async_context(self._doebuild_settings_lock) + settings = self.doebuild_settings + else: + if portage._internal_caller: + raise AssertionError( + f"async_aux_get called from thread {threading.current_thread()} with loop {loop}" + ) + # Clone a config instance since we do not have a thread-safe config pool. + settings = portage.config(clone=self.settings) + + proc = EbuildMetadataPhase( + cpv=mycpv, + ebuild_hash=ebuild_hash, + portdb=self, + repo_path=mylocation, + scheduler=loop, + settings=settings, + deallocate_config=deallocate_config, + ) + + future = loop.create_future() + proc.addExitListener( + functools.partial( + self._aux_get_return, + future, + mycpv, + mylist, + myebuild, + ebuild_hash, + mydata, + mylocation, + cache_me, + ) + ) + future.add_done_callback(functools.partial(self._aux_get_cancel, proc)) + proc.start() + + finally: + # Wait for deallocate_config before releasing + # self._doebuild_settings_lock if needed. + if deallocate_config is not None: + if proc is None or not proc.isAlive(): + deallocate_config.done() or deallocate_config.cancel() + else: + await deallocate_config + + # After deallocate_config is done, release self._doebuild_settings_lock + # by leaving the stack context, and wait for proc to finish and + # trigger a call to self._aux_get_return. + return await future @staticmethod def _aux_get_cancel(proc, future): @@ -889,7 +933,7 @@ class portdbapi(dbapi): ) ) else: - result.set_exception(future.exception()) + result.set_exception(aux_get_future.exception()) return eapi, myuris = aux_get_future.result() @@ -913,8 +957,9 @@ class portdbapi(dbapi): except Exception as e: result.set_exception(e) - aux_get_future = self.async_aux_get( - mypkg, ["EAPI", "SRC_URI"], mytree=mytree, loop=loop + aux_get_future = asyncio.ensure_future( + self.async_aux_get(mypkg, ["EAPI", "SRC_URI"], mytree=mytree, loop=loop), + loop, ) result.add_done_callback( lambda result: aux_get_future.cancel() if result.cancelled() else None diff --git a/lib/portage/tests/update/test_move_ent.py b/lib/portage/tests/update/test_move_ent.py index fe968f12a..0b938dd28 100644 --- a/lib/portage/tests/update/test_move_ent.py +++ b/lib/portage/tests/update/test_move_ent.py @@ -231,6 +231,9 @@ class MoveEntTestCase(TestCase): finally: playground.cleanup() + # Ignore "The loop argument is deprecated" since this argument is conditionally + # added to asyncio.Lock as needed for compatibility with python 3.9. + @pytest.mark.filterwarnings("ignore:The loop argument is deprecated") @pytest.mark.filterwarnings("error") def testMoveEntWithCorruptIndex(self): """ |