diff options
author | 2022-01-23 18:23:14 -0500 | |
---|---|---|
committer | 2022-01-24 01:17:10 +0000 | |
commit | b094ba47368bb9b16fb17ee47a54644a09860823 (patch) | |
tree | 61f77b2130133aa9ad994e58be4eb6e1abbc13a4 | |
parent | Add -X shortopt for --exclude (diff) | |
download | portage-b094ba47368bb9b16fb17ee47a54644a09860823.tar.gz portage-b094ba47368bb9b16fb17ee47a54644a09860823.tar.bz2 portage-b094ba47368bb9b16fb17ee47a54644a09860823.zip |
*/*: GPKG (new binpkg format) Support (GLEP 78, provisionally)
Bug: https://bugs.gentoo.org/500630
Bug: https://bugs.gentoo.org/659864
Bug: https://bugs.gentoo.org/672672
Bug: https://bugs.gentoo.org/773259
Bug: https://bugs.gentoo.org/820578
Signed-off-by: Rin Cat (鈴猫) <dev@rincat.ch>
Closes: https://github.com/gentoo/portage/pull/562
Signed-off-by: Sam James <sam@gentoo.org>
88 files changed, 6272 insertions, 903 deletions
diff --git a/MANIFEST.in b/MANIFEST.in index 7d9ea761a..31ca9c166 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -25,3 +25,6 @@ include misc/* # extensions include src/* + +# GPG test keys +recursive-include lib/portage/tests/.gnupg * diff --git a/bin/gpkg-helper.py b/bin/gpkg-helper.py new file mode 100755 index 000000000..d79430212 --- /dev/null +++ b/bin/gpkg-helper.py @@ -0,0 +1,71 @@ +#!/usr/bin/python -b +# Copyright 2020 Gentoo Authors +# Distributed under the terms of the GNU General Public License v2 + +import argparse +import sys +import portage + +portage._internal_caller = True +from portage import os + + +def command_compose(args): + + usage = "usage: compose <package_cpv> <binpkg_path> <metadata_dir> <image_dir>\n" + + if len(args) != 4: + sys.stderr.write(usage) + sys.stderr.write("4 arguments are required, got %s\n" % len(args)) + return 1 + + cpv, binpkg_path, metadata_dir, image_dir = args + + if not os.path.isdir(metadata_dir): + sys.stderr.write(usage) + sys.stderr.write("Argument 3 is not a directory: '%s'\n" % metadata_dir) + return 1 + + if not os.path.isdir(image_dir): + sys.stderr.write(usage) + sys.stderr.write("Argument 4 is not a directory: '%s'\n" % image_dir) + return 1 + + gpkg_file = portage.gpkg.gpkg(portage.settings, cpv, binpkg_path) + metadata = gpkg_file._generate_metadata_from_dir(metadata_dir) + gpkg_file.compress(image_dir, metadata) + return os.EX_OK + + +def main(argv): + + if argv and isinstance(argv[0], bytes): + for i, x in enumerate(argv): + argv[i] = portage._unicode_decode(x, errors="strict") + + valid_commands = ("compress",) + description = "Perform metadata operations on a binary package." + usage = "usage: %s COMMAND [args]" % os.path.basename(argv[0]) + + parser = argparse.ArgumentParser(description=description, usage=usage) + options, args = parser.parse_known_args(argv[1:]) + + if not args: + parser.error("missing command argument") + + command = args[0] + + if command not in valid_commands: + parser.error("invalid command: '%s'" % command) + + if command == "compress": + rval = command_compose(args[1:]) + else: + raise AssertionError("invalid command: '%s'" % command) + + return rval + + +if __name__ == "__main__": + rval = main(sys.argv[:]) + sys.exit(rval) diff --git a/bin/misc-functions.sh b/bin/misc-functions.sh index e4defa550..ccb07075f 100755 --- a/bin/misc-functions.sh +++ b/bin/misc-functions.sh @@ -503,38 +503,53 @@ __dyn_package() { # in there in case any tools were built with -pg in CFLAGS. cd "${T}" || die - local tar_options="" - [[ $PORTAGE_VERBOSE = 1 ]] && tar_options+=" -v" - has xattr ${FEATURES} && [[ $(tar --help 2> /dev/null) == *--xattrs* ]] && tar_options+=" --xattrs" # Sandbox is disabled in case the user wants to use a symlink # for $PKGDIR and/or $PKGDIR/All. export SANDBOX_ON="0" - [ -z "${PORTAGE_BINPKG_TMPFILE}" ] && \ + [[ -z "${PORTAGE_BINPKG_TMPFILE}" ]] && \ die "PORTAGE_BINPKG_TMPFILE is unset" mkdir -p "${PORTAGE_BINPKG_TMPFILE%/*}" || die "mkdir failed" - [ -z "${PORTAGE_COMPRESSION_COMMAND}" ] && \ - die "PORTAGE_COMPRESSION_COMMAND is unset" - tar $tar_options -cf - $PORTAGE_BINPKG_TAR_OPTS -C "${D}" . | \ - $PORTAGE_COMPRESSION_COMMAND > "$PORTAGE_BINPKG_TMPFILE" - assert "failed to pack binary package: '$PORTAGE_BINPKG_TMPFILE'" - PYTHONPATH=${PORTAGE_PYTHONPATH:-${PORTAGE_PYM_PATH}} \ - "${PORTAGE_PYTHON:-/usr/bin/python}" "$PORTAGE_BIN_PATH"/xpak-helper.py recompose \ - "$PORTAGE_BINPKG_TMPFILE" "$PORTAGE_BUILDDIR/build-info" - if [ $? -ne 0 ]; then - rm -f "${PORTAGE_BINPKG_TMPFILE}" - die "Failed to append metadata to the tbz2 file" - fi - local md5_hash="" - if type md5sum &>/dev/null ; then - md5_hash=$(md5sum "${PORTAGE_BINPKG_TMPFILE}") - md5_hash=${md5_hash%% *} - elif type md5 &>/dev/null ; then - md5_hash=$(md5 "${PORTAGE_BINPKG_TMPFILE}") - md5_hash=${md5_hash##* } + + if [[ "${BINPKG_FORMAT}" == "xpak" ]]; then + local tar_options="" + [[ $PORTAGE_VERBOSE = 1 ]] && tar_options+=" -v" + has xattr ${FEATURES} && [[ $(tar --help 2> /dev/null) == *--xattrs* ]] && tar_options+=" --xattrs" + [[ -z "${PORTAGE_COMPRESSION_COMMAND}" ]] && \ + die "PORTAGE_COMPRESSION_COMMAND is unset" + tar $tar_options -cf - $PORTAGE_BINPKG_TAR_OPTS -C "${D}" . | \ + $PORTAGE_COMPRESSION_COMMAND > "$PORTAGE_BINPKG_TMPFILE" + assert "failed to pack binary package: '$PORTAGE_BINPKG_TMPFILE'" + PYTHONPATH=${PORTAGE_PYTHONPATH:-${PORTAGE_PYM_PATH}} \ + "${PORTAGE_PYTHON:-/usr/bin/python}" "$PORTAGE_BIN_PATH"/xpak-helper.py recompose \ + "$PORTAGE_BINPKG_TMPFILE" "$PORTAGE_BUILDDIR/build-info" + if [[ $? -ne 0 ]]; then + rm -f "${PORTAGE_BINPKG_TMPFILE}" + die "Failed to append metadata to the tbz2 file" + fi + local md5_hash="" + if type md5sum &>/dev/null ; then + md5_hash=$(md5sum "${PORTAGE_BINPKG_TMPFILE}") + md5_hash=${md5_hash%% *} + elif type md5 &>/dev/null ; then + md5_hash=$(md5 "${PORTAGE_BINPKG_TMPFILE}") + md5_hash=${md5_hash##* } + fi + [[ -n "${md5_hash}" ]] && \ + echo ${md5_hash} > "${PORTAGE_BUILDDIR}"/build-info/BINPKGMD5 + __vecho ">>> Done." + + elif [[ "${BINPKG_FORMAT}" == "gpkg" ]]; then + PYTHONPATH=${PORTAGE_PYTHONPATH:-${PORTAGE_PYM_PATH}} \ + "${PORTAGE_PYTHON:-/usr/bin/python}" "$PORTAGE_BIN_PATH"/gpkg-helper.py compress \ + "${CATEGORY}/${PF}" "$PORTAGE_BINPKG_TMPFILE" "$PORTAGE_BUILDDIR/build-info" "${D}" + if [[ $? -ne 0 ]]; then + rm -f "${PORTAGE_BINPKG_TMPFILE}" + die "Failed to create binpkg file" + fi + __vecho ">>> Done." + else + die "Unknown BINPKG_FORMAT ${BINPKG_FORMAT}" fi - [ -n "${md5_hash}" ] && \ - echo ${md5_hash} > "${PORTAGE_BUILDDIR}"/build-info/BINPKGMD5 - __vecho ">>> Done." cd "${PORTAGE_BUILDDIR}" >> "$PORTAGE_BUILDDIR/.packaged" || \ diff --git a/bin/quickpkg b/bin/quickpkg index 1b7ad666c..7733bc833 100755 --- a/bin/quickpkg +++ b/bin/quickpkg @@ -15,17 +15,20 @@ if osp.isfile(osp.join(osp.dirname(osp.dirname(osp.realpath(__file__))), ".porta import portage portage._internal_caller = True from portage import os -from portage import xpak +from portage import xpak, gpkg +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.dbapi.dep_expand import dep_expand from portage.dep import Atom, use_reduce from portage.exception import (AmbiguousPackageName, InvalidAtom, InvalidData, - InvalidDependString, PackageSetNotFound, PermissionDenied) + InvalidBinaryPackageFormat, InvalidDependString, PackageSetNotFound, + PermissionDenied) from portage.util import ensure_dirs, shlex_split, varexpand, _xattr xattr = _xattr.xattr from portage._sets import load_default_config, SETPREFIX from portage.process import find_binary from portage.util.compression_probe import _compressors from portage.util._eventloop.global_event_loop import global_event_loop +from portage.gpg import GPG def quickpkg_atom(options, infos, arg, eout): @@ -109,52 +112,66 @@ def quickpkg_atom(options, infos, arg, eout): update_metadata[k] = v if update_metadata: vardb.aux_update(cpv, update_metadata) - xpdata = xpak.xpak(dblnk.dbdir) - binpkg_tmpfile = os.path.join(bintree.pkgdir, - cpv + ".tbz2." + str(portage.getpid())) - ensure_dirs(os.path.dirname(binpkg_tmpfile)) - binpkg_compression = settings.get("BINPKG_COMPRESS", "bzip2") - try: - compression = _compressors[binpkg_compression] - except KeyError as e: - if binpkg_compression: + + binpkg_format = settings.get("BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0]) + if binpkg_format == "xpak": + xpdata = xpak.xpak(dblnk.dbdir) + binpkg_tmpfile = os.path.join(bintree.pkgdir, + cpv + ".tbz2." + str(portage.getpid())) + ensure_dirs(os.path.dirname(binpkg_tmpfile)) + binpkg_compression = settings.get("BINPKG_COMPRESS", "bzip2") + try: + compression = _compressors[binpkg_compression] + except KeyError as e: + if binpkg_compression: + eout.eerror("Invalid or unsupported compression method: %s" % e.args[0]) + return 1 + # Empty BINPKG_COMPRESS disables compression. + binpkg_compression = 'none' + compression = { + 'compress': 'cat', + 'package': 'sys-apps/coreutils', + } + try: + compression_binary = shlex_split(varexpand(compression["compress"], mydict=settings))[0] + except IndexError as e: eout.eerror("Invalid or unsupported compression method: %s" % e.args[0]) return 1 - # Empty BINPKG_COMPRESS disables compression. - binpkg_compression = 'none' - compression = { - 'compress': 'cat', - 'package': 'sys-apps/coreutils', - } - try: - compression_binary = shlex_split(varexpand(compression["compress"], mydict=settings))[0] - except IndexError as e: - eout.eerror("Invalid or unsupported compression method: %s" % e.args[0]) - return 1 - if find_binary(compression_binary) is None: - missing_package = compression["package"] - eout.eerror("File compression unsupported %s. Missing package: %s" % (binpkg_compression, missing_package)) - return 1 - cmd = shlex_split(varexpand(compression["compress"], mydict=settings)) - # Filter empty elements that make Popen fail - cmd = [x for x in cmd if x != ""] - with open(binpkg_tmpfile, "wb") as fobj: - proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=fobj) - excluded_config_files = dblnk.quickpkg(proc.stdin, + if find_binary(compression_binary) is None: + missing_package = compression["package"] + eout.eerror("File compression unsupported %s. Missing package: %s" % (binpkg_compression, missing_package)) + return 1 + cmd = [varexpand(x, mydict=settings) for x in shlex_split(compression["compress"])] + # Filter empty elements that make Popen fail + cmd = [x for x in cmd if x != ""] + with open(binpkg_tmpfile, "wb") as fobj: + proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=fobj) + excluded_config_files = dblnk.quickpkg(proc.stdin, + include_config=include_config, + include_unmodified_config=include_unmodified_config) + proc.stdin.close() + if proc.wait() != os.EX_OK: + eout.eend(1) + eout.eerror("Compressor failed for package %s" % cpv) + retval |= 1 + try: + os.unlink(binpkg_tmpfile) + except OSError as e: + if e.errno not in (errno.ENOENT, errno.ESTALE): + raise + continue + xpak.tbz2(binpkg_tmpfile).recompose_mem(xpdata) + elif binpkg_format == "gpkg": + metadata = gpkg.gpkg(settings)._generate_metadata_from_dir(dblnk.dbdir) + binpkg_tmpfile = os.path.join(bintree.pkgdir, + cpv + ".gpkg.tar." + str(os.getpid())) + ensure_dirs(os.path.dirname(binpkg_tmpfile)) + excluded_config_files = dblnk.quickpkg(binpkg_tmpfile, + metadata, include_config=include_config, include_unmodified_config=include_unmodified_config) - proc.stdin.close() - if proc.wait() != os.EX_OK: - eout.eend(1) - eout.eerror("Compressor failed for package %s" % cpv) - retval |= 1 - try: - os.unlink(binpkg_tmpfile) - except OSError as e: - if e.errno not in (errno.ENOENT, errno.ESTALE): - raise - continue - xpak.tbz2(binpkg_tmpfile).recompose_mem(xpdata) + else: + raise InvalidBinaryPackageFormat(binpkg_format) finally: if have_lock: dblnk.unlockdb() @@ -268,6 +285,9 @@ def quickpkg_main(options, args, eout): portage.settings.features.remove('xattr') portage.settings.lock() + gpg = GPG(portage.settings) + gpg.unlock() + infos = {} infos["successes"] = [] infos["missing"] = [] diff --git a/cnf/make.conf.example b/cnf/make.conf.example index a309a5c43..5b2229465 100644 --- a/cnf/make.conf.example +++ b/cnf/make.conf.example @@ -183,6 +183,42 @@ # This ftp connection is active ftp. #PORTAGE_BINHOST="ftp://login:pass@grp.mirror.site:21*/pub/grp/i686/athlon-xp/" +# Binary packages GPG commands +# ============================ +# +# Only works with GPKG format. +# "binpkg-signing" needed to be set in FEATURES if need signing packages. +# "binpkg-request-signature" needed to be set in FEATURES if you want all +# binpkgs must have signature. +# You need uncomment related commands and set "USER" and "SIGN_KEY" to yours. +# +# Binary package GPG singing base command +# Basic command for all signature operations. +# You need change this if you want to use other configurations, +# Note that some configurations are configured separately below, +# please do not add duplicate configurations +#BINPKG_GPG_SIGNING_BASE_COMMAND="/usr/bin/flock /run/lock/portage-binpkg-gpg.lock /usr/bin/gpg --sign --armor [PORTAGE_CONFIG]" + +# Binary package GPG signature digests algorithm. +#BINPKG_GPG_SIGNING_DIGEST="SHA512" + +# gnupg home directory used for signing. +#BINPKG_GPG_SIGNING_GPG_HOME="/root/.gnupg" + +# GPG key ID used for signing. +#BINPKG_GPG_SIGNING_KEY="0x1234567890ABCD!" + +# Binary package GPG verify base command. +# Basic command for all verify operations. +#BINPKG_GPG_VERIFY_BASE_COMMAND="/usr/bin/gpg --verify --batch --no-tty --no-auto-check-trustdb --status-fd 2 [PORTAGE_CONFIG] [SIGNATURE]" + +# GPG home directory where store all trust binary package public keys. +#BINPKG_GPG_VERIFY_GPG_HOME="/etc/portage/gnupg" + +# The user and group will be used when drop root privileges during GPG verify +#GPG_VERIFY_USER_DROP="nobody" +#GPG_VERIFY_GROUP_DROP="nogroup" + # Synchronizing Portage # ===================== # diff --git a/cnf/make.globals b/cnf/make.globals index cf4ad3533..69b365f71 100644 --- a/cnf/make.globals +++ b/cnf/make.globals @@ -38,6 +38,27 @@ PORTAGE_TMPDIR="/var/tmp" # existing installs where bzip2 is used for backward compatibility. BINPKG_COMPRESS="zstd" +# The format used for binary packages. The default is use old "xpak" format. +# Set to "gpkg" to use new gentoo binary package format. +BINPKG_FORMAT="xpak" + +# The binary package default GPG signing command. +# flock is used to avoid a racing condition of gnupg +BINPKG_GPG_SIGNING_BASE_COMMAND="/usr/bin/flock /run/lock/portage-binpkg-gpg.lock /usr/bin/gpg --sign --armor [PORTAGE_CONFIG]" + +# The default binary package GPG digests algorithm. +BINPKG_GPG_SIGNING_DIGEST="SHA512" + +# The binary package default GPG verify command. +BINPKG_GPG_VERIFY_BASE_COMMAND="/usr/bin/gpg --verify --batch --no-tty --no-auto-check-trustdb --status-fd 2 [PORTAGE_CONFIG] [SIGNATURE]" + +# The binary package default GPG home directory for verify +BINPKG_GPG_VERIFY_GPG_HOME="/etc/portage/gnupg" + +# The user and group will be used when drop root privileges during GPG verify +GPG_VERIFY_USER_DROP="nobody" +GPG_VERIFY_GROUP_DROP="nogroup" + # Fetching command (3 tries, passive ftp for firewall compatibility) FETCHCOMMAND="wget -t 3 -T 60 --passive-ftp -O \"\${DISTDIR}/\${FILE}\" \"\${URI}\"" RESUMECOMMAND="wget -c -t 3 -T 60 --passive-ftp -O \"\${DISTDIR}/\${FILE}\" \"\${URI}\"" diff --git a/lib/_emerge/Binpkg.py b/lib/_emerge/Binpkg.py index 001283611..15eb56092 100644 --- a/lib/_emerge/Binpkg.py +++ b/lib/_emerge/Binpkg.py @@ -12,6 +12,7 @@ from _emerge.EbuildMerge import EbuildMerge from _emerge.EbuildBuildDir import EbuildBuildDir from _emerge.SpawnProcess import SpawnProcess from portage.eapi import eapi_exports_replace_vars +from portage.exception import PortageException from portage.output import colorize from portage.util import ensure_dirs from portage.util._async.AsyncTaskFuture import AsyncTaskFuture @@ -417,9 +418,17 @@ class Binpkg(CompositeTask): def _unpack_contents_exit(self, unpack_contents): if self._default_exit(unpack_contents) != os.EX_OK: - unpack_contents.future.result() + try: + unpack_contents.future.result() + err = "" + except PortageException as e: + err = e + self._writemsg_level( - "!!! Error Extracting '%s'\n" % self._pkg_path, + colorize( + "BAD", + f"!!! Error Extracting '{self._pkg_path}', {err}\n", + ), noiselevel=-1, level=logging.ERROR, ) diff --git a/lib/_emerge/BinpkgExtractorAsync.py b/lib/_emerge/BinpkgExtractorAsync.py index a0380a0a4..919837fc1 100644 --- a/lib/_emerge/BinpkgExtractorAsync.py +++ b/lib/_emerge/BinpkgExtractorAsync.py @@ -15,6 +15,8 @@ from portage.util import ( shlex_split, varexpand, ) +from portage.exception import InvalidBinaryPackageFormat +from portage.binpkg import get_binpkg_format import signal import subprocess import tarfile @@ -27,6 +29,13 @@ class BinpkgExtractorAsync(SpawnProcess): _shell_binary = portage.const.BASH_BINARY def _start(self): + binpkg_format = get_binpkg_format(self.pkg_path) + if binpkg_format == "xpak": + self._xpak_start() + else: + raise InvalidBinaryPackageFormat(self.pkg_path) + + def _xpak_start(self): tar_options = "" if "xattr" in self.features: process = subprocess.Popen( diff --git a/lib/_emerge/BinpkgFetcher.py b/lib/_emerge/BinpkgFetcher.py index de3dd42ed..d5275ea11 100644 --- a/lib/_emerge/BinpkgFetcher.py +++ b/lib/_emerge/BinpkgFetcher.py @@ -11,6 +11,8 @@ import stat import sys import portage from portage import os +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS +from portage.exception import FileNotFound, InvalidBinaryPackageFormat from portage.util._async.AsyncTaskFuture import AsyncTaskFuture from portage.util._pty import _create_pty_or_pipe @@ -21,8 +23,23 @@ class BinpkgFetcher(CompositeTask): def __init__(self, **kwargs): CompositeTask.__init__(self, **kwargs) + pkg = self.pkg - self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv) + ".partial" + bintree = pkg.root_config.trees["bintree"] + binpkg_path = None + + if bintree._remote_has_index: + instance_key = bintree.dbapi._instance_key(pkg.cpv) + binpkg_path = bintree._remotepkgs[instance_key].get("PATH") + if binpkg_path: + self.pkg_path = binpkg_path + ".partial" + else: + self.pkg_path = ( + pkg.root_config.trees["bintree"].getname(pkg.cpv, allocate_new=True) + + ".partial" + ) + else: + raise FileNotFound("Binary packages index not found") def _start(self): fetcher = _BinpkgFetcherProcess( @@ -106,15 +123,23 @@ class _BinpkgFetcherProcess(SpawnProcess): resumecommand = None if bintree._remote_has_index: remote_metadata = bintree._remotepkgs[bintree.dbapi._instance_key(pkg.cpv)] + binpkg_format = remote_metadata.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] + ) + if binpkg_format not in SUPPORTED_GENTOO_BINPKG_FORMATS: + raise InvalidBinaryPackageFormat(binpkg_format) rel_uri = remote_metadata.get("PATH") if not rel_uri: - rel_uri = pkg.cpv + ".tbz2" + if binpkg_format == "xpak": + rel_uri = pkg.cpv + ".tbz2" + elif binpkg_format == "gpkg": + rel_uri = pkg.cpv + ".gpkg.tar" remote_base_uri = remote_metadata["BASE_URI"] uri = remote_base_uri.rstrip("/") + "/" + rel_uri.lstrip("/") fetchcommand = remote_metadata.get("FETCHCOMMAND") resumecommand = remote_metadata.get("RESUMECOMMAND") else: - uri = settings["PORTAGE_BINHOST"].rstrip("/") + "/" + pkg.pf + ".tbz2" + raise FileNotFound("Binary packages index not found") if pretend: portage.writemsg_stdout("\n%s\n" % uri, noiselevel=-1) diff --git a/lib/_emerge/EbuildBinpkg.py b/lib/_emerge/EbuildBinpkg.py index 5942d245a..ccdd30f7b 100644 --- a/lib/_emerge/EbuildBinpkg.py +++ b/lib/_emerge/EbuildBinpkg.py @@ -6,6 +6,8 @@ from _emerge.EbuildPhase import EbuildPhase import portage from portage import os +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS +from portage.exception import InvalidBinaryPackageFormat class EbuildBinpkg(CompositeTask): @@ -19,9 +21,19 @@ class EbuildBinpkg(CompositeTask): pkg = self.pkg root_config = pkg.root_config bintree = root_config.trees["bintree"] - binpkg_tmpfile = os.path.join( - bintree.pkgdir, pkg.cpv + ".tbz2." + str(portage.getpid()) + binpkg_format = self.settings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] ) + if binpkg_format == "xpak": + binpkg_tmpfile = os.path.join( + bintree.pkgdir, pkg.cpv + ".tbz2." + str(portage.getpid()) + ) + elif binpkg_format == "gpkg": + binpkg_tmpfile = os.path.join( + bintree.pkgdir, pkg.cpv + ".gpkg.tar." + str(portage.getpid()) + ) + else: + raise InvalidBinaryPackageFormat(binpkg_format) bintree._ensure_dir(os.path.dirname(binpkg_tmpfile)) self._binpkg_tmpfile = binpkg_tmpfile diff --git a/lib/_emerge/EbuildPhase.py b/lib/_emerge/EbuildPhase.py index 12326fffd..9a04f9c1f 100644 --- a/lib/_emerge/EbuildPhase.py +++ b/lib/_emerge/EbuildPhase.py @@ -29,6 +29,8 @@ from portage.util._async.AsyncTaskFuture import AsyncTaskFuture from portage.util._async.BuildLogger import BuildLogger from portage.util.futures import asyncio from portage.util.futures.executor.fork import ForkExecutor +from portage.exception import InvalidBinaryPackageFormat +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS try: from portage.xml.metadata import MetaDataXML @@ -157,14 +159,31 @@ class EbuildPhase(CompositeTask): if self.phase == "package": if "PORTAGE_BINPKG_TMPFILE" not in self.settings: - self.settings["PORTAGE_BINPKG_TMPFILE"] = ( - os.path.join( - self.settings["PKGDIR"], - self.settings["CATEGORY"], - self.settings["PF"], - ) - + ".tbz2" + binpkg_format = self.settings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] ) + if binpkg_format == "xpak": + self.settings["BINPKG_FORMAT"] = "xpak" + self.settings["PORTAGE_BINPKG_TMPFILE"] = ( + os.path.join( + self.settings["PKGDIR"], + self.settings["CATEGORY"], + self.settings["PF"], + ) + + ".tbz2" + ) + elif binpkg_format == "gpkg": + self.settings["BINPKG_FORMAT"] = "gpkg" + self.settings["PORTAGE_BINPKG_TMPFILE"] = ( + os.path.join( + self.settings["PKGDIR"], + self.settings["CATEGORY"], + self.settings["PF"], + ) + + ".gpkg.tar" + ) + else: + raise InvalidBinaryPackageFormat(binpkg_format) def _async_start_exit(self, task): task.future.cancelled() or task.future.result() diff --git a/lib/_emerge/Package.py b/lib/_emerge/Package.py index 90dfccdef..cc2cb07e1 100644 --- a/lib/_emerge/Package.py +++ b/lib/_emerge/Package.py @@ -63,6 +63,7 @@ class Package(Task): metadata_keys = [ "BDEPEND", + "BINPKG_FORMAT", "BUILD_ID", "BUILD_TIME", "CHOST", diff --git a/lib/_emerge/actions.py b/lib/_emerge/actions.py index 515b22b66..0255e3e97 100644 --- a/lib/_emerge/actions.py +++ b/lib/_emerge/actions.py @@ -41,7 +41,7 @@ from portage.dbapi._expand_new_virt import expand_new_virt from portage.dbapi.IndexedPortdb import IndexedPortdb from portage.dbapi.IndexedVardb import IndexedVardb from portage.dep import Atom, _repo_separator, _slot_separator -from portage.exception import InvalidAtom, InvalidData, ParseError +from portage.exception import InvalidAtom, InvalidData, ParseError, GPGException from portage.output import ( colorize, create_color_func, @@ -77,6 +77,8 @@ from portage.sync.old_tree_timestamp import old_tree_timestamp_warn from portage.localization import _ from portage.metadata import action_metadata from portage.emaint.main import print_results +from portage.gpg import GPG +from portage.binpkg import get_binpkg_format from _emerge.clear_caches import clear_caches from _emerge.create_depgraph_params import create_depgraph_params @@ -604,6 +606,28 @@ def action_build( ) return 1 + # unlock GPG if needed + if ( + need_write_bindb + and (eroot in ebuild_eroots) + and ( + "binpkg-signing" + in trees[eroot]["root_config"].settings.features + ) + ): + portage.writemsg_stdout(">>> Unlocking GPG... ") + sys.stdout.flush() + gpg = GPG(trees[eroot]["root_config"].settings) + try: + gpg.unlock() + except GPGException as e: + writemsg_level( + colorize("BAD", "!!! %s\n" % e), + level=logging.ERROR, + noiselevel=-1, + ) + return 1 + if "--resume" in myopts: favorites = mtimedb["resume"]["favorites"] @@ -2271,11 +2295,21 @@ def action_info(settings, trees, myopts, myfiles): elif pkg_type == "ebuild": ebuildpath = portdb.findname(pkg.cpv, myrepo=pkg.repo) elif pkg_type == "binary": - tbz2_file = bindb.bintree.getname(pkg.cpv) + binpkg_file = bindb.bintree.getname(pkg.cpv) ebuild_file_name = pkg.cpv.split("/")[1] + ".ebuild" - ebuild_file_contents = portage.xpak.tbz2(tbz2_file).getfile( - ebuild_file_name - ) + binpkg_format = pkg.cpv._metadata.get("BINPKG_FORMAT", None) + if not binpkg_format: + binpkg_format = get_binpkg_format(binpkg_file) + if binpkg_format == "xpak": + ebuild_file_contents = portage.xpak.tbz2(binpkg_file).getfile( + ebuild_file_name + ) + elif binpkg_format == "gpkg": + ebuild_file_contents = portage.gpkg.gpkg( + settings, pkg.cpv, binpkg_file + ).get_metadata(ebuild_file_name) + else: + continue tmpdir = tempfile.mkdtemp() ebuildpath = os.path.join(tmpdir, ebuild_file_name) file = open(ebuildpath, "w") diff --git a/lib/_emerge/depgraph.py b/lib/_emerge/depgraph.py index f6549eba6..14a71a610 100644 --- a/lib/_emerge/depgraph.py +++ b/lib/_emerge/depgraph.py @@ -15,7 +15,12 @@ from itertools import chain import portage from portage import os from portage import _unicode_decode, _unicode_encode, _encodings -from portage.const import PORTAGE_PACKAGE_ATOM, USER_CONFIG_PATH, VCS_DIRS +from portage.const import ( + PORTAGE_PACKAGE_ATOM, + USER_CONFIG_PATH, + VCS_DIRS, + SUPPORTED_GPKG_EXTENSIONS, +) from portage.dbapi import dbapi from portage.dbapi.dep_expand import dep_expand from portage.dbapi.DummyTree import DummyTree @@ -55,6 +60,7 @@ from portage.util.digraph import digraph from portage.util.futures import asyncio from portage.util._async.TaskScheduler import TaskScheduler from portage.versions import _pkg_str, catpkgsplit +from portage.binpkg import get_binpkg_format from _emerge.AtomArg import AtomArg from _emerge.Blocker import Blocker @@ -4558,8 +4564,7 @@ class depgraph: onlydeps = "--onlydeps" in self._frozen_config.myopts lookup_owners = [] for x in myfiles: - ext = os.path.splitext(x)[1] - if ext == ".tbz2": + if x.endswith(".tbz2") or x.endswith(SUPPORTED_GPKG_EXTENSIONS): if not os.path.exists(x): if os.path.exists(os.path.join(pkgsettings["PKGDIR"], "All", x)): x = os.path.join(pkgsettings["PKGDIR"], "All", x) @@ -4571,13 +4576,22 @@ class depgraph: noiselevel=-1, ) writemsg( - "!!! Please ensure the tbz2 exists as specified.\n\n", + "!!! Please ensure the binpkg exists as specified.\n\n", noiselevel=-1, ) return 0, myfavorites - mytbz2 = portage.xpak.tbz2(x) - mykey = None - cat = mytbz2.getfile("CATEGORY") + binpkg_format = get_binpkg_format(x) + if binpkg_format == "xpak": + mytbz2 = portage.xpak.tbz2(x) + mykey = None + cat = mytbz2.getfile("CATEGORY") + elif binpkg_format == "gpkg": + mygpkg = portage.gpkg.gpkg(self.frozen_config, None, x) + mykey = None + cat = mygpkg.get_metadata("CATEGORY") + else: + raise InvalidBinaryPackageFormat(x) + if cat is not None: cat = _unicode_decode( cat.strip(), encoding=_encodings["repo.content"] @@ -4619,7 +4633,7 @@ class depgraph: return 0, myfavorites args.append(PackageArg(arg=x, package=pkg, root_config=root_config)) - elif ext == ".ebuild": + elif x.endswith(".ebuild"): ebuild_path = portage.util.normalize_path(os.path.abspath(x)) pkgdir = os.path.dirname(ebuild_path) tree_root = os.path.dirname(os.path.dirname(pkgdir)) diff --git a/lib/portage/__init__.py b/lib/portage/__init__.py index 13af8da09..3042de1aa 100644 --- a/lib/portage/__init__.py +++ b/lib/portage/__init__.py @@ -124,6 +124,7 @@ try: + "cpv_getkey@getCPFromCPV,endversion_keys," + "suffix_value@endversion,pkgcmp,pkgsplit,vercmp,ververify", "portage.xpak", + "portage.gpkg", "subprocess", "time", ) diff --git a/lib/portage/binpkg.py b/lib/portage/binpkg.py new file mode 100644 index 000000000..ed2fda827 --- /dev/null +++ b/lib/portage/binpkg.py @@ -0,0 +1,56 @@ +# Copyright 2001-2020 Gentoo Authors +# Distributed under the terms of the GNU General Public License v2 + +import tarfile +from portage.const import SUPPORTED_XPAK_EXTENSIONS, SUPPORTED_GPKG_EXTENSIONS +from portage.output import colorize +from portage.util import writemsg + + +def get_binpkg_format(binpkg_path): + if binpkg_path.endswith(SUPPORTED_XPAK_EXTENSIONS): + file_ext_format = "xpak" + elif binpkg_path.endswith(SUPPORTED_GPKG_EXTENSIONS): + file_ext_format = "gpkg" + else: + file_ext_format = None + + try: + with open(binpkg_path, "rb") as binpkg_file: + header = binpkg_file.read(6) + if header == b"gpkg-1": + file_format = "gpkg" + else: + binpkg_file.seek(-16, 2) + tail = binpkg_file.read(16) + if (tail[0:8] == b"XPAKSTOP") and (tail[12:16] == b"STOP"): + file_format = "xpak" + else: + file_format = None + + # check if wrong order gpkg + if file_format is None: + try: + with tarfile.open(binpkg_path) as gpkg_tar: + if "gpkg-1" in gpkg_tar.getnames(): + file_format = "gpkg" + except tarfile.TarError: + pass + + except OSError: + file_format = None + + if file_format is None: + return None + + if (file_ext_format is not None) and (file_ext_format != file_format): + writemsg( + colorize( + "WARN", + "File {} binpkg format mismatch, actual format: {}".format( + binpkg_path, file_format + ), + ) + ) + + return file_format diff --git a/lib/portage/const.py b/lib/portage/const.py index 1edc5fcf1..9e6f9311d 100644 --- a/lib/portage/const.py +++ b/lib/portage/const.py @@ -3,6 +3,7 @@ # Distributed under the terms of the GNU General Public License v2 import os +import sys # =========================================================================== # START OF CONSTANTS -- START OF CONSTANTS -- START OF CONSTANTS -- START OF @@ -128,8 +129,11 @@ SUPPORTED_FEATURES = frozenset( "assume-digests", "binpkg-docompress", "binpkg-dostrip", + "binpkg-ignore-signature", "binpkg-logs", "binpkg-multi-instance", + "binpkg-request-signature", + "binpkg-signing", "buildpkg", "buildpkg-live", "buildsyspkg", @@ -155,6 +159,7 @@ SUPPORTED_FEATURES = frozenset( "force-mirror", "force-prefix", "getbinpkg", + "gpg-keepalive", "icecream", "installsources", "ipc-sandbox", @@ -256,7 +261,14 @@ LIVE_ECLASSES = frozenset( ) SUPPORTED_BINPKG_FORMATS = ("tar", "rpm") + +if sys.version_info.major < 3: + SUPPORTED_GENTOO_BINPKG_FORMATS = ("xpak",) +else: + SUPPORTED_GENTOO_BINPKG_FORMATS = ("xpak", "gpkg") + SUPPORTED_XPAK_EXTENSIONS = (".tbz2", ".xpak") +SUPPORTED_GPKG_EXTENSIONS = (".gpkg.tar",) # Time formats used in various places like metadata.chk. TIMESTAMP_FORMAT = "%a, %d %b %Y %H:%M:%S +0000" # to be used with time.gmtime() diff --git a/lib/portage/dbapi/bintree.py b/lib/portage/dbapi/bintree.py index 9dbf9ee8b..8bfe5e97d 100644 --- a/lib/portage/dbapi/bintree.py +++ b/lib/portage/dbapi/bintree.py @@ -26,20 +26,29 @@ portage.proxy.lazyimport.lazyimport( from portage.binrepo.config import BinRepoConfigLoader from portage.cache.mappings import slot_dict_class -from portage.const import BINREPOS_CONF_FILE, CACHE_PATH, SUPPORTED_XPAK_EXTENSIONS +from portage.const import ( + BINREPOS_CONF_FILE, + CACHE_PATH, + SUPPORTED_XPAK_EXTENSIONS, + SUPPORTED_GPKG_EXTENSIONS, + SUPPORTED_GENTOO_BINPKG_FORMATS, +) from portage.dbapi.virtual import fakedbapi from portage.dep import Atom, use_reduce, paren_enclose from portage.exception import ( AlarmSignal, InvalidPackageName, + InvalidBinaryPackageFormat, ParseError, PortageException, + PortagePackageException, ) from portage.localization import _ from portage.package.ebuild.profile_iuse import iter_iuse_vars from portage.util.file_copy import copyfile from portage.util.futures import asyncio from portage.util.futures.executor.fork import ForkExecutor +from portage.binpkg import get_binpkg_format from portage import _movefile from portage import os from portage import _encodings @@ -49,6 +58,7 @@ from portage import _unicode_encode import codecs import errno import io +import re import stat import subprocess import tempfile @@ -72,6 +82,7 @@ class bindbapi(fakedbapi): list(fakedbapi._known_keys) + ["CHOST", "repository", "USE"] ) _pkg_str_aux_keys = fakedbapi._pkg_str_aux_keys + ( + "BINPKG_FORMAT", "BUILD_ID", "BUILD_TIME", "_mtime_", @@ -94,6 +105,7 @@ class bindbapi(fakedbapi): self._aux_cache_keys = set( [ "BDEPEND", + "BINPKG_FORMAT", "BUILD_ID", "BUILD_TIME", "CHOST", @@ -170,28 +182,44 @@ class bindbapi(fakedbapi): return add_pkg._db.aux_get(add_pkg, wants) if not self.bintree._remotepkgs or not self.bintree.isremote(mycpv): try: - tbz2_path = self.bintree._pkg_paths[instance_key] + binpkg_path = self.bintree._pkg_paths[instance_key] except KeyError: raise KeyError(mycpv) - tbz2_path = os.path.join(self.bintree.pkgdir, tbz2_path) + binpkg_path = os.path.join(self.bintree.pkgdir, binpkg_path) try: - st = os.lstat(tbz2_path) + st = os.lstat(binpkg_path) except OSError: raise KeyError(mycpv) - metadata_bytes = portage.xpak.tbz2(tbz2_path).get_data() + binpkg_format = self.cpvdict[instance_key]["BINPKG_FORMAT"] + if binpkg_format == "xpak": + metadata_bytes = portage.xpak.tbz2(binpkg_path).get_data() + decode_metadata_name = False + elif binpkg_format == "gpkg": + metadata_bytes = portage.gpkg.gpkg( + self.settings, mycpv, binpkg_path + ).get_metadata() + decode_metadata_name = True + else: + raise InvalidBinaryPackageFormat( + "Unknown binary package format %s" % binpkg_path + ) def getitem(k): if k == "_mtime_": return str(st[stat.ST_MTIME]) if k == "SIZE": return str(st.st_size) - v = metadata_bytes.get( - _unicode_encode( - k, - encoding=_encodings["repo.content"], - errors="backslashreplace", - ) - ) + else: + if decode_metadata_name: + v = metadata_bytes.get(k) + else: + v = metadata_bytes.get( + _unicode_encode( + k, + encoding=_encodings["repo.content"], + errors="backslashreplace", + ) + ) if v is not None: v = _unicode_decode( v, encoding=_encodings["repo.content"], errors="replace" @@ -202,6 +230,7 @@ class bindbapi(fakedbapi): getitem = self.cpvdict[instance_key].get mydata = {} mykeys = wants + for x in mykeys: myval = getitem(x) # myval is None if the key doesn't exist @@ -230,16 +259,29 @@ class bindbapi(fakedbapi): cpv = self._instance_key(cpv, support_string=True)[0] build_id = cpv.build_id - tbz2path = self.bintree.getname(cpv) - if not os.path.exists(tbz2path): + binpkg_path = self.bintree.getname(cpv) + if not os.path.exists(binpkg_path): raise KeyError(cpv) - mytbz2 = portage.xpak.tbz2(tbz2path) - mydata = mytbz2.get_data() - for k, v in values.items(): - k = _unicode_encode( - k, encoding=_encodings["repo.content"], errors="backslashreplace" + binpkg_format = cpv.binpkg_format + if binpkg_format == "xpak": + mytbz2 = portage.xpak.tbz2(binpkg_path) + mydata = mytbz2.get_data() + encoding_key = True + elif binpkg_format == "gpkg": + mybinpkg = portage.gpkg.gpkg(self.settings, cpv, binpkg_path) + mydata = mybinpkg.get_metadata() + encoding_key = False + else: + raise InvalidBinaryPackageFormat( + "Unknown binary package format %s" % binpkg_path ) + + for k, v in values.items(): + if encoding_key: + k = _unicode_encode( + k, encoding=_encodings["repo.content"], errors="backslashreplace" + ) v = _unicode_encode( v, encoding=_encodings["repo.content"], errors="backslashreplace" ) @@ -248,7 +290,15 @@ class bindbapi(fakedbapi): for k, v in list(mydata.items()): if not v: del mydata[k] - mytbz2.recompose_mem(portage.xpak.xpak_mem(mydata)) + if binpkg_format == "xpak": + mytbz2.recompose_mem(portage.xpak.xpak_mem(mydata)) + elif binpkg_format == "gpkg": + mybinpkg.update_metadata(mydata) + else: + raise InvalidBinaryPackageFormat( + "Unknown binary package format %s" % binpkg_path + ) + # inject will clear stale caches via cpv_inject. self.bintree.inject(cpv) @@ -271,12 +321,24 @@ class bindbapi(fakedbapi): if add_pkg is not None: await add_pkg._db.unpack_metadata(pkg, dest_dir, loop=loop) else: - tbz2_file = self.bintree.getname(cpv) - await loop.run_in_executor( - ForkExecutor(loop=loop), - portage.xpak.tbz2(tbz2_file).unpackinfo, - dest_dir, - ) + binpkg_file = self.bintree.getname(cpv) + binpkg_format = cpv.binpkg_format + if binpkg_format == "xpak": + await loop.run_in_executor( + ForkExecutor(loop=loop), + portage.xpak.tbz2(binpkg_file).unpackinfo, + dest_dir, + ) + elif binpkg_format == "gpkg": + await loop.run_in_executor( + ForkExecutor(loop=loop), + portage.gpkg.gpkg(self.settings, cpv, binpkg_file).unpack_metadata, + dest_dir, + ) + else: + raise InvalidBinaryPackageFormat( + "Unknown binary package format %s" % binpkg_file + ) async def unpack_contents(self, pkg, dest_dir, loop=None): """ @@ -297,23 +359,31 @@ class bindbapi(fakedbapi): pkg_path = self.bintree.getname(cpv) if pkg_path is not None: + binpkg_format = cpv.binpkg_format + if binpkg_format == "xpak": + extractor = BinpkgExtractorAsync( + background=settings.get("PORTAGE_BACKGROUND") == "1", + env=settings.environ(), + features=settings.features, + image_dir=dest_dir, + pkg=cpv, + pkg_path=pkg_path, + logfile=settings.get("PORTAGE_LOG_FILE"), + scheduler=SchedulerInterface(loop), + ) - extractor = BinpkgExtractorAsync( - background=settings.get("PORTAGE_BACKGROUND") == "1", - env=settings.environ(), - features=settings.features, - image_dir=dest_dir, - pkg=cpv, - pkg_path=pkg_path, - logfile=settings.get("PORTAGE_LOG_FILE"), - scheduler=SchedulerInterface(loop), - ) - - extractor.start() - await extractor.async_wait() - if extractor.returncode != os.EX_OK: - raise PortageException("Error Extracting '{}'".format(pkg_path)) - + extractor.start() + await extractor.async_wait() + if extractor.returncode != os.EX_OK: + raise PortageException("Error Extracting '{}'".format(pkg_path)) + elif binpkg_format == "gpkg": + await loop.run_in_executor( + ForkExecutor(loop=loop), + portage.gpkg.gpkg(self.settings, cpv, pkg_path).decompress, + dest_dir, + ) + else: + raise portage.exception.InvalidBinaryPackageFormat(pkg_path) else: instance_key = self._instance_key(cpv) add_pkg = self.bintree._additional_pkgs.get(instance_key) @@ -430,6 +500,7 @@ class binarytree: self._pkgindex_aux_keys = [ "BASE_URI", "BDEPEND", + "BINPKG_FORMAT", "BUILD_ID", "BUILD_TIME", "CHOST", @@ -473,6 +544,7 @@ class binarytree: "ACCEPT_LICENSE", "ACCEPT_PROPERTIES", "ACCEPT_RESTRICT", + "BINPKG_FORMAT", "CBUILD", "CONFIG_PROTECT", "CONFIG_PROTECT_MASK", @@ -508,10 +580,13 @@ class binarytree: "SLOT": "0", "USE": "", } - self._pkgindex_inherited_keys = ["CHOST", "repository"] + self._pkgindex_inherited_keys = ["BINPKG_FORMAT", "CHOST", "repository"] # Populate the header with appropriate defaults. self._pkgindex_default_header_data = { + "BINPKG_FORMAT": self.settings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] + ), "CHOST": self.settings.get("CHOST", ""), "repository": "", } @@ -594,24 +669,42 @@ class binarytree: writemsg("!!! " + mycpv + " -> " + mynewcpv + "\n", noiselevel=-1) continue - tbz2path = self.getname(mycpv) - if os.path.exists(tbz2path) and not os.access(tbz2path, os.W_OK): + binpkg_path = self.getname(mycpv) + if os.path.exists(binpkg_path) and not os.access(binpkg_path, os.W_OK): writemsg( _("!!! Cannot update readonly binary: %s\n") % mycpv, noiselevel=-1 ) continue moves += 1 - mytbz2 = portage.xpak.tbz2(tbz2path) - mydata = mytbz2.get_data() + binpkg_format = mycpv.binpkg_format + if binpkg_format == "xpak": + mytbz2 = portage.xpak.tbz2(binpkg_path) + mydata = mytbz2.get_data() + decode_metadata_name = False + elif binpkg_format == "gpkg": + mybinpkg = portage.gpkg.gpkg(self.settings, mycpv, binpkg_path) + mydata = mybinpkg.get_metadata() + decode_metadata_name = True + else: + continue + updated_items = update_dbentries([mylist], mydata, parent=mycpv) mydata.update(updated_items) - mydata[b"PF"] = _unicode_encode( - mynewpkg + "\n", encoding=_encodings["repo.content"] - ) - mydata[b"CATEGORY"] = _unicode_encode( - mynewcat + "\n", encoding=_encodings["repo.content"] - ) + if decode_metadata_name: + mydata["PF"] = _unicode_encode( + mynewpkg + "\n", encoding=_encodings["repo.content"] + ) + mydata["CATEGORY"] = _unicode_encode( + mynewcat + "\n", encoding=_encodings["repo.content"] + ) + else: + mydata[b"PF"] = _unicode_encode( + mynewpkg + "\n", encoding=_encodings["repo.content"] + ) + mydata[b"CATEGORY"] = _unicode_encode( + mynewcat + "\n", encoding=_encodings["repo.content"] + ) if mynewpkg != myoldpkg: ebuild_data = mydata.pop( _unicode_encode( @@ -628,7 +721,7 @@ class binarytree: metadata = self.dbapi._aux_cache_slot_dict() for k in self.dbapi._aux_cache_keys: - v = mydata.get(_unicode_encode(k)) + v = mydata.get(k if decode_metadata_name else _unicode_encode(k)) if v is not None: v = _unicode_decode(v) metadata[k] = " ".join(v.split()) @@ -643,9 +736,15 @@ class binarytree: update_path_lock = None try: update_path_lock = lockfile(update_path, wantnewlockfile=True) - copyfile(tbz2path, update_path) - mytbz2 = portage.xpak.tbz2(update_path) - mytbz2.recompose_mem(portage.xpak.xpak_mem(mydata)) + copyfile(binpkg_path, update_path) + if binpkg_format == "xpak": + mytbz2 = portage.xpak.tbz2(update_path) + mytbz2.recompose_mem(portage.xpak.xpak_mem(mydata)) + elif binpkg_format == "gpkg": + mybinpkg = portage.gpkg.gpkg(self.settings, mycpv, update_path) + mybinpkg.update_metadata(mydata, newcpv=mynewcpv) + else: + raise InvalidBinaryPackageFormat(binpkg_format) self.inject(mynewcpv, filename=update_path) finally: if update_path_lock is not None: @@ -810,7 +909,13 @@ class binarytree: metadata[_instance_key(cpv)] = d path = d.get("PATH") if not path: - path = cpv + ".tbz2" + binpkg_format = d["BINPKG_FORMAT"] + if binpkg_format == "xpak": + path = cpv + ".tbz2" + elif binpkg_format == "gpkg": + path = cpv + ".gpkg.tar" + else: + continue if reindex: basename = os.path.basename(path) @@ -835,7 +940,9 @@ class binarytree: ) except UnicodeDecodeError: continue - if not myfile.endswith(SUPPORTED_XPAK_EXTENSIONS): + if not myfile.endswith( + SUPPORTED_XPAK_EXTENSIONS + SUPPORTED_GPKG_EXTENSIONS + ): continue mypath = os.path.join(mydir, myfile) full_path = os.path.join(self.pkgdir, mypath) @@ -846,9 +953,9 @@ class binarytree: # Validate data from the package index and try to avoid # reading the xpak if possible. + match = None possibilities = basename_index.get(myfile) if possibilities: - match = None for d in possibilities: try: if int(d["_mtime_"]) != s[stat.ST_MTIME]: @@ -873,7 +980,9 @@ class binarytree: update_pkgindex = True # Omit PATH if it is the default path for # the current Packages format version. - if mypath != mycpv + ".tbz2": + if (mypath != mycpv + ".tbz2") and ( + mypath != mycpv + ".gpkg.tar" + ): d["PATH"] = mypath if not oldpath: update_pkgindex = True @@ -891,15 +1000,30 @@ class binarytree: ) self.invalids.append(myfile[:-5]) continue - pkg_metadata = self._read_metadata( - full_path, - s, - keys=chain(self.dbapi._aux_cache_keys, ("PF", "CATEGORY")), - ) + + binpkg_format = None + if match: + binpkg_format = match.get("BINPKG_FORMAT", None) + try: + pkg_metadata = self._read_metadata( + full_path, + s, + keys=chain(self.dbapi._aux_cache_keys, ("PF", "CATEGORY")), + binpkg_format=binpkg_format, + ) + except PortagePackageException as e: + writemsg( + f"!!! Invalid binary package: '{full_path}', {e}\n", + noiselevel=-1, + ) + continue mycat = pkg_metadata.get("CATEGORY", "") mypf = pkg_metadata.get("PF", "") slot = pkg_metadata.get("SLOT", "") - mypkg = myfile[:-5] + for ext in SUPPORTED_XPAK_EXTENSIONS + SUPPORTED_GPKG_EXTENSIONS: + if myfile.endswith(ext): + mypkg = myfile[: -len(ext)] + break if not mycat or not mypf or not slot: # old-style or corrupt package writemsg( @@ -943,9 +1067,19 @@ class binarytree: invalid_name = True else: mypkg = mypkg[: -len(str(build_id)) - 1] + elif myfile.endswith(".gpkg.tar"): + build_id = self._parse_build_id(myfile) + if build_id > 0: + multi_instance = True + if myfile != "%s-%s.gpkg.tar" % (mypf, build_id): + invalid_name = True + else: + mypkg = mypkg[: -len(str(build_id)) - 1] + else: + if myfile != "%s.gpkg.tar" % mypf: + invalid_name = True elif myfile != mypf + ".tbz2": invalid_name = True - if invalid_name: writemsg( _("\n!!! Binary package name is " "invalid: '%s'\n") @@ -1043,7 +1177,7 @@ class binarytree: del pkg_paths[_instance_key(mycpv)] # record location if it's non-default - if mypath != mycpv + ".tbz2": + if (mypath != mycpv + ".tbz2") and (mypath != mycpv + ".gpkg.tar"): d["PATH"] = mypath else: d.pop("PATH", None) @@ -1407,7 +1541,10 @@ class binarytree: noiselevel=-1, ) return + metadata = self._read_metadata(full_path, s) + binpkg_format = metadata["BINPKG_FORMAT"] + invalid_depend = False try: self._eval_use_flags(cpv, metadata) @@ -1455,21 +1592,35 @@ class binarytree: basename = os.path.basename(full_path) pf = catsplit(cpv)[1] - if build_id is None and not fetched and basename.endswith(".xpak"): + if (build_id is None) and (not fetched) and binpkg_format: # Apply the newly assigned BUILD_ID. This is intended # to occur only for locally built packages. If the # package was fetched, we want to preserve its # attributes, so that we can later distinguish that it # is identical to its remote counterpart. build_id = self._parse_build_id(basename) - metadata["BUILD_ID"] = str(build_id) - cpv = _pkg_str( - cpv, metadata=metadata, settings=self.settings, db=self.dbapi - ) - binpkg = portage.xpak.tbz2(full_path) - binary_data = binpkg.get_data() - binary_data[b"BUILD_ID"] = _unicode_encode(metadata["BUILD_ID"]) - binpkg.recompose_mem(portage.xpak.xpak_mem(binary_data)) + if build_id > 0: + metadata["BUILD_ID"] = str(build_id) + cpv = _pkg_str( + cpv, metadata=metadata, settings=self.settings, db=self.dbapi + ) + if binpkg_format == "xpak": + if basename.endswith(".xpak"): + binpkg = portage.xpak.tbz2(full_path) + binary_data = binpkg.get_data() + binary_data[b"BUILD_ID"] = _unicode_encode( + metadata["BUILD_ID"] + ) + binpkg.recompose_mem(portage.xpak.xpak_mem(binary_data)) + elif binpkg_format == "gpkg": + binpkg = portage.gpkg.gpkg(self.settings, cpv, full_path) + binpkg_metadata = binpkg.get_metadata() + binpkg_metadata["BUILD_ID"] = _unicode_encode( + metadata["BUILD_ID"] + ) + binpkg.update_metadata(binpkg_metadata) + else: + raise InvalidBinaryPackageFormat(basename) self._file_permissions(full_path) pkgindex = self._load_pkgindex() @@ -1490,7 +1641,7 @@ class binarytree: return cpv - def _read_metadata(self, filename, st, keys=None): + def _read_metadata(self, filename, st, keys=None, binpkg_format=None): """ Read metadata from a binary package. The returned metadata dictionary will contain empty strings for any values that @@ -1511,14 +1662,35 @@ class binarytree: metadata = self.dbapi._aux_cache_slot_dict() else: metadata = {} - binary_metadata = portage.xpak.tbz2(filename).get_data() + + # xpak return key as binary, gpkg return key as str + decode_metadata_name = True + + if not binpkg_format: + binpkg_format = get_binpkg_format(filename) + if binpkg_format == "xpak": + binpkg_metadata = portage.xpak.tbz2(filename).get_data() + elif binpkg_format == "gpkg": + binpkg_metadata = portage.gpkg.gpkg( + self.settings, None, filename + ).get_metadata() + decode_metadata_name = False + else: + raise InvalidBinaryPackageFormat( + f"Unrecognized binary package format in '{filename}'" + ) + for k in keys: if k == "_mtime_": metadata[k] = str(st[stat.ST_MTIME]) elif k == "SIZE": metadata[k] = str(st.st_size) else: - v = binary_metadata.get(_unicode_encode(k)) + if decode_metadata_name: + v = binpkg_metadata.get(_unicode_encode(k)) + else: + # check gpkg + v = binpkg_metadata.get(k) if v is None: if k == "EAPI": metadata[k] = "0" @@ -1527,6 +1699,9 @@ class binarytree: else: v = _unicode_decode(v) metadata[k] = " ".join(v.split()) + + metadata["BINPKG_FORMAT"] = binpkg_format + return metadata def _inject_file(self, pkgindex, cpv, filename): @@ -1608,6 +1783,10 @@ class binarytree: """ pkg_path = self.getname(cpv) + try: + binpkg_format = cpv.binpkg_format + except AttributeError: + raise KeyError("{} metadata not found!".format(cpv)) d = dict(cpv._metadata.items()) d.update(perform_multiple_checksums(pkg_path, hashes=self._pkgindex_hashes)) @@ -1616,11 +1795,18 @@ class binarytree: st = os.lstat(pkg_path) d["_mtime_"] = str(st[stat.ST_MTIME]) d["SIZE"] = str(st.st_size) + d["BINPKG_FORMAT"] = binpkg_format rel_path = pkg_path[len(self.pkgdir) + 1 :] # record location if it's non-default - if rel_path != cpv + ".tbz2": - d["PATH"] = rel_path + if binpkg_format == "xpak": + if rel_path != cpv + ".tbz2": + d["PATH"] = rel_path + elif binpkg_format == "gpkg": + if rel_path != cpv + ".gpkg.tar": + d["PATH"] = rel_path + else: + raise InvalidBinaryPackageFormat(binpkg_format) return d @@ -1822,11 +2008,45 @@ class binarytree: return None if filename is None: - if self._multi_instance: - pf = catsplit(cpv)[1] - filename = "%s-%s.xpak" % (os.path.join(self.pkgdir, cpv.cp, pf), "1") + try: + binpkg_format = cpv.binpkg_format + except AttributeError: + # In order to force the caller to clarify its intent, do not + # use default BINPKG_FORMAT unless allocate_new is True. + # The caller can set cpv.binpkg_format in advance if something + # other than the default is desired here. + if allocate_new: + binpkg_format = self.settings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] + ) + else: + binpkg_format = None + + if not binpkg_format: + # Raise an error if the desired binpkg_format is not clear. + # The caller should either set allocate_new to True or else + # ensure that cpv.binpkg_format is set to a particular format. + raise InvalidBinaryPackageFormat(binpkg_format) + elif binpkg_format == "xpak": + if self._multi_instance: + pf = catsplit(cpv)[1] + filename = "%s-%s.xpak" % ( + os.path.join(self.pkgdir, cpv.cp, pf), + "1", + ) + else: + filename = os.path.join(self.pkgdir, cpv + ".tbz2") + elif binpkg_format == "gpkg": + if self._multi_instance: + pf = catsplit(cpv)[1] + filename = "%s-%s.gpkg.tar" % ( + os.path.join(self.pkgdir, cpv.cp, pf), + "1", + ) + else: + filename = os.path.join(self.pkgdir, cpv + ".gpkg.tar") else: - filename = os.path.join(self.pkgdir, cpv + ".tbz2") + raise InvalidBinaryPackageFormat(binpkg_format) return filename @@ -1850,7 +2070,19 @@ class binarytree: return max_build_id def _allocate_filename(self, cpv): - return os.path.join(self.pkgdir, cpv + ".tbz2") + try: + binpkg_format = cpv.binpkg_format + except AttributeError: + binpkg_format = self.settings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] + ) + + if binpkg_format == "xpak": + return os.path.join(self.pkgdir, cpv + ".tbz2") + elif binpkg_format == "gpkg": + return os.path.join(self.pkgdir, cpv + ".gpkg.tar") + else: + raise InvalidBinaryPackageFormat(binpkg_format) def _allocate_filename_multi(self, cpv): @@ -1864,8 +2096,25 @@ class binarytree: pf = catsplit(cpv)[1] build_id = max_build_id + 1 + try: + binpkg_format = cpv.binpkg_format + except AttributeError: + binpkg_format = self.settings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] + ) + + if binpkg_format == "xpak": + filename_format = "%s-%s.xpak" + elif binpkg_format == "gpkg": + filename_format = "%s-%s.gpkg.tar" + else: + raise InvalidBinaryPackageFormat(binpkg_format) + while True: - filename = "%s-%s.xpak" % (os.path.join(self.pkgdir, cpv.cp, pf), build_id) + filename = filename_format % ( + os.path.join(self.pkgdir, cpv.cp, pf), + build_id, + ) if os.path.exists(filename): build_id += 1 else: @@ -1874,13 +2123,17 @@ class binarytree: @staticmethod def _parse_build_id(filename): build_id = -1 - suffixlen = len(".xpak") - hyphen = filename.rfind("-", 0, -(suffixlen + 1)) - if hyphen != -1: - try: - build_id = int(filename[hyphen + 1 : -suffixlen]) - except ValueError: - pass + if filename.endswith(SUPPORTED_XPAK_EXTENSIONS): + suffixlen = len(".xpak") + elif filename.endswith(SUPPORTED_GPKG_EXTENSIONS): + suffixlen = len(".gpkg.tar") + else: + raise InvalidBinaryPackageFormat(filename) + + filename = filename[:-suffixlen] + if re.match(r".*-[\w.]*\d+[\w.]*-\d+$", filename): + build_id = int(filename.split("-")[-1]) + return build_id def isremote(self, pkgname): diff --git a/lib/portage/dbapi/vartree.py b/lib/portage/dbapi/vartree.py index 8ffb23b1c..863efe9cc 100644 --- a/lib/portage/dbapi/vartree.py +++ b/lib/portage/dbapi/vartree.py @@ -43,6 +43,7 @@ portage.proxy.lazyimport.lazyimport( "portage.util._eventloop.global_event_loop:global_event_loop", "portage.versions:best,catpkgsplit,catsplit,cpv_getkey,vercmp," + "_get_slot_re,_pkgsplit@pkgsplit,_pkg_str,_unknown_repo", + "portage.gpkg", "subprocess", "tarfile", ) @@ -54,6 +55,7 @@ from portage.const import ( PORTAGE_PACKAGE_ATOM, PRIVATE_PATH, VDB_PATH, + SUPPORTED_GENTOO_BINPKG_FORMATS, ) from portage.dbapi import dbapi from portage.exception import ( @@ -61,6 +63,7 @@ from portage.exception import ( InvalidData, InvalidLocation, InvalidPackageName, + InvalidBinaryPackageFormat, FileNotFound, PermissionDenied, UnsupportedAPIException, @@ -1089,26 +1092,45 @@ class vardbapi(dbapi): "y" if include_unmodified_config else "n" ) ) - opts, args = parser.parse_known_args(opts_list) - tar_cmd = ("tar", "-x", "--xattrs", "--xattrs-include=*", "-C", dest_dir) - pr, pw = os.pipe() - proc = await asyncio.create_subprocess_exec(*tar_cmd, stdin=pr) - os.close(pr) - with os.fdopen(pw, "wb", 0) as pw_file: + binpkg_format = settings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] + ) + if binpkg_format == "xpak": + tar_cmd = ("tar", "-x", "--xattrs", "--xattrs-include=*", "-C", dest_dir) + pr, pw = os.pipe() + proc = await asyncio.create_subprocess_exec(*tar_cmd, stdin=pr) + os.close(pr) + with os.fdopen(pw, "wb", 0) as pw_file: + excluded_config_files = await loop.run_in_executor( + ForkExecutor(loop=loop), + functools.partial( + self._dblink(cpv).quickpkg, + pw_file, + include_config=opts.include_config == "y", + include_unmodified_config=opts.include_unmodified_config == "y", + ), + ) + await proc.wait() + if proc.returncode != os.EX_OK: + raise PortageException("command failed: {}".format(tar_cmd)) + elif binpkg_format == "gpkg": + gpkg_tmp_fd, gpkg_tmp = tempfile.mkstemp(suffix=".gpkg.tar") + os.close(gpkg_tmp_fd) excluded_config_files = await loop.run_in_executor( ForkExecutor(loop=loop), functools.partial( self._dblink(cpv).quickpkg, - pw_file, + gpkg_tmp, include_config=opts.include_config == "y", include_unmodified_config=opts.include_unmodified_config == "y", ), ) - await proc.wait() - if proc.returncode != os.EX_OK: - raise PortageException("command failed: {}".format(tar_cmd)) + portage.gpkg.gpkg(settings, cpv, gpkg_tmp).decompress(dest_dir) + os.remove(gpkg_tmp) + else: + raise InvalidBinaryPackageFormat(binpkg_format) if excluded_config_files: log_lines = [ @@ -2107,7 +2129,11 @@ class dblink: return pkgfiles def quickpkg( - self, output_file, include_config=False, include_unmodified_config=False + self, + output_file, + metadata=None, + include_config=False, + include_unmodified_config=False, ): """ Create a tar file appropriate for use by quickpkg. @@ -2130,6 +2156,9 @@ class dblink: contents = self.getcontents() excluded_config_files = [] protect = None + binpkg_format = settings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] + ) if not include_config: confprot = ConfigProtect( @@ -2152,16 +2181,22 @@ class dblink: excluded_config_files.append(filename) return True - # The tarfile module will write pax headers holding the - # xattrs only if PAX_FORMAT is specified here. - with tarfile.open( - fileobj=output_file, - mode="w|", - format=tarfile.PAX_FORMAT if xattrs else tarfile.DEFAULT_FORMAT, - ) as tar: - tar_contents( - contents, settings["ROOT"], tar, protect=protect, xattrs=xattrs - ) + if binpkg_format == "xpak": + # The tarfile module will write pax headers holding the + # xattrs only if PAX_FORMAT is specified here. + with tarfile.open( + fileobj=output_file, + mode="w|", + format=tarfile.PAX_FORMAT if xattrs else tarfile.DEFAULT_FORMAT, + ) as tar: + tar_contents( + contents, settings["ROOT"], tar, protect=protect, xattrs=xattrs + ) + elif binpkg_format == "gpkg": + gpkg_file = portage.gpkg.gpkg(settings, cpv, output_file) + gpkg_file._quickpkg(contents, metadata, settings["ROOT"], protect=protect) + else: + raise InvalidBinaryPackageFormat(binpkg_format) return excluded_config_files diff --git a/lib/portage/exception.py b/lib/portage/exception.py index ec8ea1980..3df4ce8fd 100644 --- a/lib/portage/exception.py +++ b/lib/portage/exception.py @@ -181,6 +181,22 @@ class InvalidPackageName(PortagePackageException): """Malformed package name""" +class InvalidBinaryPackageFormat(PortagePackageException): + """Invalid Binary Package Format""" + + +class InvalidCompressionMethod(PortagePackageException): + """Invalid or unsupported compression method""" + + +class CompressorNotFound(PortagePackageException): + """A required compressor binary was not available or executable""" + + +class CompressorOperationFailed(PortagePackageException): + """An error occurred during external operation""" + + class InvalidAtom(PortagePackageException): """Malformed atom spec""" @@ -208,6 +224,10 @@ class UnsupportedAPIException(PortagePackageException): return _unicode_decode(msg, encoding=_encodings["content"], errors="replace") +class GPGException(PortageException): + """GPG operation failed""" + + class SignatureException(PortageException): """Signature was not present in the checked file""" diff --git a/lib/portage/gpg.py b/lib/portage/gpg.py new file mode 100644 index 000000000..57be2ebc0 --- /dev/null +++ b/lib/portage/gpg.py @@ -0,0 +1,106 @@ +# Copyright 2001-2020 Gentoo Authors +# Distributed under the terms of the GNU General Public License v2 + +import subprocess +import sys +import threading +import time + +from portage import os +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS +from portage.exception import GPGException +from portage.output import colorize +from portage.util import shlex_split, varexpand, writemsg, writemsg_stdout + + +class GPG: + """ + Unlock GPG, must call dircetly from main program for get correct TTY + """ + + def __init__(self, settings): + """ + Portage settings are needed to run GPG unlock command. + """ + self.settings = settings + self.thread = None + self.GPG_signing_base_command = self.settings.get( + "BINPKG_GPG_SIGNING_BASE_COMMAND" + ) + self.digest_algo = self.settings.get("BINPKG_GPG_SIGNING_DIGEST") + self.signing_gpg_home = self.settings.get("BINPKG_GPG_SIGNING_GPG_HOME") + self.signing_gpg_key = self.settings.get("BINPKG_GPG_SIGNING_KEY") + self.GPG_unlock_command = self.GPG_signing_base_command.replace( + "[PORTAGE_CONFIG]", + f"--homedir {self.signing_gpg_home} " + f"--digest-algo {self.digest_algo} " + f"--local-user {self.signing_gpg_key} " + "--output /dev/null /dev/null", + ) + + if "gpg-keepalive" in self.settings.features: + self.keepalive = True + else: + self.keepalive = False + + def unlock(self): + """ + Set GPG_TTY and run GPG unlock command. + If gpg-keepalive is set, start keepalive thread. + """ + if self.GPG_unlock_command and ( + self.settings.get("BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0]) + == "gpkg" + ): + try: + os.environ["GPG_TTY"] = os.ttyname(sys.stdout.fileno()) + except OSError as e: + # When run with no input/output tty, this will fail. + # However, if the password is given by command, + # GPG does not need to ask password, so can be ignored. + writemsg(colorize("WARN", str(e)) + "\n") + + cmd = shlex_split(varexpand(self.GPG_unlock_command, mydict=self.settings)) + return_code = subprocess.Popen(cmd).wait() + + if return_code == os.EX_OK: + writemsg_stdout(colorize("GOOD", "unlocked") + "\n") + sys.stdout.flush() + else: + raise GPGException("GPG unlock failed") + + if self.keepalive: + self.GPG_unlock_command = shlex_split( + varexpand(self.GPG_unlock_command, mydict=self.settings) + ) + self.thread = threading.Thread(target=self.gpg_keepalive, daemon=True) + self.thread.start() + + def stop(self): + """ + Stop keepalive thread. + """ + if self.thread is not None: + self.keepalive = False + + def gpg_keepalive(self): + """ + Call GPG unlock command every 5 mins to avoid the passphrase expired. + """ + count = 0 + while self.keepalive: + if count < 5: + time.sleep(60) + count += 1 + continue + else: + count = 0 + + proc = subprocess.Popen( + self.GPG_unlock_command, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.STDOUT, + ) + if proc.wait() != os.EX_OK: + raise GPGException("GPG keepalive failed") diff --git a/lib/portage/gpkg.py b/lib/portage/gpkg.py new file mode 100644 index 000000000..b642e74ec --- /dev/null +++ b/lib/portage/gpkg.py @@ -0,0 +1,2015 @@ +# Copyright 2001-2020 Gentoo Authors +# Distributed under the terms of the GNU General Public License v2 + +import tarfile +import io +import threading +import subprocess +import errno +import pwd +import grp +import stat +import sys +import tempfile +from copy import copy +from datetime import datetime + +from portage import checksum +from portage import os +from portage import shutil +from portage import normalize_path +from portage import _encodings +from portage import _unicode_decode +from portage import _unicode_encode +from portage.exception import ( + FileNotFound, + InvalidBinaryPackageFormat, + InvalidCompressionMethod, + CompressorNotFound, + CompressorOperationFailed, + CommandNotFound, + GPGException, + DigestException, + MissingSignature, + InvalidSignature, +) +from portage.output import colorize +from portage.util._urlopen import urlopen +from portage.util import writemsg +from portage.util import shlex_split, varexpand +from portage.util.compression_probe import _compressors +from portage.process import find_binary +from portage.const import MANIFEST2_HASH_DEFAULTS, HASHING_BLOCKSIZE + + +class tar_stream_writer: + """ + One-pass helper function that return a file-like object + for create a file inside of a tar container. + + This helper allowed streaming add a new file to tar + without prior knows the file size. + + With optional call and pipe data through external program, + the helper can transparently save compressed data. + + With optional checksum helper, this helper can create + corresponding checksum and GPG signature. + + Example: + + writer = tar_stream_writer( + file_tarinfo, # the file tarinfo that need to be added + container, # the outer container tarfile object + tarfile.USTAR_FORMAT, # the outer container format + ["gzip"], # compression command + checksum_helper # checksum helper + ) + + writer.write(data) + writer.close() + """ + + def __init__( + self, + tarinfo, + container, + tar_format, + cmd=None, + checksum_helper=None, + uid=None, + gid=None, + ): + """ + tarinfo # the file tarinfo that need to be added + container # the outer container tarfile object + tar_format # the outer container format for create the tar header + cmd # subprocess.Popen format compression command + checksum_helper # checksum helper + uid # drop root user to uid + gid # drop root group to gid + """ + self.checksum_helper = checksum_helper + self.closed = False + self.container = container + self.killed = False + self.tar_format = tar_format + self.tarinfo = tarinfo + self.uid = uid + self.gid = gid + + # Record container end position + self.container.fileobj.seek(0, io.SEEK_END) + self.begin_position = self.container.fileobj.tell() + self.end_position = 0 + self.file_size = 0 + + # Write tar header without size + tar_header = self.tarinfo.tobuf( + self.tar_format, self.container.encoding, self.container.errors + ) + self.header_size = len(tar_header) + self.container.fileobj.write(tar_header) + self.container.fileobj.flush() + self.container.offset += self.header_size + + # Start external compressor if needed + if cmd is None: + self.proc = None + else: + if sys.hexversion >= 0x03090000: + self.proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + user=self.uid, + group=self.gid, + ) + else: + self.proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=self._drop_privileges, + ) + + self.read_thread = threading.Thread( + target=self._cmd_read_thread, name="tar_stream_cmd_read", daemon=True + ) + self.read_thread.start() + + def __del__(self): + self.close() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.close() + + def _drop_privileges(self): + if self.uid: + try: + os.setuid(self.uid) + except PermissionError: + writemsg( + colorize( + "BAD", f"!!! Drop root privileges to user {self.uid} failed." + ) + ) + raise + + if self.gid: + try: + os.setgid(self.gid) + except PermissionError: + writemsg( + colorize( + "BAD", f"!!! Drop root privileges to group {self.gid} failed." + ) + ) + raise + + def kill(self): + """ + kill external program if any error happened in python + """ + if self.proc is not None: + self.killed = True + self.proc.kill() + self.proc.stdin.close() + self.close() + + def _cmd_read_thread(self): + """ + Use thread to avoid block. + Read stdout from external compressor, then write to the file + in container, and to checksum helper if needed. + """ + while True: + try: + buffer = self.proc.stdout.read(HASHING_BLOCKSIZE) + if not buffer: + self.proc.stdout.close() + self.proc.stderr.close() + return + except BrokenPipeError: + self.proc.stdout.close() + if not self.killed: + # Do not raise error if killed by portage + raise CompressorOperationFailed("PIPE broken") + + self.container.fileobj.write(buffer) + if self.checksum_helper: + self.checksum_helper.update(buffer) + + def write(self, data): + """ + Write data to tarfile or external compressor stdin + """ + if self.closed: + raise OSError("writer closed") + + if self.proc: + # Write to external program + self.proc.stdin.write(data) + else: + # Write to container + self.container.fileobj.write(data) + if self.checksum_helper: + self.checksum_helper.update(data) + + def close(self): + """ + Update the new file tar header when close + """ + if self.closed: + return + + # Wait compressor exit + if self.proc is not None: + self.proc.stdin.close() + if self.proc.wait() != os.EX_OK: + raise CompressorOperationFailed("compression failed") + if self.read_thread.is_alive(): + self.read_thread.join() + + # Get container end position and calculate file size + self.container.fileobj.seek(0, io.SEEK_END) + self.end_position = self.container.fileobj.tell() + self.file_size = self.end_position - self.begin_position - self.header_size + self.tarinfo.size = self.file_size + + # Tar block is 512, need padding \0 + _, remainder = divmod(self.file_size, 512) + if remainder > 0: + padding_size = 512 - remainder + self.container.fileobj.write(b"\0" * padding_size) + self.container.offset += padding_size + self.container.fileobj.flush() + + # Update tar header + tar_header = self.tarinfo.tobuf( + self.tar_format, self.container.encoding, self.container.errors + ) + self.container.fileobj.seek(self.begin_position) + self.container.fileobj.write(tar_header) + self.container.fileobj.seek(0, io.SEEK_END) + self.container.fileobj.flush() + self.container.offset = self.container.fileobj.tell() + self.closed = True + + # Add tarinfo to tarfile + self.container.members.append(self.tarinfo) + + if self.checksum_helper: + self.checksum_helper.finish() + + self.closed = True + + +class tar_stream_reader: + """ + helper function that return a file-like object + for read a file inside of a tar container. + + This helper allowed transparently streaming read a compressed + file in tar. + + With optional call and pipe compressed data through external + program, and return the uncompressed data. + + reader = tar_stream_reader( + fileobj, # the fileobj from tarfile.extractfile(f) + ["gzip", "-d"], # decompression command + ) + + reader.read() + reader.close() + """ + + def __init__(self, fileobj, cmd=None, uid=None, gid=None): + """ + fileobj should be a file-like object that have read(). + cmd is optional external decompressor command. + """ + self.closed = False + self.cmd = cmd + self.fileobj = fileobj + self.killed = False + self.uid = uid + self.gid = gid + + if cmd is None: + self.read_io = fileobj + self.proc = None + else: + # Start external decompressor + if sys.hexversion >= 0x03090000: + self.proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + user=self.uid, + group=self.gid, + ) + else: + self.proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + preexec_fn=self._drop_privileges, + ) + self.read_io = self.proc.stdout + # Start stdin block writing thread + self.thread = threading.Thread( + target=self._write_thread, name="tar_stream_stdin_writer", daemon=True + ) + self.thread.start() + + def __del__(self): + try: + self.close() + except CompressorOperationFailed: + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + try: + self.close() + except CompressorOperationFailed: + pass + + def _write_thread(self): + """ + writing thread to avoid full buffer blocking + """ + try: + while True: + buffer = self.fileobj.read(HASHING_BLOCKSIZE) + if buffer: + try: + self.proc.stdin.write(buffer) + except ValueError: + if self.killed: + return + else: + raise + else: + self.proc.stdin.flush() + self.proc.stdin.close() + break + except BrokenPipeError: + if self.killed is False: + raise CompressorOperationFailed("PIPE broken") + + def _drop_privileges(self): + if self.uid: + try: + os.setuid(self.uid) + except PermissionError: + writemsg( + colorize( + "BAD", f"!!! Drop root privileges to user {self.uid} failed." + ) + ) + raise + + if self.gid: + try: + os.setgid(self.gid) + except PermissionError: + writemsg( + colorize( + "BAD", f"!!! Drop root privileges to group {self.gid} failed." + ) + ) + raise + + def kill(self): + """ + kill external program if any error happened in python + """ + if self.proc is not None: + self.killed = True + self.proc.kill() + self.proc.stdin.close() + self.close() + + def read(self, bufsize=-1): + """ + return decompressor stdout data + """ + if self.closed: + raise OSError("writer closed") + else: + return self.read_io.read(bufsize) + + def close(self): + """ + wait external program complete and do clean up + """ + if self.closed: + return + + self.closed = True + + if self.proc is not None: + self.thread.join() + try: + if self.proc.wait() != os.EX_OK: + if not self.proc.stderr.closed: + stderr = self.proc.stderr.read().decode() + if not self.killed: + writemsg(colorize("BAD", "!!!" + "\n" + stderr)) + raise CompressorOperationFailed("decompression failed") + finally: + self.proc.stdout.close() + self.proc.stderr.close() + + +class checksum_helper: + """ + Do checksum generation and GPG Signature generation and verification + """ + + SIGNING = 0 + VERIFY = 1 + + def __init__(self, settings, gpg_operation=None, detached=True, signature=None): + """ + settings # portage settings + gpg_operation # either SIGNING or VERIFY + signature # GPG signature string used for GPG verify only + """ + self.settings = settings + self.gpg_operation = gpg_operation + self.gpg_proc = None + self.gpg_result = None + self.gpg_output = None + self.finished = False + self.sign_file_path = None + + if (gpg_operation == checksum_helper.VERIFY) and (os.getuid() == 0): + try: + drop_user = self.settings.get("GPG_VERIFY_USER_DROP", "nobody") + if drop_user == "": + self.uid = None + else: + self.uid = pwd.getpwnam(drop_user).pw_uid + except KeyError: + writemsg(colorize("BAD", f"!!! Failed to find user {drop_user}.")) + raise + + try: + drop_group = self.settings.get("GPG_VERIFY_GROUP_DROP", "nogroup") + if drop_group == "": + self.gid = None + else: + self.gid = grp.getgrnam(drop_group).gr_gid + except KeyError: + writemsg(colorize("BAD", f"!!! Failed to find group {drop_group}.")) + raise + else: + self.uid = None + self.gid = None + + # Initialize the hash libs + self.libs = {} + for hash_name in MANIFEST2_HASH_DEFAULTS: + self.libs[hash_name] = checksum.hashfunc_map[hash_name]._hashobject() + + # GPG + env = self.settings.environ() + if self.gpg_operation == checksum_helper.SIGNING: + gpg_signing_base_command = self.settings.get( + "BINPKG_GPG_SIGNING_BASE_COMMAND" + ) + digest_algo = self.settings.get("BINPKG_GPG_SIGNING_DIGEST") + gpg_home = self.settings.get("BINPKG_GPG_SIGNING_GPG_HOME") + gpg_key = self.settings.get("BINPKG_GPG_SIGNING_KEY") + + if detached: + gpg_detached = "--detach-sig" + else: + gpg_detached = "--clear-sign" + + if gpg_signing_base_command: + gpg_signing_command = gpg_signing_base_command.replace( + "[PORTAGE_CONFIG]", + f"--homedir {gpg_home} " + f"--digest-algo {digest_algo} " + f"--local-user {gpg_key} " + f"{gpg_detached} " + "--batch --no-tty", + ) + + gpg_signing_command = shlex_split( + varexpand(gpg_signing_command, mydict=self.settings) + ) + gpg_signing_command = [x for x in gpg_signing_command if x != ""] + try: + env["GPG_TTY"] = os.ttyname(sys.stdout.fileno()) + except OSError: + pass + else: + raise CommandNotFound("GPG signing command is not set") + + self.gpg_proc = subprocess.Popen( + gpg_signing_command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + ) + + elif self.gpg_operation == checksum_helper.VERIFY: + if (signature is None) and (detached == True): + raise MissingSignature("No signature provided") + + gpg_verify_base_command = self.settings.get( + "BINPKG_GPG_VERIFY_BASE_COMMAND" + ) + gpg_home = self.settings.get("BINPKG_GPG_VERIFY_GPG_HOME") + + if not gpg_verify_base_command: + raise CommandNotFound("GPG verify command is not set") + + gpg_verify_command = gpg_verify_base_command.replace( + "[PORTAGE_CONFIG]", f"--homedir {gpg_home} " + ) + + if detached: + self.sign_file_fd, self.sign_file_path = tempfile.mkstemp( + ".sig", "portage-sign-" + ) + + gpg_verify_command = gpg_verify_command.replace( + "[SIGNATURE]", f"{self.sign_file_path} -" + ) + + # Create signature file and allow everyone read + with open(self.sign_file_fd, "wb") as sign: + sign.write(signature) + os.chmod(self.sign_file_path, 0o644) + else: + gpg_verify_command = gpg_verify_command.replace( + "[SIGNATURE]", "--output - -" + ) + + gpg_verify_command = shlex_split( + varexpand(gpg_verify_command, mydict=self.settings) + ) + gpg_verify_command = [x for x in gpg_verify_command if x != ""] + + if sys.hexversion >= 0x03090000: + self.gpg_proc = subprocess.Popen( + gpg_verify_command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + user=self.uid, + group=self.gid, + ) + + else: + self.gpg_proc = subprocess.Popen( + gpg_verify_command, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + preexec_fn=self._drop_privileges, + ) + + def __del__(self): + self.finish() + + def _check_gpg_status(self, gpg_status): + """ + Check GPG status log for extra info. + GPG will return OK even if the signature owner is not trusted. + """ + good_signature = False + trust_signature = False + + for l in gpg_status.splitlines(): + if l.startswith("[GNUPG:] GOODSIG"): + good_signature = True + + if l.startswith("[GNUPG:] TRUST_ULTIMATE") or l.startswith( + "[GNUPG:] TRUST_FULLY" + ): + trust_signature = True + + if (not good_signature) or (not trust_signature): + writemsg(colorize("BAD", "!!!" + "\n" + self.gpg_result.decode())) + raise InvalidSignature("GPG verify failed") + + def _drop_privileges(self): + if self.uid: + try: + os.setuid(self.uid) + except PermissionError: + writemsg( + colorize( + "BAD", f"!!! Drop root privileges to user {self.uid} failed." + ) + ) + raise + + if self.gid: + try: + os.setgid(self.gid) + except PermissionError: + writemsg( + colorize( + "BAD", f"!!! Drop root privileges to group {self.gid} failed." + ) + ) + raise + + def update(self, data): + """ + Write data to hash libs and GPG stdin. + """ + for c in self.libs: + self.libs[c].update(data) + + if self.gpg_proc is not None: + self.gpg_proc.stdin.write(data) + + def finish(self): + """ + Tell GPG file is EOF, and get results, then do clean up. + """ + if self.finished: + return + + if self.gpg_proc is not None: + # Tell GPG EOF + self.gpg_proc.stdin.close() + + return_code = self.gpg_proc.wait() + + if self.sign_file_path: + os.remove(self.sign_file_path) + + self.finished = True + + self.gpg_result = self.gpg_proc.stderr.read() + self.gpg_output = self.gpg_proc.stdout.read() + self.gpg_proc.stdout.close() + self.gpg_proc.stderr.close() + + if return_code == os.EX_OK: + if self.gpg_operation == checksum_helper.VERIFY: + self._check_gpg_status(self.gpg_result.decode()) + else: + writemsg(colorize("BAD", "!!!" + "\n" + self.gpg_result.decode())) + if self.gpg_operation == checksum_helper.SIGNING: + writemsg(colorize("BAD", self.gpg_output.decode())) + raise GPGException("GPG signing failed") + elif self.gpg_operation == checksum_helper.VERIFY: + raise InvalidSignature("GPG verify failed") + + +class tar_safe_extract: + """ + A safer version of tar extractall that doing sanity check. + Note that this does not solve all security problems. + """ + + def __init__(self, tar: tarfile.TarFile, prefix: str = ""): + """ + tar: an opened TarFile that ready to be read. + prefix: a optional prefix for an inner directory should be considered + as the root directory. e.g. "metadata" and "image". + """ + self.tar = tar + self.prefix = prefix + self.closed = False + self.file_list = [] + + def extractall(self, dest_dir: str): + """ + Extract all files to a temporary directory in the dest_dir, and move + them to the dest_dir after sanity check. + """ + if self.closed: + raise IOError("Tar file is closed.") + temp_dir = tempfile.TemporaryDirectory(dir=dest_dir) + try: + while True: + member = self.tar.next() + if member is None: + break + if (member.name in self.file_list) or ( + os.path.join(".", member.name) in self.file_list + ): + writemsg( + colorize( + "BAD", f"Danger: duplicate files detected: {member.name}" + ) + ) + raise ValueError("Duplicate files detected.") + if member.name.startswith("/"): + writemsg( + colorize( + "BAD", f"Danger: absolute path detected: {member.name}" + ) + ) + raise ValueError("Absolute path detected.") + if member.name.startswith("../") or ("/../" in member.name): + writemsg( + colorize( + "BAD", f"Danger: path traversal detected: {member.name}" + ) + ) + raise ValueError("Path traversal detected.") + if member.isdev(): + writemsg( + colorize("BAD", f"Danger: device file detected: {member.name}") + ) + raise ValueError("Device file detected.") + if member.islnk() and (member.linkname not in self.file_list): + writemsg( + colorize( + "BAD", f"Danger: hardlink escape detected: {member.name}" + ) + ) + raise ValueError("Hardlink escape detected.") + + self.file_list.append(member.name) + self.tar.extract(member, path=temp_dir.name) + + data_dir = os.path.join(temp_dir.name, self.prefix) + for file in os.listdir(data_dir): + shutil.move(os.path.join(data_dir, file), os.path.join(dest_dir, file)) + finally: + temp_dir.cleanup() + self.closed = True + + +class gpkg: + """ + Gentoo binary package + https://www.gentoo.org/glep/glep-0078.html + """ + + def __init__(self, settings, base_name=None, gpkg_file=None): + """ + gpkg class handle all gpkg operations for one package. + base_name is the package basename. + gpkg_file should be exists file path for read or will create. + """ + if sys.version_info.major < 3: + raise InvalidBinaryPackageFormat("GPKG not support Python 2") + self.settings = settings + self.gpkg_version = "gpkg-1" + if gpkg_file is None: + self.gpkg_file = None + else: + self.gpkg_file = _unicode_decode( + gpkg_file, encoding=_encodings["fs"], errors="strict" + ) + self.base_name = base_name + self.checksums = [] + self.manifest_old = [] + + # Compression is the compression algorithm, if set to None will + # not use compression. + self.compression = self.settings.get("BINPKG_COMPRESS", None) + if self.compression in ["", "none"]: + self.compression = None + + # The create_signature is whether create signature for the package or not. + if "binpkg-signing" in self.settings.features: + self.create_signature = True + else: + self.create_signature = False + + # The request_signature is whether signature files are mandatory. + # If set true, any missing signature file will cause reject processing. + if "binpkg-request-signature" in self.settings.features: + self.request_signature = True + else: + self.request_signature = False + + # The verify_signature is whether verify package signature or not. + # In rare case user may want to ignore signature, + # E.g. package with expired signature. + if "binpkg-ignore-signature" in self.settings.features: + self.verify_signature = False + else: + self.verify_signature = True + + self.ext_list = { + "gzip": ".gz", + "bzip2": ".bz2", + "lz4": ".lz4", + "lzip": ".lz", + "lzop": ".lzo", + "xz": ".xz", + "zstd": ".zst", + } + + def unpack_metadata(self, dest_dir=None): + """ + Unpack metadata to dest_dir. + If dest_dir is None, return files and values in dict. + The dict key will be UTF-8, not bytes. + """ + self._verify_binpkg(metadata_only=True) + + with tarfile.open(self.gpkg_file, "r") as container: + metadata_tarinfo, metadata_comp = self._get_inner_tarinfo( + container, "metadata" + ) + + with tar_stream_reader( + container.extractfile(metadata_tarinfo), + self._get_decompression_cmd(metadata_comp), + ) as metadata_reader: + metadata_tar = io.BytesIO(metadata_reader.read()) + + with tarfile.open(mode="r:", fileobj=metadata_tar) as metadata: + if dest_dir is None: + metadata_ = { + os.path.relpath(k.name, "metadata"): metadata.extractfile( + k + ).read() + for k in metadata.getmembers() + } + else: + metadata_safe = tar_safe_extract(metadata, "metadata") + metadata_safe.extractall(dest_dir) + metadata_ = True + metadata_tar.close() + return metadata_ + + def get_metadata(self, want=None): + """ + get package metadata. + if want is list, return all want key-values in dict + if want is str, return the want key value + """ + if want is None: + return self.unpack_metadata() + elif isinstance(want, str): + metadata = self.unpack_metadata() + metadata_want = metadata.get(want, None) + return metadata_want + else: + metadata = self.unpack_metadata() + metadata_want = {k: metadata.get(k, None) for k in want} + return metadata_want + + def get_metadata_url(self, url, want=None): + """ + Return the requested metadata from url gpkg. + Default return all meta data. + Use 'want' to get specific name from metadata. + This method only support the correct package format. + Wrong files order or incorrect basename will be considered invalid + to reduce potential attacks. + Only signature will be check if the signature file is the next file. + Manifest will be ignored since it will be at the end of package. + """ + # The init download file head size + init_size = 51200 + + # Load remote container + container_file = io.BytesIO( + urlopen(url, headers={"Range": "bytes=0-" + str(init_size)}).read() + ) + + # Check gpkg and metadata + with tarfile.open(mode="r", fileobj=container_file) as container: + if self.gpkg_version not in container.getnames(): + raise InvalidBinaryPackageFormat("Invalid gpkg file.") + + metadata_tarinfo, metadata_comp = self._get_inner_tarinfo( + container, "metadata" + ) + + # Extra 10240 bytes for signature + end_size = metadata_tarinfo.offset_data + metadata_tarinfo.size + 10240 + _, remainder = divmod(end_size, 512) + end_size += 512 - remainder + + # If need more data + if end_size > 10000000: + raise InvalidBinaryPackageFormat("metadata too large " + str(end_size)) + if end_size > init_size: + container_file.seek(0, io.SEEK_END) + container_file.write( + urlopen( + url, + headers={ + "Range": "bytes=" + str(init_size + 1) + "-" + str(end_size) + }, + ).read() + ) + + container_file.seek(0) + + # Reload and process full metadata + with tarfile.open(mode="r", fileobj=container_file) as container: + metadata_tarinfo, metadata_comp = self._get_inner_tarinfo( + container, "metadata" + ) + + # Verify metadata file signature if needed + # binpkg-ignore-signature can override this. + signature_filename = metadata_tarinfo.name + ".sig" + if signature_filename in container.getnames(): + if self.request_signature and self.verify_signature: + metadata_signature = container.extractfile( + signature_filename + ).read() + checksum_info = checksum_helper( + self.settings, + gpg_operation=checksum_helper.VERIFY, + signature=metadata_signature, + ) + checksum_info.update(container.extractfile(metadata_tarinfo).read()) + checksum_info.finish() + + # Load metadata + with tar_stream_reader( + container.extractfile(metadata_tarinfo), + self._get_decompression_cmd(metadata_comp), + ) as metadata_reader: + metadata_file = io.BytesIO(metadata_reader.read()) + + with tarfile.open(mode="r:", fileobj=metadata_file) as metadata: + if want is None: + metadata_ = { + os.path.relpath(k.name, "metadata"): metadata.extractfile( + k + ).read() + for k in metadata.getmembers() + } + else: + metadata_ = { + os.path.relpath(k.name, "metadata"): metadata.extractfile( + k + ).read() + for k in metadata.getmembers() + if k in want + } + metadata_file.close() + container_file.close() + return metadata_ + + def compress(self, root_dir, metadata, clean=False): + """ + Use initialized configuation create new gpkg file from root_dir. + Will overwrite any exists file. + metadata is a dict, the key will be file name, the value will be + the file contents. + """ + + root_dir = normalize_path( + _unicode_decode(root_dir, encoding=_encodings["fs"], errors="strict") + ) + + # Get pre image info + container_tar_format, image_tar_format = self._get_tar_format_from_stats( + *self._check_pre_image_files(root_dir) + ) + + # Long CPV + if len(self.base_name) >= 154: + container_tar_format = tarfile.GNU_FORMAT + + # gpkg container + container = tarfile.TarFile( + name=self.gpkg_file, mode="w", format=container_tar_format + ) + + # gpkg version + gpkg_version_file = tarfile.TarInfo(self.gpkg_version) + gpkg_version_file.mtime = datetime.utcnow().timestamp() + container.addfile(gpkg_version_file) + + compression_cmd = self._get_compression_cmd() + + # metadata + self._add_metadata(container, metadata, compression_cmd) + + # image + if self.create_signature: + checksum_info = checksum_helper( + self.settings, gpg_operation=checksum_helper.SIGNING + ) + else: + checksum_info = checksum_helper(self.settings) + + image_tarinfo = self._create_tarinfo("image") + image_tarinfo.mtime = datetime.utcnow().timestamp() + with tar_stream_writer( + image_tarinfo, container, image_tar_format, compression_cmd, checksum_info + ) as image_writer: + with tarfile.open( + mode="w|", fileobj=image_writer, format=image_tar_format + ) as image_tar: + image_tar.add(root_dir, "image", recursive=True) + + image_tarinfo = container.getmember(image_tarinfo.name) + self._record_checksum(checksum_info, image_tarinfo) + + if self.create_signature: + self._add_signature(checksum_info, image_tarinfo, container) + + self._add_manifest(container) + container.close() + + def decompress(self, decompress_dir): + """ + decompress current gpkg to decompress_dir + """ + decompress_dir = normalize_path( + _unicode_decode(decompress_dir, encoding=_encodings["fs"], errors="strict") + ) + + self._verify_binpkg() + os.makedirs(decompress_dir, mode=0o755, exist_ok=True) + + with tarfile.open(self.gpkg_file, "r") as container: + image_tarinfo, image_comp = self._get_inner_tarinfo(container, "image") + + with tar_stream_reader( + container.extractfile(image_tarinfo), + self._get_decompression_cmd(image_comp), + ) as image_tar: + + with tarfile.open(mode="r|", fileobj=image_tar) as image: + try: + image_safe = tar_safe_extract(image, "image") + image_safe.extractall(decompress_dir) + except Exception as ex: + writemsg(colorize("BAD", "!!!" + "Extract failed.")) + raise + finally: + image_tar.kill() + + def update_metadata(self, metadata, newcpv=None): + """ + Update metadata in the gpkg file. + """ + self._verify_binpkg() + self.checksums = [] + oldcpv = None + + if newcpv: + oldcpv = self.base_name + + with open(self.gpkg_file, "rb") as container: + container_tar_format = self._get_tar_format(container) + if container_tar_format is None: + raise InvalidBinaryPackageFormat("Cannot identify tar format") + + # container + tmp_gpkg_file_name = self.gpkg_file + "." + str(os.getpid()) + with tarfile.TarFile( + name=tmp_gpkg_file_name, mode="w", format=container_tar_format + ) as container: + # gpkg version + gpkg_version_file = tarfile.TarInfo(self.gpkg_version) + gpkg_version_file.mtime = datetime.utcnow().timestamp() + container.addfile(gpkg_version_file) + + compression_cmd = self._get_compression_cmd() + + # metadata + if newcpv: + self.base_name = newcpv + self._add_metadata(container, metadata, compression_cmd) + self.base_name = oldcpv + else: + self._add_metadata(container, metadata, compression_cmd) + + # reuse other stuffs + with tarfile.open(self.gpkg_file, "r") as container_old: + manifest_old = self.manifest_old.copy() + + for m in manifest_old: + file_name_old = m[1] + if os.path.basename(file_name_old).startswith("metadata"): + continue + old_data_tarinfo = container_old.getmember(file_name_old) + new_data_tarinfo = copy(old_data_tarinfo) + if newcpv: + m[1] = m[1].replace(oldcpv, newcpv, 1) + new_data_tarinfo.name = new_data_tarinfo.name.replace( + oldcpv, newcpv, 1 + ) + container.addfile( + new_data_tarinfo, container_old.extractfile(old_data_tarinfo) + ) + self.checksums.append(m) + + self._add_manifest(container) + + shutil.move(tmp_gpkg_file_name, self.gpkg_file) + + def _add_metadata(self, container, metadata, compression_cmd): + """ + add metadata to container + """ + if metadata is None: + metadata = {} + metadata_tarinfo = self._create_tarinfo("metadata") + metadata_tarinfo.mtime = datetime.utcnow().timestamp() + + if self.create_signature: + checksum_info = checksum_helper( + self.settings, gpg_operation=checksum_helper.SIGNING + ) + else: + checksum_info = checksum_helper(self.settings) + + with tar_stream_writer( + metadata_tarinfo, + container, + tarfile.USTAR_FORMAT, + compression_cmd, + checksum_info, + ) as metadata_writer: + with tarfile.open( + mode="w|", fileobj=metadata_writer, format=tarfile.USTAR_FORMAT + ) as metadata_tar: + + for m in metadata: + m_info = tarfile.TarInfo(os.path.join("metadata", m)) + m_info.mtime = datetime.utcnow().timestamp() + + if isinstance(metadata[m], bytes): + m_data = io.BytesIO(metadata[m]) + else: + m_data = io.BytesIO(metadata[m].encode("UTF-8")) + + m_data.seek(0, io.SEEK_END) + m_info.size = m_data.tell() + m_data.seek(0) + metadata_tar.addfile(m_info, m_data) + m_data.close() + + metadata_tarinfo = container.getmember(metadata_tarinfo.name) + self._record_checksum(checksum_info, metadata_tarinfo) + + if self.create_signature: + self._add_signature(checksum_info, metadata_tarinfo, container) + + def _quickpkg(self, contents, metadata, root_dir, protect=None): + """ + Similar to compress, but for quickpkg. + Will compress the given files to image with root, + ignoring all other files. + """ + + protect_file = io.BytesIO( + b"# empty file because --include-config=n " + b"when `quickpkg` was used\n" + ) + protect_file.seek(0, io.SEEK_END) + protect_file_size = protect_file.tell() + + root_dir = normalize_path( + _unicode_decode(root_dir, encoding=_encodings["fs"], errors="strict") + ) + + # Get pre image info + container_tar_format, image_tar_format = self._get_tar_format_from_stats( + *self._check_pre_quickpkg_files(contents, root_dir) + ) + + # Long CPV + if len(self.base_name) >= 154: + container_tar_format = tarfile.GNU_FORMAT + + # GPKG container + container = tarfile.TarFile( + name=self.gpkg_file, mode="w", format=container_tar_format + ) + + # GPKG version + gpkg_version_file = tarfile.TarInfo(self.gpkg_version) + gpkg_version_file.mtime = datetime.utcnow().timestamp() + container.addfile(gpkg_version_file) + + compression_cmd = self._get_compression_cmd() + # Metadata + self._add_metadata(container, metadata, compression_cmd) + + # Image + if self.create_signature: + checksum_info = checksum_helper( + self.settings, gpg_operation=checksum_helper.SIGNING + ) + else: + checksum_info = checksum_helper(self.settings) + + paths = list(contents) + paths.sort() + image_tarinfo = self._create_tarinfo("image") + image_tarinfo.mtime = datetime.utcnow().timestamp() + with tar_stream_writer( + image_tarinfo, container, image_tar_format, compression_cmd, checksum_info + ) as image_writer: + with tarfile.open( + mode="w|", fileobj=image_writer, format=image_tar_format + ) as image_tar: + if len(paths) == 0: + tarinfo = image_tar.tarinfo("image") + tarinfo.type = tarfile.DIRTYPE + tarinfo.size = 0 + tarinfo.mode = 0o755 + image_tar.addfile(tarinfo) + + for path in paths: + try: + lst = os.lstat(path) + except OSError as e: + if e.errno != errno.ENOENT: + raise + del e + continue + contents_type = contents[path][0] + if path.startswith(root_dir): + arcname = "image/" + path[len(root_dir) :] + else: + raise ValueError("invalid root argument: '%s'" % root_dir) + live_path = path + if ( + "dir" == contents_type + and not stat.S_ISDIR(lst.st_mode) + and os.path.isdir(live_path) + ): + # Even though this was a directory in the original ${D}, it exists + # as a symlink to a directory in the live filesystem. It must be + # recorded as a real directory in the tar file to ensure that tar + # can properly extract it's children. + live_path = os.path.realpath(live_path) + lst = os.lstat(live_path) + + # Since os.lstat() inside TarFile.gettarinfo() can trigger a + # UnicodeEncodeError when python has something other than utf_8 + # return from sys.getfilesystemencoding() (as in bug #388773), + # we implement the needed functionality here, using the result + # of our successful lstat call. An alternative to this would be + # to pass in the fileobj argument to TarFile.gettarinfo(), so + # that it could use fstat instead of lstat. However, that would + # have the unwanted effect of dereferencing symlinks. + + tarinfo = image_tar.tarinfo(arcname) + tarinfo.mode = lst.st_mode + tarinfo.uid = lst.st_uid + tarinfo.gid = lst.st_gid + tarinfo.size = 0 + tarinfo.mtime = lst.st_mtime + tarinfo.linkname = "" + if stat.S_ISREG(lst.st_mode): + inode = (lst.st_ino, lst.st_dev) + if ( + lst.st_nlink > 1 + and inode in image_tar.inodes + and arcname != image_tar.inodes[inode] + ): + tarinfo.type = tarfile.LNKTYPE + tarinfo.linkname = image_tar.inodes[inode] + else: + image_tar.inodes[inode] = arcname + tarinfo.type = tarfile.REGTYPE + tarinfo.size = lst.st_size + elif stat.S_ISDIR(lst.st_mode): + tarinfo.type = tarfile.DIRTYPE + elif stat.S_ISLNK(lst.st_mode): + tarinfo.type = tarfile.SYMTYPE + tarinfo.linkname = os.readlink(live_path) + else: + continue + try: + tarinfo.uname = pwd.getpwuid(tarinfo.uid)[0] + except KeyError: + pass + try: + tarinfo.gname = grp.getgrgid(tarinfo.gid)[0] + except KeyError: + pass + + if stat.S_ISREG(lst.st_mode): + if protect and protect(path): + protect_file.seek(0) + tarinfo.size = protect_file_size + image_tar.addfile(tarinfo, protect_file) + else: + path_bytes = _unicode_encode( + path, encoding=_encodings["fs"], errors="strict" + ) + + with open(path_bytes, "rb") as f: + image_tar.addfile(tarinfo, f) + + else: + image_tar.addfile(tarinfo) + + image_tarinfo = container.getmember(image_tarinfo.name) + self._record_checksum(checksum_info, image_tarinfo) + + if self.create_signature: + self._add_signature(checksum_info, image_tarinfo, container) + + self._add_manifest(container) + container.close() + + def _record_checksum(self, checksum_info, tarinfo): + """ + Record checksum result for the given file. + Replace old checksum if already exists. + """ + for c in self.checksums: + if c[1] == tarinfo.name: + self.checksums.remove(c) + break + + checksum_record = ["DATA", tarinfo.name, str(tarinfo.size)] + + for c in checksum_info.libs: + checksum_record.append(c) + checksum_record.append(checksum_info.libs[c].hexdigest()) + + self.checksums.append(checksum_record) + + def _add_manifest(self, container): + """ + Add Manifest to the container based on current checksums. + Creare GPG signatue if needed. + """ + manifest = io.BytesIO() + + for m in self.checksums: + manifest.write((" ".join(m) + "\n").encode("UTF-8")) + + if self.create_signature: + checksum_info = checksum_helper( + self.settings, gpg_operation=checksum_helper.SIGNING, detached=False + ) + checksum_info.update(manifest.getvalue()) + checksum_info.finish() + manifest.seek(0) + manifest.write(checksum_info.gpg_output) + + manifest_tarinfo = tarfile.TarInfo("Manifest") + manifest_tarinfo.size = manifest.tell() + manifest_tarinfo.mtime = datetime.utcnow().timestamp() + manifest.seek(0) + container.addfile(manifest_tarinfo, manifest) + manifest.close() + + def _load_manifest(self, manifest_string): + """ + Check, load, and return manifest in a list by files + """ + manifest = [] + manifest_filenames = [] + + for manifest_record in manifest_string.splitlines(): + if manifest_record == "": + continue + manifest_record = manifest_record.strip().split() + + if manifest_record[0] != "DATA": + raise DigestException("invalied Manifest") + + if manifest_record[1] in manifest_filenames: + raise DigestException("Manifest duplicate file exists") + + try: + int(manifest_record[2]) + except ValueError: + raise DigestException("Manifest invalied file size") + + manifest.append(manifest_record) + manifest_filenames.append(manifest_record[1]) + + return manifest + + def _add_signature(self, checksum_info, tarinfo, container, manifest=True): + """ + Add GPG signature for the given tarinfo file. + manifest: add to manifest + """ + if checksum_info.gpg_output is None: + raise GPGException("GPG signature is not exists") + + signature = io.BytesIO(checksum_info.gpg_output) + signature_tarinfo = tarfile.TarInfo(tarinfo.name + ".sig") + signature_tarinfo.size = len(signature.getvalue()) + signature_tarinfo.mtime = datetime.utcnow().timestamp() + container.addfile(signature_tarinfo, signature) + + if manifest: + signature_checksum_info = checksum_helper(self.settings) + signature.seek(0) + signature_checksum_info.update(signature.read()) + signature_checksum_info.finish() + self._record_checksum(signature_checksum_info, signature_tarinfo) + + signature.close() + + def _verify_binpkg(self, metadata_only=False): + """ + Verify current GPKG file. + """ + # Check file path + if self.gpkg_file is None: + raise FileNotFound("no gpkg file provided") + + # Check if is file + if not os.path.isfile(self.gpkg_file): + raise FileNotFound(f"File not found {self.gpkg_file}") + + # Check if is tar file + with open(self.gpkg_file, "rb") as container: + container_tar_format = self._get_tar_format(container) + if container_tar_format is None: + raise InvalidBinaryPackageFormat( + f"Cannot identify tar format: {self.gpkg_file}" + ) + + # Check container + with tarfile.open(self.gpkg_file, "r") as container: + try: + container_files = container.getnames() + except tarfile.ReadError: + raise InvalidBinaryPackageFormat( + f"Cannot read tar file: {self.gpkg_file}" + ) + + # Check gpkg header + if self.gpkg_version not in container_files: + raise InvalidBinaryPackageFormat(f"Invalid gpkg file: {self.gpkg_file}") + + # If any signature exists, we assume all files have signature. + if any(f.endswith(".sig") for f in container_files): + signature_exist = True + else: + signature_exist = False + + # Check if all files are unique to avoid same name attack + container_files_unique = [] + for f in container_files: + if f in container_files_unique: + raise InvalidBinaryPackageFormat( + "Duplicate file %s exist, potential attack?" % f + ) + container_files_unique.append(f) + + del container_files_unique + + # Add all files to check list + unverified_files = container_files.copy() + unverified_files.remove(self.gpkg_version) + + # Check Manifest file + if "Manifest" not in unverified_files: + raise MissingSignature(f"Manifest not found: {self.gpkg_file}") + + manifest_file = container.extractfile("Manifest") + manifest_data = manifest_file.read() + manifest_file.close() + + if b"-----BEGIN PGP SIGNATURE-----" in manifest_data: + signature_exist = True + + # Check Manifest signature if needed. + # binpkg-ignore-signature can override this. + if self.request_signature or signature_exist: + checksum_info = checksum_helper( + self.settings, gpg_operation=checksum_helper.VERIFY, detached=False + ) + + try: + checksum_info.update(manifest_data) + checksum_info.finish() + except (InvalidSignature, MissingSignature): + if self.verify_signature: + raise + + manifest_data = checksum_info.gpg_output + unverified_files.remove("Manifest") + else: + unverified_files.remove("Manifest") + + # Load manifest and create manifest check list + manifest = self._load_manifest(manifest_data.decode("UTF-8")) + unverified_manifest = manifest.copy() + + # Check all remaining files + for f in unverified_files.copy(): + if f.endswith(".sig"): + f_signature = None + else: + f_signature = f + ".sig" + + # Find current file manifest record + manifest_record = None + for m in manifest: + if m[1] == f: + manifest_record = m + + if manifest_record is None: + raise DigestException(f"{f} checksum not found in {self.gpkg_file}") + + if int(manifest_record[2]) != int(container.getmember(f).size): + raise DigestException( + f"{f} file size mismatched in {self.gpkg_file}" + ) + + # Ignore image file and signature if not needed + if os.path.basename(f).startswith("image") and metadata_only: + unverified_files.remove(f) + unverified_manifest.remove(manifest_record) + continue + + # Verify current file signature if needed + # binpkg-ignore-signature can override this. + if ( + (self.request_signature or signature_exist) + and self.verify_signature + and f_signature + ): + if f_signature in unverified_files: + signature_file = container.extractfile(f_signature) + signature = signature_file.read() + signature_file.close() + checksum_info = checksum_helper( + self.settings, + gpg_operation=checksum_helper.VERIFY, + signature=signature, + ) + else: + raise MissingSignature( + f"{f} signature not found in {self.gpkg_file}" + ) + else: + checksum_info = checksum_helper(self.settings) + + # Verify current file checksum + f_io = container.extractfile(f) + while True: + buffer = f_io.read(HASHING_BLOCKSIZE) + if buffer: + checksum_info.update(buffer) + else: + checksum_info.finish() + break + f_io.close() + + # At least one supported checksum must be checked + verified_hash_count = 0 + for c in checksum_info.libs: + try: + if ( + checksum_info.libs[c].hexdigest().lower() + == manifest_record[manifest_record.index(c) + 1].lower() + ): + verified_hash_count += 1 + else: + raise DigestException( + f"{f} checksum mismatched in {self.gpkg_file}" + ) + except KeyError: + # Checksum method not supported + pass + + if verified_hash_count < 1: + raise DigestException( + f"{f} no supported checksum found in {self.gpkg_file}" + ) + + # Current file verified + unverified_files.remove(f) + unverified_manifest.remove(manifest_record) + + # Check if any file IN Manifest but NOT IN binary package + if len(unverified_manifest) != 0: + raise DigestException( + f"Missing files: {str(unverified_manifest)} in {self.gpkg_file}" + ) + + # Check if any file NOT IN Manifest but IN binary package + if len(unverified_files) != 0: + raise DigestException( + f"Unknown files exists: {str(unverified_files)} in {self.gpkg_file}" + ) + + # Save current Manifest for other operations. + self.manifest_old = manifest.copy() + + def _generate_metadata_from_dir(self, metadata_dir): + """ + read all files in metadata_dir and return as dict + """ + metadata = {} + metadata_dir = normalize_path( + _unicode_decode(metadata_dir, encoding=_encodings["fs"], errors="strict") + ) + for parent, dirs, files in os.walk(metadata_dir): + for f in files: + try: + f = _unicode_decode(f, encoding=_encodings["fs"], errors="strict") + except UnicodeDecodeError: + continue + with open(os.path.join(parent, f), "rb") as metafile: + metadata[f] = metafile.read() + return metadata + + def _get_binary_cmd(self, compression, mode): + """ + get command list form portage and try match compressor + """ + if compression not in _compressors: + raise InvalidCompressionMethod(compression) + + compressor = _compressors[compression] + if mode not in compressor: + raise InvalidCompressionMethod("{}: {}".format(compression, mode)) + + cmd = shlex_split(varexpand(compressor[mode], mydict=self.settings)) + # Filter empty elements that make Popen fail + cmd = [x for x in cmd if x != ""] + + if (not cmd) and ((mode + "_alt") in compressor): + cmd = shlex_split( + varexpand(compressor[mode + "_alt"], mydict=self.settings) + ) + cmd = [x for x in cmd if x != ""] + + if not cmd: + raise CompressorNotFound(compression) + if not find_binary(cmd[0]): + raise CompressorNotFound(cmd[0]) + + return cmd + + def _get_compression_cmd(self, compression=None): + """ + return compression command for Popen + """ + if compression is None: + compression = self.compression + if compression is None: + return None + else: + return self._get_binary_cmd(compression, "compress") + + def _get_decompression_cmd(self, compression=None): + """ + return decompression command for Popen + """ + if compression is None: + compression = self.compression + if compression is None: + return None + else: + return self._get_binary_cmd(compression, "decompress") + + def _get_tar_format(self, fileobj): + """ + Try to detect tar version + """ + old_position = fileobj.tell() + fileobj.seek(0x101) + magic = fileobj.read(8) + fileobj.seek(0x9C) + typeflag = fileobj.read(1) + fileobj.seek(old_position) + + if magic == b"ustar \x00": + return tarfile.GNU_FORMAT + elif magic == b"ustar\x0000": + if typeflag == b"x" or typeflag == b"g": + return tarfile.PAX_FORMAT + else: + return tarfile.USTAR_FORMAT + + return None + + def _get_tar_format_from_stats( + self, + image_max_prefix_length, + image_max_name_length, + image_max_linkname_length, + image_max_file_size, + image_total_size, + ): + """ + Choose the corresponding tar format according to + the image information + """ + # Max possible size in UStar is 8 GiB (8589934591 bytes) + # stored in 11 octets + # Use 8000000000, just in case we need add something extra + + # Total size > 8 GiB, container need use GNU tar format + if image_total_size < 8000000000: + container_tar_format = tarfile.USTAR_FORMAT + else: + container_tar_format = tarfile.GNU_FORMAT + + # Image at least one file > 8 GiB, image need use GNU tar format + if image_max_file_size < 8000000000: + image_tar_format = tarfile.USTAR_FORMAT + else: + image_tar_format = tarfile.GNU_FORMAT + + # UStar support max 155 prefix length, 100 file name and 100 link name, + # ends with \x00. If any exceeded, failback to GNU format. + if image_max_prefix_length >= 155: + image_tar_format = tarfile.GNU_FORMAT + + if image_max_name_length >= 100: + image_tar_format = tarfile.GNU_FORMAT + + if image_max_linkname_length >= 100: + image_tar_format = tarfile.GNU_FORMAT + return container_tar_format, image_tar_format + + def _check_pre_image_files(self, root_dir, image_prefix="image"): + """ + Check the pre image files size and path, return the longest + path length, largest single file size, and total files size. + """ + image_prefix_length = len(image_prefix) + 1 + root_dir = os.path.join( + normalize_path( + _unicode_decode(root_dir, encoding=_encodings["fs"], errors="strict") + ), + "", + ) + root_dir_length = len( + _unicode_encode(root_dir, encoding=_encodings["fs"], errors="strict") + ) + + image_max_prefix_length = 0 + image_max_name_length = 0 + image_max_link_length = 0 + image_max_file_size = 0 + image_total_size = 0 + + for parent, dirs, files in os.walk(root_dir): + parent = _unicode_decode(parent, encoding=_encodings["fs"], errors="strict") + for d in dirs: + try: + d = _unicode_decode(d, encoding=_encodings["fs"], errors="strict") + except UnicodeDecodeError as err: + writemsg(colorize("BAD", "\n*** %s\n\n" % err), noiselevel=-1) + raise + + d = os.path.join(parent, d) + prefix_length = ( + len(_unicode_encode(d, encoding=_encodings["fs"], errors="strict")) + - root_dir_length + + image_prefix_length + ) + + if os.path.islink(d): + path_link = os.readlink(d) + path_link_length = len( + _unicode_encode( + path_link, encoding=_encodings["fs"], errors="strict" + ) + ) + image_max_link_length = max(image_max_link_length, path_link_length) + + image_max_prefix_length = max(image_max_prefix_length, prefix_length) + + for f in files: + try: + f = _unicode_decode(f, encoding=_encodings["fs"], errors="strict") + except UnicodeDecodeError as err: + writemsg(colorize("BAD", "\n*** %s\n\n" % err), noiselevel=-1) + raise + + filename_length = len( + _unicode_encode(f, encoding=_encodings["fs"], errors="strict") + ) + image_max_name_length = max(image_max_name_length, filename_length) + + f = os.path.join(parent, f) + path_length = ( + len(_unicode_encode(f, encoding=_encodings["fs"], errors="strict")) + - root_dir_length + + image_prefix_length + ) + + file_stat = os.lstat(f) + + if os.path.islink(f): + path_link = os.readlink(f) + path_link_length = len( + _unicode_encode( + path_link, encoding=_encodings["fs"], errors="strict" + ) + ) + elif file_stat.st_nlink > 1: + # Hardlink exists + path_link_length = path_length + else: + path_link_length = 0 + + image_max_link_length = max(image_max_link_length, path_link_length) + + try: + file_size = os.path.getsize(f) + except FileNotFoundError: + # Ignore file not found if symlink to non-existing file + if os.path.islink(f): + continue + else: + raise + image_total_size += file_size + image_max_file_size = max(image_max_file_size, file_size) + + return ( + image_max_prefix_length, + image_max_name_length, + image_max_link_length, + image_max_file_size, + image_total_size, + ) + + def _check_pre_quickpkg_files(self, contents, root): + """ + Check the pre quickpkg files size and path, return the longest + path length, largest single file size, and total files size. + """ + root_dir = os.path.join( + normalize_path( + _unicode_decode(root, encoding=_encodings["fs"], errors="strict") + ), + "", + ) + root_dir_length = len( + _unicode_encode(root_dir, encoding=_encodings["fs"], errors="strict") + ) + + image_max_prefix_length = 0 + image_max_name_length = 0 + image_max_link_length = 0 + image_max_file_size = 0 + image_total_size = 0 + + paths = list(contents) + for path in paths: + try: + path = _unicode_decode(path, encoding=_encodings["fs"], errors="strict") + except UnicodeDecodeError as err: + writemsg(colorize("BAD", "\n*** %s\n\n" % err), noiselevel=-1) + raise + + d, f = os.path.split(path) + + prefix_length = ( + len(_unicode_encode(d, encoding=_encodings["fs"], errors="strict")) + - root_dir_length + ) + image_max_prefix_length = max(image_max_prefix_length, prefix_length) + + filename_length = len( + _unicode_encode(f, encoding=_encodings["fs"], errors="strict") + ) + image_max_name_length = max(image_max_name_length, filename_length) + + path_length = ( + len(_unicode_encode(path, encoding=_encodings["fs"], errors="strict")) + - root_dir_length + ) + + file_stat = os.lstat(path) + + if os.path.islink(path): + path_link = os.readlink(path) + path_link_length = len( + _unicode_encode( + path_link, encoding=_encodings["fs"], errors="strict" + ) + ) + elif file_stat.st_nlink > 1: + # Hardlink exists + path_link_length = path_length + else: + path_link_length = 0 + + image_max_link_length = max(image_max_link_length, path_link_length) + + if os.path.isfile(path): + try: + file_size = os.path.getsize(path) + except FileNotFoundError: + # Ignore file not found if symlink to non-existing file + if os.path.islink(path): + continue + else: + raise + image_total_size += file_size + if file_size > image_max_file_size: + image_max_file_size = file_size + + return ( + image_max_prefix_length, + image_max_name_length, + image_max_link_length, + image_max_file_size, + image_total_size, + ) + + def _create_tarinfo(self, file_name): + """ + Create new tarinfo for the new file + """ + if self.compression is None: + ext = "" + elif self.compression in self.ext_list: + ext = self.ext_list[self.compression] + else: + raise InvalidCompressionMethod(self.compression) + + data_tarinfo = tarfile.TarInfo( + os.path.join(self.base_name, file_name + ".tar" + ext) + ) + return data_tarinfo + + def _extract_filename_compression(self, file_name): + """ + Extract the file basename and compression method + """ + file_name = os.path.basename(file_name) + if file_name.endswith(".tar"): + return file_name[:-4], None + + for compression in self.ext_list: + if file_name.endswith(".tar" + self.ext_list[compression]): + return ( + file_name[: -len(".tar" + self.ext_list[compression])], + compression, + ) + + raise InvalidCompressionMethod(file_name) + + def _get_inner_tarinfo(self, tar, file_name): + """ + Get inner tarinfo from given container. + Will try get file_name from correct basename first, + if it fail, try any file that have same name as file_name, and + return the first one. + """ + if self.gpkg_version not in tar.getnames(): + raise InvalidBinaryPackageFormat("Invalid gpkg file.") + + # Try get file with correct basename + inner_tarinfo = None + if self.base_name is None: + base_name = "" + else: + base_name = self.base_name + all_files = tar.getmembers() + for f in all_files: + if os.path.dirname(f.name) == base_name: + try: + f_name, f_comp = self._extract_filename_compression(f.name) + except InvalidCompressionMethod: + continue + + if f_name == file_name: + return f, f_comp + + # If failed, try get any file name matched + if inner_tarinfo is None: + for f in all_files: + try: + f_name, f_comp = self._extract_filename_compression(f.name) + except InvalidCompressionMethod: + continue + if f_name == file_name: + if self.base_name is not None: + writemsg( + colorize( + "WARN", "Package basename mismatched, using " + f.name + ) + ) + self.base_name_alt = os.path.dirname(f.name) + return f, f_comp + + # Not found + raise FileNotFound(f"File Not found: {file_name}") diff --git a/lib/portage/package/ebuild/_config/special_env_vars.py b/lib/portage/package/ebuild/_config/special_env_vars.py index 06ae3aa39..9b2d77aea 100644 --- a/lib/portage/package/ebuild/_config/special_env_vars.py +++ b/lib/portage/package/ebuild/_config/special_env_vars.py @@ -86,6 +86,7 @@ environ_whitelist += [ "ACCEPT_LICENSE", "BASH_ENV", "BASH_FUNC____in_portage_iuse%%", + "BINPKG_FORMAT", "BROOT", "BUILD_PREFIX", "COLUMNS", diff --git a/lib/portage/package/ebuild/config.py b/lib/portage/package/ebuild/config.py index b4d6862a3..8fe51784e 100644 --- a/lib/portage/package/ebuild/config.py +++ b/lib/portage/package/ebuild/config.py @@ -41,6 +41,7 @@ from portage.const import ( PORTAGE_BASE_PATH, PRIVATE_PATH, PROFILE_PATH, + SUPPORTED_GENTOO_BINPKG_FORMATS, USER_CONFIG_PATH, USER_VIRTUALS_FILE, ) @@ -1513,6 +1514,15 @@ class config: noiselevel=-1, ) + binpkg_format = self.get("BINPKG_FORMAT") + if binpkg_format: + if binpkg_format not in SUPPORTED_GENTOO_BINPKG_FORMATS: + writemsg( + "!!! BINPKG_FORMAT contains invalid or " + "unsupported format: %s" % binpkg_fotmat, + noiselevel=-1, + ) + binpkg_compression = self.get("BINPKG_COMPRESS") if binpkg_compression: try: diff --git a/lib/portage/package/ebuild/doebuild.py b/lib/portage/package/ebuild/doebuild.py index ac627f555..eb719b1ea 100644 --- a/lib/portage/package/ebuild/doebuild.py +++ b/lib/portage/package/ebuild/doebuild.py @@ -66,6 +66,7 @@ from portage.const import ( INVALID_ENV_FILE, MISC_SH_BINARY, PORTAGE_PYM_PACKAGES, + SUPPORTED_GENTOO_BINPKG_FORMATS, ) from portage.data import portage_gid, portage_uid, secpass, uid, userpriv_groups from portage.dbapi.porttree import _parse_uri_map @@ -649,6 +650,18 @@ def doebuild_environment( mysettings["KV"] = "" mysettings.backup_changes("KV") + binpkg_format = mysettings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] + ) + if binpkg_format not in portage.const.SUPPORTED_GENTOO_BINPKG_FORMATS: + writemsg( + "!!! BINPKG_FORMAT contains invalid or " + "unsupported format: %s" % binpkg_fotmat, + noiselevel=-1, + ) + binpkg_format = "xpak" + mysettings["BINPKG_FORMAT"] = binpkg_format + binpkg_compression = mysettings.get("BINPKG_COMPRESS", "bzip2") try: compression = _compressors[binpkg_compression] diff --git a/lib/portage/tests/.gnupg/openpgp-revocs.d/06B3A311BD775C280D22A9305D90EA06352177F6.rev b/lib/portage/tests/.gnupg/openpgp-revocs.d/06B3A311BD775C280D22A9305D90EA06352177F6.rev new file mode 100644 index 000000000..a6752fd30 --- /dev/null +++ b/lib/portage/tests/.gnupg/openpgp-revocs.d/06B3A311BD775C280D22A9305D90EA06352177F6.rev @@ -0,0 +1,37 @@ +This is a revocation certificate for the OpenPGP key: + +pub rsa4096 2020-07-14 [S] + 06B3A311BD775C280D22A9305D90EA06352177F6 +uid Gentoo Portage Test Trusted Key (Test Only, Do NOT Trust!!!) (Gentoo Test Key) <test@example.org> + +A revocation certificate is a kind of "kill switch" to publicly +declare that a key shall not anymore be used. It is not possible +to retract such a revocation certificate once it has been published. + +Use it to revoke this key in case of a compromise or loss of +the secret key. However, if the secret key is still accessible, +it is better to generate a new revocation certificate and give +a reason for the revocation. For details see the description of +of the gpg command "--generate-revocation" in the GnuPG manual. + +To avoid an accidental use of this file, a colon has been inserted +before the 5 dashes below. Remove this colon with a text editor +before importing and publishing this revocation certificate. + +:-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iQI2BCABCAAgFiEEBrOjEb13XCgNIqkwXZDqBjUhd/YFAl8OFTwCHQAACgkQXZDq +BjUhd/aXCA/+OgzosMDaDe5DNwkSi2yKdC2X18v8JcaYnXBUR93nXA0LVN7iVWkR +WEH3NuVspQZ5vK+3AHTKabqZFC/buA5oQOH01Ncd4lQISfOOhFiBn5DIPX31BVT0 +iPmVkcxHAD4031ptP4oat6EFclT13SRchtlnAO04JofeHnzQIw3SozQGzXpAA1g4 +BogQ0HWA88HzuEYYE+e/yzZL4D496X1DTaXksg0Py5c4SS6u5pND6lcUtAGxAwa9 +sJFPs+coeURaRV99CrJfdh4u2OkvINTfrKOS6NFBQq6HVH5mLsRXZlcE4Oo4d+fN +XoPrTZnRUqpJADUdjHFvO/lr0fArJTS5IQCVBNFeCMlvgmUPeKWJ1r6Uiwe/UHor +9OP/tK97EqpsaXmHbo0jOUkn5iiUwy784+JBSSu/Q2NxqcBr74aaRdfxvs62dmv7 +droCDQi3ebqTdnlDSaeCIWHyVlSroOhZ+ZETVy193K1X7VXFX3hYKiJ3G8QZwy3e +AlsVGjIHWfC+K+enIn+uwSUvOWPN3upK8kqMRuXvAOppFCE4sTqNbxUnHHXaqo/r +s1q6zVsWVILBk97BHlJph2IaqhV7iIgPU97/r4U/BT11VqDFdVSHcXcs4PDNs5vh +6qttaDiyDqZjwMr+0iDoouHxFpqY8e+3M2gycUgGr2XV6ML0pXE6BqA= +=nIjC +-----END PGP PUBLIC KEY BLOCK----- diff --git a/lib/portage/tests/.gnupg/openpgp-revocs.d/8DEDA2CDED49C8809287B89D8812797DDF1DD192.rev b/lib/portage/tests/.gnupg/openpgp-revocs.d/8DEDA2CDED49C8809287B89D8812797DDF1DD192.rev new file mode 100644 index 000000000..456e0aa50 --- /dev/null +++ b/lib/portage/tests/.gnupg/openpgp-revocs.d/8DEDA2CDED49C8809287B89D8812797DDF1DD192.rev @@ -0,0 +1,37 @@ +This is a revocation certificate for the OpenPGP key: + +pub rsa4096 2020-07-14 [S] + 8DEDA2CDED49C8809287B89D8812797DDF1DD192 +uid Gentoo Portage Test Untrusted Key (Test Only, Do NOT Trust!!!) (Gentoo Test Key) <test@example.org> + +A revocation certificate is a kind of "kill switch" to publicly +declare that a key shall not anymore be used. It is not possible +to retract such a revocation certificate once it has been published. + +Use it to revoke this key in case of a compromise or loss of +the secret key. However, if the secret key is still accessible, +it is better to generate a new revocation certificate and give +a reason for the revocation. For details see the description of +of the gpg command "--generate-revocation" in the GnuPG manual. + +To avoid an accidental use of this file, a colon has been inserted +before the 5 dashes below. Remove this colon with a text editor +before importing and publishing this revocation certificate. + +:-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iQI2BCABCAAgFiEEje2ize1JyICSh7idiBJ5fd8d0ZIFAl8OFXUCHQAACgkQiBJ5 +fd8d0ZKdwxAAhmkC0V+OLyOU9PCV6ogD9/3b3nVqNIreoc+gxHTLmEvxiMSItqmq +DkcW9RJKAduA/HiLZQ8Yzxw+ldC6kuWqYEjNpSM54VDkrgOePi8W1bVDTCoSp7bo +0JOG4frieqIxA6lhAA2UppH7EPRXoODPLYqooNxWAs3xxVrR6eGAb5l8NXzrymvN +acFfOZ0s5FgADQskQHWVq6TaJn9DrcZxd+b+plSwPYDXqzTChKQ5jw7uMAPUvDkG +JUWgoKiKSrK64bslUq8aEDEZQ4uxjyEi6G0vO/wPL/ysGhS7KkPgCZsEfNjWjajb +jAsdvl1raoHxK/O7llMNr9uRAZtC56pJ//SRDc3kylZrkAo0RNoXQFowT739HWei +2UkCFDfz488VKKrOI8TzTyUvLFEo14ZAXGg1wdHaGnbYMzxpKjP15alOFo6fKIcS +Kz1f/Mab4wf4Sg0XAjQ9pnai1/U9ZF3/NSnRtYgJkLCrIEtRLrgSHJsLDPxjCfGV +jWszAbIk167aA0yKsSmuwkpc5bZqqBaTo904r857fxyt5Les6SOHsV7iNXt7F+am +03Y6u6m2eROba7M67l115vTyYcw5EZVp5j0nI81PXsC9X2DD1ci5xrNmPyEeupC4 +7y7mcGbUYPJAJHJ0kHG4ZYLnNMl42ZYr1ssEeasDwUsLWgVqvx9RkKI= +=kVUQ +-----END PGP PUBLIC KEY BLOCK----- diff --git a/lib/portage/tests/.gnupg/private-keys-v1.d/273B030399E7BEA66A9AD42216DE7CA17BA5D42E.key b/lib/portage/tests/.gnupg/private-keys-v1.d/273B030399E7BEA66A9AD42216DE7CA17BA5D42E.key Binary files differnew file mode 100644 index 000000000..0bd1026ad --- /dev/null +++ b/lib/portage/tests/.gnupg/private-keys-v1.d/273B030399E7BEA66A9AD42216DE7CA17BA5D42E.key diff --git a/lib/portage/tests/.gnupg/private-keys-v1.d/C99796FB85B0C3DF03314A11B5850C51167D6282.key b/lib/portage/tests/.gnupg/private-keys-v1.d/C99796FB85B0C3DF03314A11B5850C51167D6282.key Binary files differnew file mode 100644 index 000000000..8e29ef43c --- /dev/null +++ b/lib/portage/tests/.gnupg/private-keys-v1.d/C99796FB85B0C3DF03314A11B5850C51167D6282.key diff --git a/lib/portage/tests/.gnupg/pubring.kbx b/lib/portage/tests/.gnupg/pubring.kbx Binary files differnew file mode 100644 index 000000000..f6367f83b --- /dev/null +++ b/lib/portage/tests/.gnupg/pubring.kbx diff --git a/lib/portage/tests/.gnupg/trustdb.gpg b/lib/portage/tests/.gnupg/trustdb.gpg Binary files differnew file mode 100644 index 000000000..db5b1023b --- /dev/null +++ b/lib/portage/tests/.gnupg/trustdb.gpg diff --git a/lib/portage/tests/__init__.py b/lib/portage/tests/__init__.py index 02d9c4932..f74f992d7 100644 --- a/lib/portage/tests/__init__.py +++ b/lib/portage/tests/__init__.py @@ -14,6 +14,7 @@ import portage from portage import os from portage import _encodings from portage import _unicode_decode +from portage.output import colorize from portage.proxy.objectproxy import ObjectProxy @@ -184,19 +185,43 @@ class TextTestResult(_TextTestResult): self.todoed = [] self.portage_skipped = [] + def addSuccess(self, test): + super(_TextTestResult, self).addSuccess(test) + if self.showAll: + self.stream.writeln(colorize("GOOD", "ok")) + elif self.dots: + self.stream.write(colorize("GOOD", ".")) + self.stream.flush() + + def addError(self, test, err): + super(_TextTestResult, self).addError(test, err) + if self.showAll: + self.stream.writeln(colorize("BAD", "ERROR")) + elif self.dots: + self.stream.write(colorize("HILITE", "E")) + self.stream.flush() + + def addFailure(self, test, err): + super(_TextTestResult, self).addFailure(test, err) + if self.showAll: + self.stream.writeln(colorize("BAD", "FAIL")) + elif self.dots: + self.stream.write(colorize("BAD", "F")) + self.stream.flush() + def addTodo(self, test, info): self.todoed.append((test, info)) if self.showAll: - self.stream.writeln("TODO") + self.stream.writeln(colorize("BRACKET", "TODO")) elif self.dots: - self.stream.write(".") + self.stream.write(colorize("BRACKET", ".")) def addPortageSkip(self, test, info): self.portage_skipped.append((test, info)) if self.showAll: - self.stream.writeln("SKIP") + self.stream.writeln(colorize("WARN", "SKIP")) elif self.dots: - self.stream.write(".") + self.stream.write(colorize("WARN", ".")) def printErrors(self): if self.dots or self.showAll: @@ -331,7 +356,7 @@ class TextTestRunner(unittest.TextTestRunner): ) self.stream.writeln() if not result.wasSuccessful(): - self.stream.write("FAILED (") + self.stream.write(colorize("BAD", "FAILED") + " (") failed = len(result.failures) errored = len(result.errors) if failed: @@ -342,7 +367,7 @@ class TextTestRunner(unittest.TextTestRunner): self.stream.write("errors=%d" % errored) self.stream.writeln(")") else: - self.stream.writeln("OK") + self.stream.writeln(colorize("GOOD", "OK")) return result diff --git a/lib/portage/tests/emerge/test_simple.py b/lib/portage/tests/emerge/test_simple.py index 3a8bf3764..6d0dae0dd 100644 --- a/lib/portage/tests/emerge/test_simple.py +++ b/lib/portage/tests/emerge/test_simple.py @@ -3,6 +3,7 @@ import argparse import subprocess +import sys import portage from portage import shutil, os @@ -11,6 +12,7 @@ from portage.const import ( BINREPOS_CONF_FILE, PORTAGE_PYM_PATH, USER_CONFIG_PATH, + SUPPORTED_GENTOO_BINPKG_FORMATS, ) from portage.cache.mappings import Mapping from portage.process import find_binary @@ -19,6 +21,7 @@ from portage.tests.resolver.ResolverPlayground import ResolverPlayground from portage.tests.util.test_socks5 import AsyncHTTPServer from portage.util import ensure_dirs, find_updated_config_files, shlex_split from portage.util.futures import asyncio +from portage.output import colorize class BinhostContentMap(Mapping): @@ -223,17 +226,28 @@ call_has_and_best_version() { ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, installed=installed, debug=debug - ) + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + installed=installed, + debug=debug, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) - loop = asyncio._wrap_loop() - loop.run_until_complete( - asyncio.ensure_future( - self._async_test_simple(playground, metadata_xml_files, loop=loop), - loop=loop, - ) - ) + loop = asyncio._wrap_loop() + loop.run_until_complete( + asyncio.ensure_future( + self._async_test_simple( + playground, metadata_xml_files, loop=loop + ), + loop=loop, + ) + ) async def _async_test_simple(self, playground, metadata_xml_files, loop): @@ -327,6 +341,15 @@ call_has_and_best_version() { path=binhost_remote_path, ) + binpkg_format = settings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] + ) + self.assertIn(binpkg_format, ("xpak", "gpkg")) + if binpkg_format == "xpak": + foo_filename = "foo-0-1.xpak" + elif binpkg_format == "gpkg": + foo_filename = "foo-0-1.gpkg.tar" + test_commands = () if hasattr(argparse.ArgumentParser, "parse_intermixed_args"): @@ -387,13 +410,13 @@ call_has_and_best_version() { rm_cmd + ("-rf", cachedir), emerge_cmd + ("--oneshot", "virtual/foo"), lambda: self.assertFalse( - os.path.exists(os.path.join(pkgdir, "virtual", "foo", "foo-0-1.xpak")) + os.path.exists(os.path.join(pkgdir, "virtual", "foo", foo_filename)) ), ({"FEATURES": "unmerge-backup"},) + emerge_cmd + ("--unmerge", "virtual/foo"), lambda: self.assertTrue( - os.path.exists(os.path.join(pkgdir, "virtual", "foo", "foo-0-1.xpak")) + os.path.exists(os.path.join(pkgdir, "virtual", "foo", foo_filename)) ), emerge_cmd + ("--pretend", "dev-libs/A"), ebuild_cmd + (test_ebuild, "manifest", "clean", "package", "merge"), diff --git a/lib/portage/tests/gpkg/__init__.py b/lib/portage/tests/gpkg/__init__.py new file mode 100644 index 000000000..532918b6a --- /dev/null +++ b/lib/portage/tests/gpkg/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2011 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 diff --git a/lib/portage/tests/gpkg/__test__.py b/lib/portage/tests/gpkg/__test__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/lib/portage/tests/gpkg/__test__.py diff --git a/lib/portage/tests/gpkg/test_gpkg_checksum.py b/lib/portage/tests/gpkg/test_gpkg_checksum.py new file mode 100644 index 000000000..c78045c34 --- /dev/null +++ b/lib/portage/tests/gpkg/test_gpkg_checksum.py @@ -0,0 +1,396 @@ +# Copright Gentoo Foundation 2006-2020 +# Portage Unit Testing Functionality + +import io +import sys +import tarfile +import tempfile +from os import urandom + +from portage import os +from portage import shutil +from portage.tests import TestCase +from portage.tests.resolver.ResolverPlayground import ResolverPlayground +from portage.gpkg import gpkg +from portage.exception import ( + InvalidBinaryPackageFormat, + DigestException, + MissingSignature, +) + + +class test_gpkg_checksum_case(TestCase): + def test_gpkg_missing_header(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} -binpkg-signing ' + '-binpkg-request-signature -gpg-keepalive"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name != binpkg_1.gpkg_version: + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + + self.assertRaises( + InvalidBinaryPackageFormat, + binpkg_2.decompress, + os.path.join(tmpdir, "test"), + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_missing_manifest(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} -binpkg-signing ' + '-binpkg-request-signature -gpg-keepalive"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name != "Manifest": + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + + self.assertRaises( + MissingSignature, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_missing_files(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} -binpkg-signing ' + '-binpkg-request-signature -gpg-keepalive"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data2"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if "image.tar" not in f.name: + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + + self.assertRaises( + DigestException, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_extra_files(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} -binpkg-signing ' + '-binpkg-request-signature -gpg-keepalive"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + tar_2.addfile(f, tar_1.extractfile(f)) + data_tarinfo = tarfile.TarInfo("data2") + data_tarinfo.size = len(data) + data2 = io.BytesIO(data) + tar_2.addfile(data_tarinfo, data2) + data2.close() + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + + self.assertRaises( + DigestException, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_incorrect_checksum(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} -binpkg-signing ' + '-binpkg-request-signature -gpg-keepalive"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name == "Manifest": + data = io.BytesIO(tar_1.extractfile(f).read()) + data_view = data.getbuffer() + data_view[-16:] = b"20a6d80ab0320fh9" + del data_view + tar_2.addfile(f, data) + data.close() + else: + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + + self.assertRaises( + DigestException, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_duplicate_files(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} -binpkg-signing ' + '-binpkg-request-signature -gpg-keepalive"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(100) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + tar_2.addfile(f, tar_1.extractfile(f)) + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + + self.assertRaises( + InvalidBinaryPackageFormat, + binpkg_2.decompress, + os.path.join(tmpdir, "test"), + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_manifest_duplicate_files(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} -binpkg-signing ' + '-binpkg-request-signature -gpg-keepalive"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(100) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name == "Manifest": + manifest = tar_1.extractfile(f).read() + data = io.BytesIO(manifest) + data.seek(io.SEEK_END) + data.write(b"\n") + data.write(manifest) + f.size = data.tell() + data.seek(0) + tar_2.addfile(f, data) + data.close() + else: + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + + self.assertRaises( + DigestException, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_different_size_file(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} -binpkg-signing ' + '-binpkg-request-signature -gpg-keepalive"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(100) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + tar_2.addfile(f, tar_1.extractfile(f)) + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + + self.assertRaises( + InvalidBinaryPackageFormat, + binpkg_2.decompress, + os.path.join(tmpdir, "test"), + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() diff --git a/lib/portage/tests/gpkg/test_gpkg_gpg.py b/lib/portage/tests/gpkg/test_gpkg_gpg.py new file mode 100644 index 000000000..906608dff --- /dev/null +++ b/lib/portage/tests/gpkg/test_gpkg_gpg.py @@ -0,0 +1,398 @@ +# Copright Gentoo Foundation 2006-2020 +# Portage Unit Testing Functionality + +import io +import sys +import tarfile +import tempfile +from os import urandom + +from portage import os +from portage import shutil +from portage.tests import TestCase +from portage.tests.resolver.ResolverPlayground import ResolverPlayground +from portage.gpkg import gpkg +from portage.gpg import GPG +from portage.exception import MissingSignature, InvalidSignature + + +class test_gpkg_gpg_case(TestCase): + def test_gpkg_missing_manifest_signature(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} binpkg-signing ' 'binpkg-request-signature"', + 'BINPKG_FORMAT="gpkg"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + gpg = GPG(settings) + gpg.unlock() + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name == "Manifest": + manifest = tar_1.extractfile(f).read().decode("UTF-8") + manifest = manifest.replace( + "-----BEGIN PGP SIGNATURE-----", "" + ) + manifest = manifest.replace( + "-----END PGP SIGNATURE-----", "" + ) + manifest_data = io.BytesIO(manifest.encode("UTF-8")) + manifest_data.seek(0, io.SEEK_END) + f.size = manifest_data.tell() + manifest_data.seek(0) + tar_2.addfile(f, manifest_data) + else: + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + + self.assertRaises( + InvalidSignature, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_missing_signature(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} binpkg-signing ' 'binpkg-request-signature"', + 'BINPKG_FORMAT="gpkg"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + gpg = GPG(settings) + gpg.unlock() + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name.endswith(".sig"): + pass + else: + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + self.assertRaises( + MissingSignature, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_ignore_signature(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} binpkg-signing ' 'binpkg-ignore-signature"', + 'BINPKG_FORMAT="gpkg"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + gpg = GPG(settings) + gpg.unlock() + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name == "Manifest.sig": + pass + else: + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + binpkg_2.decompress(os.path.join(tmpdir, "test")) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_auto_use_signature(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} binpkg-signing ' + '-binpkg-request-signature"', + 'BINPKG_FORMAT="gpkg"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + gpg = GPG(settings) + gpg.unlock() + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name.endswith(".sig"): + pass + else: + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + self.assertRaises( + MissingSignature, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_invalid_signature(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} binpkg-signing ' 'binpkg-request-signature"', + 'BINPKG_FORMAT="gpkg"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + gpg = GPG(settings) + gpg.unlock() + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name == "Manifest": + sig = b""" +-----BEGIN PGP SIGNED MESSAGE----- +Hash: SHA512 + +DATA test/image.tar.zst 1049649 BLAKE2B 3112adba9c09023962f26d9dcbf8e74107c05220f2f29aa2ce894f8a4104c3bb238f87095df73735befcf1e1f6039fc3abf4defa87e68ce80f33dd01e09c055a SHA512 9f584727f2e20a50a30e0077b94082c8c1f517ebfc9978eb3281887e24458108e73d1a2ce82eb0b59f5df7181597e4b0a297ae68bbfb36763aa052e6bdbf2c59 +DATA test/image.tar.zst.sig 833 BLAKE2B 214724ae4ff9198879c8c960fd8167632e27982c2278bb873f195abe75b75afa1ebed4c37ec696f5f5bc35c3a1184b60e0b50d56695b072b254f730db01eddb5 SHA512 67316187da8bb6b7a5f9dc6a42ed5c7d72c6184483a97f23c0bebd8b187ac9268e0409eb233c935101606768718c99eaa5699037d6a68c2d88c9ed5331a3f73c +-----BEGIN PGP SIGNATURE----- + +iQIzBAEBCgAdFiEEBrOjEb13XCgNIqkwXZDqBjUhd/YFAmFazXEACgkQXZDqBjUh +d/YFZA//eiXkYAS2NKxim6Ppr1HcZdjU1f6H+zyQzC7OdPkAh7wsVXpSr1aq+giD +G4tNtI6nsFokpA5CMhDf+ffBofKmFY5plk9zyQHr43N/RS5G6pcb2LHk0mQqgIdB +EsZRRD75Na4uGDWjuNHRmsasPTsc9qyW7FLckjwUsVmk9foAoiLYYaTsilsEGqXD +Bl/Z6PaQXvdd8txbcP6dOXfhVT06b+RWcnHI06KQrmFkZjZQh/7bCIeCVwNbXr7d +Obo8SVzCrQbTONei57AkyuRfnPqBfP61k8rQtcDUmCckQQfyaRwoW2nDIewOPfIH +xfvM137to2GEI2RR1TpWmGfu3iQzgC71f4svdX9Tyi5N7aFmfud7LZs6/Un3IdVk +ZH9/AmRzeH6hKllqSv/6WuhjsTNvr0bOzGbskkhqlLga2tml08gHFYOMWRJb/bRz +N8FZMhHzFoc0hsG8SU9uC+OeW+y5NdqpbRnQwgABmAiKEpgAPnABTsr0HjyxvjY+ +uCUdvMMHvnTxTjNEZ3Q+UQ2VsSoZzPbW9Y4PuM0XxxmTI8htdn4uIhy9dLNPsJmB +eTE8aov/1uKq9VMsYC8wcx5vLMaR7/O/9XstP+r6PaZwiLlyrKHGexV4O52sj6LC +qGAN3VUF+8EsdcsV781H0F86PANhyBgEYTGDrnItTGe3/vAPjCo= +=S/Vn +-----END PGP SIGNATURE----- +""" + data = io.BytesIO(sig) + f.size = len(sig) + tar_2.addfile(f, data) + data.close() + else: + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + self.assertRaises( + InvalidSignature, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_untrusted_signature(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + gpg_test_path = os.environ["PORTAGE_GNUPGHOME"] + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} binpkg-signing ' 'binpkg-request-signature"', + 'BINPKG_FORMAT="gpkg"', + f'BINPKG_GPG_SIGNING_BASE_COMMAND="flock {gpg_test_path}/portage-binpkg-gpg.lock /usr/bin/gpg --sign --armor --batch --no-tty --yes --pinentry-mode loopback --passphrase GentooTest [PORTAGE_CONFIG]"', + 'BINPKG_GPG_SIGNING_DIGEST="SHA512"', + f'BINPKG_GPG_SIGNING_GPG_HOME="{gpg_test_path}"', + 'BINPKG_GPG_SIGNING_KEY="0x8812797DDF1DD192"', + 'BINPKG_GPG_VERIFY_BASE_COMMAND="/usr/bin/gpg --verify --batch --no-tty --yes --no-auto-check-trustdb --status-fd 1 [PORTAGE_CONFIG] [SIGNATURE]"', + f'BINPKG_GPG_VERIFY_GPG_HOME="{gpg_test_path}"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + gpg = GPG(settings) + gpg.unlock() + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + self.assertRaises( + InvalidSignature, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_unknown_signature(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'FEATURES="${FEATURES} binpkg-signing ' 'binpkg-request-signature"', + 'BINPKG_FORMAT="gpkg"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + gpg = GPG(settings) + gpg.unlock() + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + data = urandom(1048576) + with open(os.path.join(orig_full_path, "data"), "wb") as f: + f.write(data) + + binpkg_1 = gpkg(settings, "test", os.path.join(tmpdir, "test-1.gpkg.tar")) + binpkg_1.compress(orig_full_path, {}) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name == "Manifest": + sig = b""" +-----BEGIN PGP SIGNED MESSAGE----- +Hash: SHA256 + + +DATA test/image.tar.zst 1049649 BLAKE2B 3112adba9c09023962f26d9dcbf8e74107c05220f2f29aa2ce894f8a4104c3bb238f87095df73735befcf1e1f6039fc3abf4defa87e68ce80f33dd01e09c055a SHA512 9f584727f2e20a50a30e0077b94082c8c1f517ebfc9978eb3281887e24458108e73d1a2ce82eb0b59f5df7181597e4b0a297ae68bbfb36763aa052e6bdbf2c59 +DATA test/image.tar.zst.sig 833 BLAKE2B 214724ae4ff9198879c8c960fd8167632e27982c2278bb873f195abe75b75afa1ebed4c37ec696f5f5bc35c3a1184b60e0b50d56695b072b254f730db01eddb5 SHA512 67316187da8bb6b7a5f9dc6a42ed5c7d72c6184483a97f23c0bebd8b187ac9268e0409eb233c935101606768718c99eaa5699037d6a68c2d88c9ed5331a3f73c +-----BEGIN PGP SIGNATURE----- + +iNUEARYIAH0WIQSMe+CQzU+/D/DeMitA3PGOlxUHlQUCYVrQal8UgAAAAAAuAChp +c3N1ZXItZnByQG5vdGF0aW9ucy5vcGVucGdwLmZpZnRoaG9yc2VtYW4ubmV0OEM3 +QkUwOTBDRDRGQkYwRkYwREUzMjJCNDBEQ0YxOEU5NzE1MDc5NQAKCRBA3PGOlxUH +lbmTAP4jdhMTW6g550/t0V7XcixqVtBockOTln8hZrZIQrjAJAD/caDkxgz5Xl8C +EP1pgSXXGtlUnv6akg/wueFJKEr9KQs= +=edEg +-----END PGP SIGNATURE----- +""" + data = io.BytesIO(sig) + f.size = len(sig) + tar_2.addfile(f, data) + data.close() + else: + tar_2.addfile(f, tar_1.extractfile(f)) + + binpkg_2 = gpkg(settings, "test", os.path.join(tmpdir, "test-2.gpkg.tar")) + self.assertRaises( + InvalidSignature, binpkg_2.decompress, os.path.join(tmpdir, "test") + ) + + finally: + shutil.rmtree(tmpdir) + playground.cleanup() diff --git a/lib/portage/tests/gpkg/test_gpkg_metadata_update.py b/lib/portage/tests/gpkg/test_gpkg_metadata_update.py new file mode 100644 index 000000000..2d5d35a98 --- /dev/null +++ b/lib/portage/tests/gpkg/test_gpkg_metadata_update.py @@ -0,0 +1,59 @@ +# Copright Gentoo Foundation 2006-2020 +# Portage Unit Testing Functionality + +import tempfile +import sys +from os import urandom + +from portage import os +from portage import shutil +from portage.util._compare_files import compare_files +from portage.tests import TestCase +from portage.tests.resolver.ResolverPlayground import ResolverPlayground +from portage.gpkg import gpkg + + +class test_gpkg_metadata_case(TestCase): + def test_gpkg_update_metadata(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + playground = ResolverPlayground( + user_config={ + "make.conf": ('BINPKG_COMPRESS="gzip"',), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + with open(os.path.join(orig_full_path, "test"), "wb") as test_file: + test_file.write(urandom(1048576)) + + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + test_gpkg = gpkg(settings, "test", gpkg_file_loc) + + meta = {"test1": b"1234567890", "test2": b"abcdef"} + + test_gpkg.compress(os.path.join(tmpdir, "orig"), meta) + + meta_result = test_gpkg.get_metadata() + self.assertEqual(meta, meta_result) + + meta_new = {"test3": b"0987654321", "test4": b"XXXXXXXX"} + test_gpkg.update_metadata(meta_new) + + meta_result = test_gpkg.get_metadata() + self.assertEqual(meta_new, meta_result) + + test_gpkg.decompress(os.path.join(tmpdir, "test")) + r = compare_files( + os.path.join(tmpdir, "orig/" + "test"), + os.path.join(tmpdir, "test/" + "test"), + skipped_types=("atime", "mtime", "ctime"), + ) + self.assertEqual(r, ()) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() diff --git a/lib/portage/tests/gpkg/test_gpkg_metadata_url.py b/lib/portage/tests/gpkg/test_gpkg_metadata_url.py new file mode 100644 index 000000000..08591715b --- /dev/null +++ b/lib/portage/tests/gpkg/test_gpkg_metadata_url.py @@ -0,0 +1,173 @@ +# Copright Gentoo Foundation 2006-2020 +# Portage Unit Testing Functionality + +import io +import random +import sys +import tarfile +import tempfile +from functools import partial +from os import urandom + +from portage.gpkg import gpkg +from portage import os +from portage import shutil +from portage.tests import TestCase +from portage.tests.resolver.ResolverPlayground import ResolverPlayground +from portage.exception import InvalidSignature +from portage.gpg import GPG + + +class test_gpkg_metadata_url_case(TestCase): + def httpd(self, directory, port): + try: + import http.server + import socketserver + except ImportError: + self.skipTest("http server not exits") + + Handler = partial(http.server.SimpleHTTPRequestHandler, directory=directory) + + with socketserver.TCPServer(("127.0.0.1", port), Handler) as httpd: + httpd.serve_forever() + + def start_http_server(self, directory, port): + try: + import threading + except ImportError: + self.skipTest("threading module not exists") + + server = threading.Thread( + target=self.httpd, args=(directory, port), daemon=True + ) + server.start() + return server + + def test_gpkg_get_metadata_url(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + if sys.version_info.major == 3 and sys.version_info.minor <= 6: + self.skipTest("http server not support change root dir") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'BINPKG_COMPRESS="gzip"', + 'FEATURES="${FEATURES} -binpkg-signing ' + '-binpkg-request-signature"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + try: + settings = playground.settings + for _ in range(0, 5): + port = random.randint(30000, 60000) + try: + server = self.start_http_server(tmpdir, port) + except OSError: + continue + break + + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + with open(os.path.join(orig_full_path, "test"), "wb") as test_file: + test_file.write(urandom(1048576)) + + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + test_gpkg = gpkg(settings, "test", gpkg_file_loc) + + meta = { + "test1": b"{abcdefghijklmnopqrstuvwxyz, 1234567890}", + "test2": urandom(102400), + } + + test_gpkg.compress(os.path.join(tmpdir, "orig"), meta) + + meta_from_url = test_gpkg.get_metadata_url( + "http://127.0.0.1:" + str(port) + "/test.gpkg.tar" + ) + + self.assertEqual(meta, meta_from_url) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_get_metadata_url_unknown_signature(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + if sys.version_info.major == 3 and sys.version_info.minor <= 6: + self.skipTest("http server not support change root dir") + + playground = ResolverPlayground( + user_config={ + "make.conf": ( + 'BINPKG_COMPRESS="gzip"', + 'FEATURES="${FEATURES} binpkg-signing ' 'binpkg-request-signature"', + ), + } + ) + tmpdir = tempfile.mkdtemp() + try: + settings = playground.settings + gpg = GPG(settings) + gpg.unlock() + + for _ in range(0, 5): + port = random.randint(30000, 60000) + try: + server = self.start_http_server(tmpdir, port) + except OSError: + continue + break + + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + + with open(os.path.join(orig_full_path, "test"), "wb") as test_file: + test_file.write(urandom(1048576)) + + gpkg_file_loc = os.path.join(tmpdir, "test-1.gpkg.tar") + test_gpkg = gpkg(settings, "test", gpkg_file_loc) + + meta = { + "test1": b"{abcdefghijklmnopqrstuvwxyz, 1234567890}", + "test2": urandom(102400), + } + + test_gpkg.compress(os.path.join(tmpdir, "orig"), meta) + + with tarfile.open(os.path.join(tmpdir, "test-1.gpkg.tar"), "r") as tar_1: + with tarfile.open( + os.path.join(tmpdir, "test-2.gpkg.tar"), "w" + ) as tar_2: + for f in tar_1.getmembers(): + if f.name == "test/metadata.tar.gz": + sig = b""" +-----BEGIN PGP SIGNATURE----- + +iHUEABYIAB0WIQRVhCbPGi/rhGTq4nV+k2dcK9uyIgUCXw4ehAAKCRB+k2dcK9uy +IkCfAP49AOYjzuQPP0n5P0SGCINnAVEXN7QLQ4PurY/lt7cT2gEAq01stXjFhrz5 +87Koh+ND2r5XfQsz3XeBqbb/BpmbEgo= +=sc5K +-----END PGP SIGNATURE----- +""" + data = io.BytesIO(sig) + f.size = len(sig) + tar_2.addfile(f, data) + data.close() + else: + tar_2.addfile(f, tar_1.extractfile(f)) + + test_gpkg = gpkg(settings, "test") + self.assertRaises( + InvalidSignature, + test_gpkg.get_metadata_url, + "http://127.0.0.1:" + str(port) + "/test-2.gpkg.tar", + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() diff --git a/lib/portage/tests/gpkg/test_gpkg_path.py b/lib/portage/tests/gpkg/test_gpkg_path.py new file mode 100644 index 000000000..828233dbd --- /dev/null +++ b/lib/portage/tests/gpkg/test_gpkg_path.py @@ -0,0 +1,390 @@ +# -*- coding: utf-8 -*- +# Copright Gentoo Foundation 2006 +# Portage Unit Testing Functionality + +import tempfile +import tarfile +import io +import sys +from os import urandom + +from portage import os +from portage import shutil +from portage.util._compare_files import compare_files +from portage.tests import TestCase +from portage.tests.resolver.ResolverPlayground import ResolverPlayground +from portage.gpkg import gpkg + + +class test_gpkg_path_case(TestCase): + def test_gpkg_short_path(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ('BINPKG_COMPRESS="none"',), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + path_name = ( + "aaaabbbb/ccccdddd/eeeeffff/gggghhhh/iiiijjjj/kkkkllll/" + "mmmmnnnn/oooopppp/qqqqrrrr/sssstttt/" + ) + orig_full_path = os.path.join(tmpdir, "orig/" + path_name) + os.makedirs(orig_full_path) + with open(os.path.join(orig_full_path, "test"), "wb") as test_file: + test_file.write(urandom(1048576)) + + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + test_gpkg = gpkg(settings, "test", gpkg_file_loc) + + check_result = test_gpkg._check_pre_image_files( + os.path.join(tmpdir, "orig") + ) + self.assertEqual(check_result, (95, 4, 0, 1048576, 1048576)) + + test_gpkg.compress(os.path.join(tmpdir, "orig"), {"meta": "test"}) + with open(gpkg_file_loc, "rb") as container: + # container + self.assertEqual( + test_gpkg._get_tar_format(container), tarfile.USTAR_FORMAT + ) + + with tarfile.open(gpkg_file_loc, "r") as container: + metadata = io.BytesIO(container.extractfile("test/metadata.tar").read()) + self.assertEqual( + test_gpkg._get_tar_format(metadata), tarfile.USTAR_FORMAT + ) + metadata.close() + + image = io.BytesIO(container.extractfile("test/image.tar").read()) + self.assertEqual(test_gpkg._get_tar_format(image), tarfile.USTAR_FORMAT) + image.close() + + test_gpkg.decompress(os.path.join(tmpdir, "test")) + r = compare_files( + os.path.join(tmpdir, "orig/" + path_name + "test"), + os.path.join(tmpdir, "test/" + path_name + "test"), + skipped_types=("atime", "mtime", "ctime"), + ) + self.assertEqual(r, ()) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_long_path(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ('BINPKG_COMPRESS="none"',), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + + path_name = ( + "aaaabbbb/ccccdddd/eeeeffff/gggghhhh/iiiijjjj/kkkkllll/" + "mmmmnnnn/oooopppp/qqqqrrrr/sssstttt/uuuuvvvv/wwwwxxxx/" + "yyyyzzzz/00001111/22223333/44445555/66667777/88889999/" + "aaaabbbb/ccccdddd/eeeeffff/gggghhhh/iiiijjjj/kkkkllll/" + "mmmmnnnn/oooopppp/qqqqrrrr/sssstttt/uuuuvvvv/wwwwxxxx/" + "yyyyzzzz/00001111/22223333/44445555/66667777/88889999/" + ) + orig_full_path = os.path.join(tmpdir, "orig/" + path_name) + os.makedirs(orig_full_path) + with open(os.path.join(orig_full_path, "test"), "wb") as test_file: + test_file.write(urandom(1048576)) + + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + test_gpkg = gpkg(settings, "test", gpkg_file_loc) + + check_result = test_gpkg._check_pre_image_files( + os.path.join(tmpdir, "orig") + ) + self.assertEqual(check_result, (329, 4, 0, 1048576, 1048576)) + + test_gpkg.compress(os.path.join(tmpdir, "orig"), {"meta": "test"}) + with open(gpkg_file_loc, "rb") as container: + # container + self.assertEqual( + test_gpkg._get_tar_format(container), tarfile.USTAR_FORMAT + ) + + with tarfile.open(gpkg_file_loc, "r") as container: + metadata = io.BytesIO(container.extractfile("test/metadata.tar").read()) + self.assertEqual( + test_gpkg._get_tar_format(metadata), tarfile.USTAR_FORMAT + ) + metadata.close() + + image = io.BytesIO(container.extractfile("test/image.tar").read()) + self.assertEqual(test_gpkg._get_tar_format(image), tarfile.GNU_FORMAT) + image.close() + + test_gpkg.decompress(os.path.join(tmpdir, "test")) + r = compare_files( + os.path.join(tmpdir, "orig/" + path_name + "test"), + os.path.join(tmpdir, "test/" + path_name + "test"), + skipped_types=("atime", "mtime", "ctime"), + ) + self.assertEqual(r, ()) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_non_ascii_path(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ('BINPKG_COMPRESS="none"',), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + + path_name = "中文测试/日本語テスト/한국어시험/" + orig_full_path = os.path.join(tmpdir, "orig/" + path_name) + os.makedirs(orig_full_path) + with open(os.path.join(orig_full_path, "test"), "wb") as test_file: + test_file.write(urandom(1048576)) + + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + test_gpkg = gpkg(settings, "test", gpkg_file_loc) + + check_result = test_gpkg._check_pre_image_files( + os.path.join(tmpdir, "orig") + ) + self.assertEqual(check_result, (53, 4, 0, 1048576, 1048576)) + + test_gpkg.compress(os.path.join(tmpdir, "orig"), {"meta": "test"}) + with open(gpkg_file_loc, "rb") as container: + # container + self.assertEqual( + test_gpkg._get_tar_format(container), tarfile.USTAR_FORMAT + ) + + with tarfile.open(gpkg_file_loc, "r") as container: + metadata = io.BytesIO(container.extractfile("test/metadata.tar").read()) + self.assertEqual( + test_gpkg._get_tar_format(metadata), tarfile.USTAR_FORMAT + ) + metadata.close() + + image = io.BytesIO(container.extractfile("test/image.tar").read()) + self.assertEqual(test_gpkg._get_tar_format(image), tarfile.USTAR_FORMAT) + image.close() + + test_gpkg.decompress(os.path.join(tmpdir, "test")) + r = compare_files( + os.path.join(tmpdir, "orig/" + path_name + "test"), + os.path.join(tmpdir, "test/" + path_name + "test"), + skipped_types=("atime", "mtime", "ctime"), + ) + self.assertEqual(r, ()) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_symlink_path(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ('BINPKG_COMPRESS="none"',), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + os.symlink( + "aaaabbbb/ccccdddd/eeeeffff/gggghhhh/iiiijjjj/kkkkllll/" + "mmmmnnnn/oooopppp/qqqqrrrr/sssstttt/uuuuvvvv/wwwwxxxx/" + "yyyyzzzz/00001111/22223333/44445555/66667777/88889999/test", + os.path.join(orig_full_path, "a_long_symlink"), + ) + + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + test_gpkg = gpkg(settings, "test", gpkg_file_loc) + + check_result = test_gpkg._check_pre_image_files( + os.path.join(tmpdir, "orig") + ) + self.assertEqual(check_result, (0, 14, 166, 0, 0)) + + test_gpkg.compress(os.path.join(tmpdir, "orig"), {"meta": "test"}) + with open(gpkg_file_loc, "rb") as container: + # container + self.assertEqual( + test_gpkg._get_tar_format(container), tarfile.USTAR_FORMAT + ) + + with tarfile.open(gpkg_file_loc, "r") as container: + metadata = io.BytesIO(container.extractfile("test/metadata.tar").read()) + self.assertEqual( + test_gpkg._get_tar_format(metadata), tarfile.USTAR_FORMAT + ) + metadata.close() + + image = io.BytesIO(container.extractfile("test/image.tar").read()) + self.assertEqual(test_gpkg._get_tar_format(image), tarfile.GNU_FORMAT) + image.close() + + test_gpkg.decompress(os.path.join(tmpdir, "test")) + r = compare_files( + os.path.join(tmpdir, "orig/", "a_long_symlink"), + os.path.join(tmpdir, "test/", "a_long_symlink"), + skipped_types=("atime", "mtime", "ctime"), + ) + self.assertEqual(r, ()) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() + + def test_gpkg_long_hardlink_path(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ('BINPKG_COMPRESS="none"',), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + + path_name = ( + "aaaabbbb/ccccdddd/eeeeffff/gggghhhh/iiiijjjj/kkkkllll/" + "mmmmnnnn/oooopppp/qqqqrrrr/sssstttt/uuuuvvvv/wwwwxxxx/" + ) + file_name = ( + "test-A-B-C-D-E-F-G-H-I-J-K-L-M-N-O-P-Q-R-S-T-U-V-W-X-Y-Z" + "A-B-C-D-E-F-G-H-I-J-K-L-M-N-O-P-Q-R-S-T-U-V-W-X-Y-Z" + "A-B-C-D-E-F-G-H-I-J-K-L-M-N-O-P-Q-R-S-T-U-V-W-X-Y-Z" + ) + orig_full_path = os.path.join(tmpdir, "orig", path_name) + os.makedirs(orig_full_path) + with open(os.path.join(orig_full_path, "test"), "wb") as test_file: + test_file.write(urandom(1048576)) + + os.link( + os.path.join(orig_full_path, "test"), + os.path.join(orig_full_path, file_name), + ) + + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + test_gpkg = gpkg(settings, "test", gpkg_file_loc) + + check_result = test_gpkg._check_pre_image_files( + os.path.join(tmpdir, "orig") + ) + self.assertEqual(check_result, (113, 158, 272, 1048576, 2097152)) + + test_gpkg.compress(os.path.join(tmpdir, "orig"), {"meta": "test"}) + with open(gpkg_file_loc, "rb") as container: + # container + self.assertEqual( + test_gpkg._get_tar_format(container), tarfile.USTAR_FORMAT + ) + + with tarfile.open(gpkg_file_loc, "r") as container: + metadata = io.BytesIO(container.extractfile("test/metadata.tar").read()) + self.assertEqual( + test_gpkg._get_tar_format(metadata), tarfile.USTAR_FORMAT + ) + metadata.close() + + image = io.BytesIO(container.extractfile("test/image.tar").read()) + self.assertEqual(test_gpkg._get_tar_format(image), tarfile.GNU_FORMAT) + image.close() + + test_gpkg.decompress(os.path.join(tmpdir, "test")) + r = compare_files( + os.path.join(tmpdir, "orig", path_name, file_name), + os.path.join(tmpdir, "test", path_name, file_name), + skipped_types=("atime", "mtime", "ctime"), + ) + self.assertEqual(r, ()) + finally: + shutil.rmtree(tmpdir) + + def test_gpkg_long_filename(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ('BINPKG_COMPRESS="none"',), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + path_name = "aaaabbbb/ccccdddd/eeeeffff/gggghhhh/iiiijjjj/kkkkllll/" + file_name = ( + "test1234567890" + "A-B-C-D-E-F-G-H-I-J-K-L-M-N-O-P-Q-R-S-T-U-V-W-X-Y-Z" + "A-B-C-D-E-F-G-H-I-J-K-L-M-N-O-P-Q-R-S-T-U-V-W-X-Y-Z" + "A-B-C-D-E-F-G-H-I-J-K-L-M-N-O-P-Q-R-S-T-U-V-W-X-Y-Z" + ) + + orig_full_path = os.path.join(tmpdir, "orig/" + path_name) + os.makedirs(orig_full_path) + with open(os.path.join(orig_full_path, file_name), "wb") as test_file: + test_file.write(urandom(1048576)) + + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + test_gpkg = gpkg(settings, "test", gpkg_file_loc) + + check_result = test_gpkg._check_pre_image_files( + os.path.join(tmpdir, "orig") + ) + self.assertEqual(check_result, (59, 167, 0, 1048576, 1048576)) + + test_gpkg.compress(os.path.join(tmpdir, "orig"), {"meta": "test"}) + with open(gpkg_file_loc, "rb") as container: + # container + self.assertEqual( + test_gpkg._get_tar_format(container), tarfile.USTAR_FORMAT + ) + + with tarfile.open(gpkg_file_loc, "r") as container: + metadata = io.BytesIO(container.extractfile("test/metadata.tar").read()) + self.assertEqual( + test_gpkg._get_tar_format(metadata), tarfile.USTAR_FORMAT + ) + metadata.close() + + image = io.BytesIO(container.extractfile("test/image.tar").read()) + self.assertEqual(test_gpkg._get_tar_format(image), tarfile.GNU_FORMAT) + image.close() + + test_gpkg.decompress(os.path.join(tmpdir, "test")) + r = compare_files( + os.path.join(tmpdir, "orig", path_name, file_name), + os.path.join(tmpdir, "test", path_name, file_name), + skipped_types=("atime", "mtime", "ctime"), + ) + self.assertEqual(r, ()) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() diff --git a/lib/portage/tests/gpkg/test_gpkg_size.py b/lib/portage/tests/gpkg/test_gpkg_size.py new file mode 100644 index 000000000..a38621e70 --- /dev/null +++ b/lib/portage/tests/gpkg/test_gpkg_size.py @@ -0,0 +1,58 @@ +# Copright Gentoo Foundation 2006-2020 +# Portage Unit Testing Functionality + +import tempfile +import tarfile +import sys + +from portage import os, shutil +from portage.tests import TestCase +from portage.tests.resolver.ResolverPlayground import ResolverPlayground +from portage.gpkg import gpkg + + +class test_gpkg_large_size_case(TestCase): + def test_gpkg_large_size(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + playground = ResolverPlayground( + user_config={ + "make.conf": ('BINPKG_COMPRESS="gzip"',), + } + ) + tmpdir = tempfile.mkdtemp() + + try: + settings = playground.settings + + orig_full_path = os.path.join(tmpdir, "orig/") + os.makedirs(orig_full_path) + # Check if filesystem support sparse file + with open(os.path.join(orig_full_path, "test"), "wb") as test_file: + test_file.truncate(1048576) + + if os.stat(os.path.join(orig_full_path, "test")).st_blocks != 0: + self.skipTest("Filesystem does not support sparse file") + + with open(os.path.join(orig_full_path, "test"), "wb") as test_file: + test_file.truncate(10737418240) + + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + test_gpkg = gpkg(settings, "test", gpkg_file_loc) + + check_result = test_gpkg._check_pre_image_files( + os.path.join(tmpdir, "orig") + ) + self.assertEqual(check_result, (0, 4, 0, 10737418240, 10737418240)) + + test_gpkg.compress(os.path.join(tmpdir, "orig"), {"meta": "test"}) + + with open(gpkg_file_loc, "rb") as container: + # container + self.assertEqual( + test_gpkg._get_tar_format(container), tarfile.GNU_FORMAT + ) + finally: + shutil.rmtree(tmpdir) + playground.cleanup() diff --git a/lib/portage/tests/gpkg/test_gpkg_stream.py b/lib/portage/tests/gpkg/test_gpkg_stream.py new file mode 100644 index 000000000..66471e01b --- /dev/null +++ b/lib/portage/tests/gpkg/test_gpkg_stream.py @@ -0,0 +1,112 @@ +# Copright Gentoo Foundation 2006-2020 +# Portage Unit Testing Functionality + +import sys +import tempfile +import io +import tarfile +from os import urandom + +import portage.gpkg +from portage import os +from portage import shutil +from portage.tests import TestCase +from portage.exception import CompressorOperationFailed + + +class test_gpkg_stream_case(TestCase): + def test_gpkg_stream_reader(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + data = urandom(1048576) + data_io = io.BytesIO(data) + data_io.seek(0) + with portage.gpkg.tar_stream_reader(data_io, ["cat"]) as test_reader: + data2 = test_reader.read() + data_io.close() + self.assertEqual(data, data2) + + def test_gpkg_stream_reader_without_cmd(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + data = urandom(1048576) + data_io = io.BytesIO(data) + data_io.seek(0) + with portage.gpkg.tar_stream_reader(data_io) as test_reader: + data2 = test_reader.read() + data_io.close() + self.assertEqual(data, data2) + + def test_gpkg_stream_reader_kill(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + data = urandom(1048576) + data_io = io.BytesIO(data) + data_io.seek(0) + with portage.gpkg.tar_stream_reader(data_io, ["cat"]) as test_reader: + try: + test_reader.kill() + except CompressorOperationFailed: + pass + data_io.close() + self.assertNotEqual(test_reader.proc.poll(), None) + + def test_gpkg_stream_reader_kill_without_cmd(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + data = urandom(1048576) + data_io = io.BytesIO(data) + data_io.seek(0) + with portage.gpkg.tar_stream_reader(data_io) as test_reader: + test_reader.kill() + data_io.close() + self.assertEqual(test_reader.proc, None) + + def test_gpkg_stream_writer(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + tmpdir = tempfile.mkdtemp() + try: + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + data = urandom(1048576) + with tarfile.open(gpkg_file_loc, "w") as test_tar: + test_tarinfo = tarfile.TarInfo("test") + with portage.gpkg.tar_stream_writer( + test_tarinfo, test_tar, tarfile.USTAR_FORMAT, ["cat"] + ) as test_writer: + test_writer.write(data) + + with tarfile.open(gpkg_file_loc, "r") as test_tar: + test_tarinfo = test_tar.getmember("test") + data2 = test_tar.extractfile(test_tarinfo).read() + self.assertEqual(data, data2) + finally: + shutil.rmtree(tmpdir) + + def test_gpkg_stream_writer_without_cmd(self): + if sys.version_info.major < 3: + self.skipTest("Not support Python 2") + + tmpdir = tempfile.mkdtemp() + + try: + gpkg_file_loc = os.path.join(tmpdir, "test.gpkg.tar") + data = urandom(1048576) + with tarfile.open(gpkg_file_loc, "w") as test_tar: + test_tarinfo = tarfile.TarInfo("test") + with portage.gpkg.tar_stream_writer( + test_tarinfo, test_tar, tarfile.USTAR_FORMAT + ) as test_writer: + test_writer.write(data) + + with tarfile.open(gpkg_file_loc, "r") as test_tar: + test_tarinfo = test_tar.getmember("test") + data2 = test_tar.extractfile(test_tarinfo).read() + self.assertEqual(data, data2) + finally: + shutil.rmtree(tmpdir) diff --git a/lib/portage/tests/resolver/ResolverPlayground.py b/lib/portage/tests/resolver/ResolverPlayground.py index fdd0714e6..fa8b0cc76 100644 --- a/lib/portage/tests/resolver/ResolverPlayground.py +++ b/lib/portage/tests/resolver/ResolverPlayground.py @@ -13,6 +13,7 @@ from portage.const import ( GLOBAL_CONFIG_PATH, PORTAGE_BIN_PATH, USER_CONFIG_PATH, + SUPPORTED_GENTOO_BINPKG_FORMATS, ) from portage.process import find_binary from portage.dep import Atom, _repo_separator @@ -23,6 +24,8 @@ from portage._sets.base import InternalPackageSet from portage.tests import cnf_path from portage.util import ensure_dirs, normalize_path from portage.versions import catsplit +from portage.exception import InvalidBinaryPackageFormat +from portage.gpg import GPG import _emerge from _emerge.actions import _calc_depclean @@ -161,6 +164,7 @@ class ResolverPlayground: "egrep", "env", "find", + "flock", "grep", "head", "install", @@ -225,7 +229,6 @@ class ResolverPlayground: self._create_distfiles(distfiles) self._create_ebuilds(ebuilds) - self._create_binpkgs(binpkgs) self._create_installed(installed) self._create_profile( ebuilds, eclasses, installed, profile, repo_configs, user_config, sets @@ -234,6 +237,8 @@ class ResolverPlayground: self.settings, self.trees = self._load_config() + self.gpg = None + self._create_binpkgs(binpkgs) self._create_ebuild_manifests(ebuilds) portage.util.noiselimit = 0 @@ -341,6 +346,13 @@ class ResolverPlayground: # a dict. items = getattr(binpkgs, "items", None) items = items() if items is not None else binpkgs + binpkg_format = self.settings.get( + "BINPKG_FORMAT", SUPPORTED_GENTOO_BINPKG_FORMATS[0] + ) + if binpkg_format == "gpkg": + if self.gpg is None: + self.gpg = GPG(self.settings) + self.gpg.unlock() for cpv, metadata in items: a = Atom("=" + cpv, allow_repo=True) repo = a.repo @@ -356,19 +368,38 @@ class ResolverPlayground: metadata["repository"] = repo metadata["CATEGORY"] = cat metadata["PF"] = pf + metadata["BINPKG_FORMAT"] = binpkg_format repo_dir = self.pkgdir category_dir = os.path.join(repo_dir, cat) if "BUILD_ID" in metadata: - binpkg_path = os.path.join( - category_dir, pn, "%s-%s.xpak" % (pf, metadata["BUILD_ID"]) - ) + if binpkg_format == "xpak": + binpkg_path = os.path.join( + category_dir, pn, "%s-%s.xpak" % (pf, metadata["BUILD_ID"]) + ) + elif binpkg_format == "gpkg": + binpkg_path = os.path.join( + category_dir, pn, "%s-%s.gpkg.tar" % (pf, metadata["BUILD_ID"]) + ) + else: + raise InvalidBinaryPackageFormat(binpkg_format) else: - binpkg_path = os.path.join(category_dir, pf + ".tbz2") + if binpkg_format == "xpak": + binpkg_path = os.path.join(category_dir, pf + ".tbz2") + elif binpkg_format == "gpkg": + binpkg_path = os.path.join(category_dir, pf + ".gpkg.tar") + else: + raise InvalidBinaryPackageFormat(binpkg_format) ensure_dirs(os.path.dirname(binpkg_path)) - t = portage.xpak.tbz2(binpkg_path) - t.recompose_mem(portage.xpak.xpak_mem(metadata)) + if binpkg_format == "xpak": + t = portage.xpak.tbz2(binpkg_path) + t.recompose_mem(portage.xpak.xpak_mem(metadata)) + elif binpkg_format == "gpkg": + t = portage.gpkg.gpkg(self.settings, a.cpv, binpkg_path) + t.compress(os.path.dirname(binpkg_path), metadata) + else: + raise InvalidBinaryPackageFormat(binpkg_format) def _create_installed(self, installed): for cpv in installed: @@ -545,11 +576,19 @@ class ResolverPlayground: sub_profile_dir, os.path.join(user_config_dir, "make.profile") ) + gpg_test_path = os.environ["PORTAGE_GNUPGHOME"] + make_conf = { "ACCEPT_KEYWORDS": "x86", + "BINPKG_GPG_SIGNING_BASE_COMMAND": f"flock {gpg_test_path}/portage-binpkg-gpg.lock /usr/bin/gpg --sign --armor --yes --pinentry-mode loopback --passphrase GentooTest [PORTAGE_CONFIG]", + "BINPKG_GPG_SIGNING_GPG_HOME": gpg_test_path, + "BINPKG_GPG_SIGNING_KEY": "0x5D90EA06352177F6", + "BINPKG_GPG_VERIFY_GPG_HOME": gpg_test_path, "CLEAN_DELAY": "0", "DISTDIR": self.distdir, "EMERGE_WARNING_DELAY": "0", + "FEATURES": "${FEATURES} binpkg-signing binpkg-request-signature " + "gpg-keepalive", "PKGDIR": self.pkgdir, "PORTAGE_INST_GID": str(portage.data.portage_gid), "PORTAGE_INST_UID": str(portage.data.portage_uid), @@ -742,6 +781,8 @@ class ResolverPlayground: return def cleanup(self): + if self.gpg is not None: + self.gpg.stop() for eroot in self.trees: portdb = self.trees[eroot]["porttree"].dbapi portdb.close_caches() diff --git a/lib/portage/tests/resolver/binpkg_multi_instance/test_build_id_profile_format.py b/lib/portage/tests/resolver/binpkg_multi_instance/test_build_id_profile_format.py index b311961d6..4dc83e843 100644 --- a/lib/portage/tests/resolver/binpkg_multi_instance/test_build_id_profile_format.py +++ b/lib/portage/tests/resolver/binpkg_multi_instance/test_build_id_profile_format.py @@ -1,11 +1,16 @@ # Copyright 2015-2021 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class BuildIdProfileFormatTestCase(TestCase): @@ -139,21 +144,30 @@ class BuildIdProfileFormatTestCase(TestCase): ), ) - playground = ResolverPlayground( - debug=False, - binpkgs=binpkgs, - ebuilds=ebuilds, - installed=installed, - repo_configs=repo_configs, - profile=profile, - user_config=user_config, - world=world, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - # playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + _user_config = user_config.copy() + _user_config["make.conf"] += ('BINPKG_FORMAT="%s"' % binpkg_format,) + playground = ResolverPlayground( + debug=False, + binpkgs=binpkgs, + ebuilds=ebuilds, + installed=installed, + repo_configs=repo_configs, + profile=profile, + user_config=_user_config, + world=world, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + # playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/binpkg_multi_instance/test_rebuilt_binaries.py b/lib/portage/tests/resolver/binpkg_multi_instance/test_rebuilt_binaries.py index c854604c1..854dee458 100644 --- a/lib/portage/tests/resolver/binpkg_multi_instance/test_rebuilt_binaries.py +++ b/lib/portage/tests/resolver/binpkg_multi_instance/test_rebuilt_binaries.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class RebuiltBinariesCase(TestCase): @@ -99,18 +104,27 @@ class RebuiltBinariesCase(TestCase): ), ) - playground = ResolverPlayground( - debug=False, - binpkgs=binpkgs, - installed=installed, - user_config=user_config, - world=world, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - # playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + _user_config = user_config.copy() + _user_config["make.conf"] += ('BINPKG_FORMAT="%s"' % binpkg_format,) + playground = ResolverPlayground( + debug=False, + binpkgs=binpkgs, + installed=installed, + user_config=_user_config, + world=world, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + # playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/soname/test_autounmask.py b/lib/portage/tests/resolver/soname/test_autounmask.py index 3ee0be8d0..e324c9392 100644 --- a/lib/portage/tests/resolver/soname/test_autounmask.py +++ b/lib/portage/tests/resolver/soname/test_autounmask.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SonameAutoUnmaskTestCase(TestCase): @@ -85,13 +90,26 @@ class SonameAutoUnmaskTestCase(TestCase): ), ) - playground = ResolverPlayground( - binpkgs=binpkgs, installed=installed, world=world, debug=False - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/soname/test_downgrade.py b/lib/portage/tests/resolver/soname/test_downgrade.py index b683745e0..c601b6381 100644 --- a/lib/portage/tests/resolver/soname/test_downgrade.py +++ b/lib/portage/tests/resolver/soname/test_downgrade.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SonameDowngradeTestCase(TestCase): @@ -125,22 +130,29 @@ class SonameDowngradeTestCase(TestCase): ), ) - playground = ResolverPlayground( - binpkgs=binpkgs, - ebuilds=ebuilds, - installed=installed, - user_config=user_config, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + user_config["make.conf"] = ('BINPKG_FORMAT="%s"' % binpkg_format,) + playground = ResolverPlayground( + binpkgs=binpkgs, + ebuilds=ebuilds, + installed=installed, + user_config=user_config, + world=world, + debug=False, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + playground.debug = False + playground.cleanup() def testTwoSlots(self): @@ -217,19 +229,27 @@ class SonameDowngradeTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - user_config=user_config, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + user_config["make.conf"] = ('BINPKG_FORMAT="%s"' % binpkg_format,) + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + user_config=user_config, + world=world, + debug=False, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/soname/test_or_choices.py b/lib/portage/tests/resolver/soname/test_or_choices.py index c636726f3..dcdcf57e3 100644 --- a/lib/portage/tests/resolver/soname/test_or_choices.py +++ b/lib/portage/tests/resolver/soname/test_or_choices.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SonameOrChoicesTestCase(TestCase): @@ -83,14 +88,26 @@ class SonameOrChoicesTestCase(TestCase): ), ) - playground = ResolverPlayground( - debug=False, binpkgs=binpkgs, installed=installed, world=world - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + debug=False, + binpkgs=binpkgs, + installed=installed, + world=world, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/soname/test_reinstall.py b/lib/portage/tests/resolver/soname/test_reinstall.py index f4616f9dd..68c842af1 100644 --- a/lib/portage/tests/resolver/soname/test_reinstall.py +++ b/lib/portage/tests/resolver/soname/test_reinstall.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SonameReinstallTestCase(TestCase): @@ -72,14 +77,27 @@ class SonameReinstallTestCase(TestCase): ), ) - playground = ResolverPlayground( - debug=False, binpkgs=binpkgs, installed=installed, world=world - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + debug=False, + binpkgs=binpkgs, + installed=installed, + world=world, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/soname/test_skip_update.py b/lib/portage/tests/resolver/soname/test_skip_update.py index 336bfac4f..a515a5252 100644 --- a/lib/portage/tests/resolver/soname/test_skip_update.py +++ b/lib/portage/tests/resolver/soname/test_skip_update.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SonameSkipUpdateTestCase(TestCase): @@ -71,14 +76,26 @@ class SonameSkipUpdateTestCase(TestCase): ), ) - playground = ResolverPlayground( - debug=False, binpkgs=binpkgs, installed=installed, world=world - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + debug=False, + binpkgs=binpkgs, + installed=installed, + world=world, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/soname/test_slot_conflict_reinstall.py b/lib/portage/tests/resolver/soname/test_slot_conflict_reinstall.py index 39430ae41..027cadc83 100644 --- a/lib/portage/tests/resolver/soname/test_slot_conflict_reinstall.py +++ b/lib/portage/tests/resolver/soname/test_slot_conflict_reinstall.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SonameSlotConflictReinstallTestCase(TestCase): @@ -80,16 +85,28 @@ class SonameSlotConflictReinstallTestCase(TestCase): ), ) - playground = ResolverPlayground( - binpkgs=binpkgs, installed=installed, world=world, debug=False - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.debug = False + playground.cleanup() def testSonameSlotConflictMassRebuild(self): """ @@ -159,16 +176,29 @@ class SonameSlotConflictReinstallTestCase(TestCase): world = [] - playground = ResolverPlayground( - binpkgs=binpkgs, installed=installed, world=world, debug=False - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.debug = False + playground.cleanup() def testSonameSlotConflictForgottenChild(self): """ @@ -242,16 +272,29 @@ class SonameSlotConflictReinstallTestCase(TestCase): world = ["app-misc/A"] - playground = ResolverPlayground( - binpkgs=binpkgs, installed=installed, world=world, debug=False - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.debug = False + playground.cleanup() def testSonameSlotConflictMixedDependencies(self): """ @@ -316,13 +359,25 @@ class SonameSlotConflictReinstallTestCase(TestCase): world = [] - playground = ResolverPlayground( - binpkgs=binpkgs, installed=installed, world=world, debug=False - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/soname/test_slot_conflict_update.py b/lib/portage/tests/resolver/soname/test_slot_conflict_update.py index 0541a185e..dd763caef 100644 --- a/lib/portage/tests/resolver/soname/test_slot_conflict_update.py +++ b/lib/portage/tests/resolver/soname/test_slot_conflict_update.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SonameSlotConflictUpdateTestCase(TestCase): @@ -88,13 +93,26 @@ class SonameSlotConflictUpdateTestCase(TestCase): ), ) - playground = ResolverPlayground( - binpkgs=binpkgs, installed=installed, world=world, debug=False - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/soname/test_soname_provided.py b/lib/portage/tests/resolver/soname/test_soname_provided.py index 3cd9f1423..6a9ee76ba 100644 --- a/lib/portage/tests/resolver/soname/test_soname_provided.py +++ b/lib/portage/tests/resolver/soname/test_soname_provided.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SonameProvidedTestCase(TestCase): @@ -62,18 +67,28 @@ class SonameProvidedTestCase(TestCase): ), ) - playground = ResolverPlayground( - binpkgs=binpkgs, - debug=False, - profile=profile, - installed=installed, - world=world, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + debug=False, + profile=profile, + installed=installed, + world=world, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/soname/test_unsatisfiable.py b/lib/portage/tests/resolver/soname/test_unsatisfiable.py index a8d2e10db..75d50c10f 100644 --- a/lib/portage/tests/resolver/soname/test_unsatisfiable.py +++ b/lib/portage/tests/resolver/soname/test_unsatisfiable.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SonameUnsatisfiableTestCase(TestCase): @@ -57,14 +62,27 @@ class SonameUnsatisfiableTestCase(TestCase): ), ) - playground = ResolverPlayground( - binpkgs=binpkgs, debug=False, installed=installed, world=world - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + debug=False, + installed=installed, + world=world, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/soname/test_unsatisfied.py b/lib/portage/tests/resolver/soname/test_unsatisfied.py index 955d5d75b..2e0fe6e7f 100644 --- a/lib/portage/tests/resolver/soname/test_unsatisfied.py +++ b/lib/portage/tests/resolver/soname/test_unsatisfied.py @@ -1,11 +1,16 @@ # Copyright 2015 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SonameUnsatisfiedTestCase(TestCase): @@ -70,14 +75,27 @@ class SonameUnsatisfiedTestCase(TestCase): ), ) - playground = ResolverPlayground( - binpkgs=binpkgs, debug=False, installed=installed, world=world - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + debug=False, + installed=installed, + world=world, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_autounmask_binpkg_use.py b/lib/portage/tests/resolver/test_autounmask_binpkg_use.py index e2164f0b1..682732611 100644 --- a/lib/portage/tests/resolver/test_autounmask_binpkg_use.py +++ b/lib/portage/tests/resolver/test_autounmask_binpkg_use.py @@ -1,11 +1,16 @@ # Copyright 2017 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class AutounmaskBinpkgUseTestCase(TestCase): @@ -55,13 +60,26 @@ class AutounmaskBinpkgUseTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, binpkgs=binpkgs, installed=installed, debug=False - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_bdeps.py b/lib/portage/tests/resolver/test_bdeps.py index ce1f8e0d5..a1b987eca 100644 --- a/lib/portage/tests/resolver/test_bdeps.py +++ b/lib/portage/tests/resolver/test_bdeps.py @@ -1,11 +1,16 @@ # Copyright 2017 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS +from portage.output import colorize class BdepsTestCase(TestCase): @@ -183,18 +188,27 @@ class BdepsTestCase(TestCase): ), ) - playground = ResolverPlayground( - debug=False, - ebuilds=ebuilds, - installed=installed, - binpkgs=binpkgs, - world=world, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - # Disable debug so that cleanup works. - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + debug=False, + ebuilds=ebuilds, + installed=installed, + binpkgs=binpkgs, + world=world, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + # Disable debug so that cleanup works. + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_binary_pkg_ebuild_visibility.py b/lib/portage/tests/resolver/test_binary_pkg_ebuild_visibility.py index 9a6a5417a..10a292abd 100644 --- a/lib/portage/tests/resolver/test_binary_pkg_ebuild_visibility.py +++ b/lib/portage/tests/resolver/test_binary_pkg_ebuild_visibility.py @@ -1,11 +1,16 @@ # Copyright 2017 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class BinaryPkgEbuildVisibilityTestCase(TestCase): @@ -124,12 +129,24 @@ class BinaryPkgEbuildVisibilityTestCase(TestCase): ), ) - playground = ResolverPlayground( - binpkgs=binpkgs, ebuilds=ebuilds, installed=installed, world=world - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + ebuilds=ebuilds, + installed=installed, + world=world, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_changed_deps.py b/lib/portage/tests/resolver/test_changed_deps.py index c3a0e2f87..0c9b34d4b 100644 --- a/lib/portage/tests/resolver/test_changed_deps.py +++ b/lib/portage/tests/resolver/test_changed_deps.py @@ -1,11 +1,16 @@ # Copyright 2014 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class ChangedDepsTestCase(TestCase): @@ -103,16 +108,26 @@ class ChangedDepsTestCase(TestCase): ), ) - playground = ResolverPlayground( - debug=False, - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + debug=False, + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_complete_if_new_subslot_without_revbump.py b/lib/portage/tests/resolver/test_complete_if_new_subslot_without_revbump.py index 3a5912606..6b061ca2a 100644 --- a/lib/portage/tests/resolver/test_complete_if_new_subslot_without_revbump.py +++ b/lib/portage/tests/resolver/test_complete_if_new_subslot_without_revbump.py @@ -1,11 +1,16 @@ # Copyright 2013 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class CompeteIfNewSubSlotWithoutRevBumpTestCase(TestCase): @@ -55,16 +60,25 @@ class CompeteIfNewSubSlotWithoutRevBumpTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_disjunctive_depend_order.py b/lib/portage/tests/resolver/test_disjunctive_depend_order.py index e08a1d845..4471bc605 100644 --- a/lib/portage/tests/resolver/test_disjunctive_depend_order.py +++ b/lib/portage/tests/resolver/test_disjunctive_depend_order.py @@ -1,11 +1,16 @@ # Copyright 2017 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class DisjunctiveDependOrderTestCase(TestCase): @@ -71,12 +76,25 @@ class DisjunctiveDependOrderTestCase(TestCase): ), ) - playground = ResolverPlayground(debug=False, binpkgs=binpkgs, ebuilds=ebuilds) + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + debug=False, + binpkgs=binpkgs, + ebuilds=ebuilds, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.debug = False - playground.cleanup() + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_multirepo.py b/lib/portage/tests/resolver/test_multirepo.py index 3a8eaa3d6..8132439b5 100644 --- a/lib/portage/tests/resolver/test_multirepo.py +++ b/lib/portage/tests/resolver/test_multirepo.py @@ -1,11 +1,16 @@ # Copyright 2010-2020 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class MultirepoTestCase(TestCase): @@ -236,15 +241,28 @@ class MultirepoTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, binpkgs=binpkgs, installed=installed, sets=sets - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + sets=sets, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() def testMultirepoUserConfig(self): ebuilds = { @@ -382,12 +400,20 @@ class MultirepoTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, installed=installed, user_config=user_config - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + user_config["make.conf"] = ('BINPKG_FORMAT="%s"' % binpkg_format,) + playground = ResolverPlayground( + ebuilds=ebuilds, installed=installed, user_config=user_config + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_regular_slot_change_without_revbump.py b/lib/portage/tests/resolver/test_regular_slot_change_without_revbump.py index bb1ce0e87..127a175d1 100644 --- a/lib/portage/tests/resolver/test_regular_slot_change_without_revbump.py +++ b/lib/portage/tests/resolver/test_regular_slot_change_without_revbump.py @@ -1,11 +1,16 @@ # Copyright 2013 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class RegularSlotChangeWithoutRevBumpTestCase(TestCase): @@ -42,16 +47,26 @@ class RegularSlotChangeWithoutRevBumpTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_simple.py b/lib/portage/tests/resolver/test_simple.py index 9bcf446be..b2bfd5fdf 100644 --- a/lib/portage/tests/resolver/test_simple.py +++ b/lib/portage/tests/resolver/test_simple.py @@ -1,11 +1,16 @@ # Copyright 2010-2020 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SimpleResolverTestCase(TestCase): @@ -75,12 +80,23 @@ class SimpleResolverTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, binpkgs=binpkgs, installed=installed - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_slot_abi.py b/lib/portage/tests/resolver/test_slot_abi.py index afab001df..2d99fb676 100644 --- a/lib/portage/tests/resolver/test_slot_abi.py +++ b/lib/portage/tests/resolver/test_slot_abi.py @@ -1,11 +1,16 @@ # Copyright 2012-2019 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SlotAbiTestCase(TestCase): @@ -118,19 +123,29 @@ class SlotAbiTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() def testWholeSlot(self): ebuilds = { @@ -243,19 +258,29 @@ class SlotAbiTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() def testWholeSlotConditional(self): ebuilds = { @@ -447,16 +472,26 @@ class SlotAbiTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_slot_abi_downgrade.py b/lib/portage/tests/resolver/test_slot_abi_downgrade.py index badd31b2d..15c6a7bfc 100644 --- a/lib/portage/tests/resolver/test_slot_abi_downgrade.py +++ b/lib/portage/tests/resolver/test_slot_abi_downgrade.py @@ -1,11 +1,16 @@ # Copyright 2012-2019 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SlotAbiDowngradeTestCase(TestCase): @@ -96,19 +101,29 @@ class SlotAbiDowngradeTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() def testWholeSlotSubSlotMix(self): ebuilds = { @@ -197,16 +212,26 @@ class SlotAbiDowngradeTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_slot_change_without_revbump.py b/lib/portage/tests/resolver/test_slot_change_without_revbump.py index 3dbd4f75e..c1c727caf 100644 --- a/lib/portage/tests/resolver/test_slot_change_without_revbump.py +++ b/lib/portage/tests/resolver/test_slot_change_without_revbump.py @@ -1,11 +1,16 @@ # Copyright 2013 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SlotChangeWithoutRevBumpTestCase(TestCase): @@ -71,16 +76,25 @@ class SlotChangeWithoutRevBumpTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_slot_operator_autounmask.py b/lib/portage/tests/resolver/test_slot_operator_autounmask.py index 7d6c3af26..b8b502a68 100644 --- a/lib/portage/tests/resolver/test_slot_operator_autounmask.py +++ b/lib/portage/tests/resolver/test_slot_operator_autounmask.py @@ -1,11 +1,16 @@ # Copyright 2013-2019 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SlotOperatorAutoUnmaskTestCase(TestCase): @@ -109,16 +114,25 @@ class SlotOperatorAutoUnmaskTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_slot_operator_bdeps.py b/lib/portage/tests/resolver/test_slot_operator_bdeps.py index 0b1f426b7..6f0e5f7e1 100644 --- a/lib/portage/tests/resolver/test_slot_operator_bdeps.py +++ b/lib/portage/tests/resolver/test_slot_operator_bdeps.py @@ -1,11 +1,13 @@ # Copyright 2020 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SlotOperatorBdependTestCase(TestCase): @@ -88,20 +90,28 @@ class SlotOperatorBdependTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.debug = False + playground.cleanup() def testSlotOperatorBdependAfterBreakage(self): """ @@ -182,17 +192,25 @@ class SlotOperatorBdependTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.debug = False - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.debug = False + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_slot_operator_rebuild.py b/lib/portage/tests/resolver/test_slot_operator_rebuild.py index 9e2325afb..9ad2bc7ab 100644 --- a/lib/portage/tests/resolver/test_slot_operator_rebuild.py +++ b/lib/portage/tests/resolver/test_slot_operator_rebuild.py @@ -1,11 +1,16 @@ # Copyright 2014-2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SlotOperatorRebuildTestCase(TestCase): @@ -72,16 +77,25 @@ class SlotOperatorRebuildTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + world=world, + debug=False, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_slot_operator_unsolved.py b/lib/portage/tests/resolver/test_slot_operator_unsolved.py index d43e8367d..945e34ccf 100644 --- a/lib/portage/tests/resolver/test_slot_operator_unsolved.py +++ b/lib/portage/tests/resolver/test_slot_operator_unsolved.py @@ -1,11 +1,16 @@ # Copyright 2013 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class SlotOperatorUnsolvedTestCase(TestCase): @@ -71,17 +76,25 @@ class SlotOperatorUnsolvedTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - user_config=user_config, - world=world, - debug=False, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + _user_config = user_config.copy() + _user_config["make.conf"] += ('BINPKG_FORMAT="%s"' % binpkg_format,) + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + user_config=_user_config, + world=world, + debug=False, + ) + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/resolver/test_useflags.py b/lib/portage/tests/resolver/test_useflags.py index 6d74807e5..340ac5de2 100644 --- a/lib/portage/tests/resolver/test_useflags.py +++ b/lib/portage/tests/resolver/test_useflags.py @@ -1,11 +1,16 @@ # Copyright 2014 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys + +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ( ResolverPlayground, ResolverPlaygroundTestCase, ) +from portage.output import colorize class UseFlagsTestCase(TestCase): @@ -118,15 +123,23 @@ class UseFlagsTestCase(TestCase): ), ) - playground = ResolverPlayground( - ebuilds=ebuilds, - binpkgs=binpkgs, - installed=installed, - user_config=user_config, - ) - try: - for test_case in test_cases: - playground.run_TestCase(test_case) - self.assertEqual(test_case.test_success, True, test_case.fail_msg) - finally: - playground.cleanup() + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + user_config["make.conf"] = ('BINPKG_FORMAT="%s"' % binpkg_format,) + playground = ResolverPlayground( + ebuilds=ebuilds, + binpkgs=binpkgs, + installed=installed, + user_config=user_config, + ) + + try: + for test_case in test_cases: + playground.run_TestCase(test_case) + self.assertEqual( + test_case.test_success, True, test_case.fail_msg + ) + finally: + playground.cleanup() diff --git a/lib/portage/tests/runTests.py b/lib/portage/tests/runTests.py index 85b746092..167b40d5a 100755 --- a/lib/portage/tests/runTests.py +++ b/lib/portage/tests/runTests.py @@ -9,7 +9,10 @@ import os.path as osp import platform import pwd import signal +import tempfile +import shutil import sys +from distutils.dir_util import copy_tree def debug_signal(signum, frame): @@ -63,8 +66,17 @@ if insert_bin_path: path.insert(0, PORTAGE_BIN_PATH) os.environ["PATH"] = ":".join(path) +# Copy GPG test keys to temporary directory +gpg_path = tempfile.mkdtemp(prefix="gpg_") + +copy_tree(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".gnupg"), gpg_path) + +os.chmod(gpg_path, 0o700) +os.environ["PORTAGE_GNUPGHOME"] = gpg_path + if __name__ == "__main__": try: sys.exit(tests.main()) finally: global_event_loop().close() + shutil.rmtree(gpg_path, ignore_errors=True) diff --git a/lib/portage/tests/update/test_move_ent.py b/lib/portage/tests/update/test_move_ent.py index ba5add989..cb9bb5243 100644 --- a/lib/portage/tests/update/test_move_ent.py +++ b/lib/portage/tests/update/test_move_ent.py @@ -1,14 +1,18 @@ # Copyright 2012-2021 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys import textwrap import portage from portage import os +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ResolverPlayground from portage.util import ensure_dirs from portage._global_updates import _do_global_updates +from portage.output import colorize class MoveEntTestCase(TestCase): @@ -47,60 +51,73 @@ class MoveEntTestCase(TestCase): """ ) - playground = ResolverPlayground( - binpkgs=binpkgs, ebuilds=ebuilds, installed=installed - ) - - settings = playground.settings - trees = playground.trees - eroot = settings["EROOT"] - test_repo_location = settings.repositories["test_repo"].location - portdb = trees[eroot]["porttree"].dbapi - vardb = trees[eroot]["vartree"].dbapi - bindb = trees[eroot]["bintree"].dbapi - - updates_dir = os.path.join(test_repo_location, "profiles", "updates") - - try: - ensure_dirs(updates_dir) - with open(os.path.join(updates_dir, "1Q-2010"), "w") as f: - f.write(updates) - - # Create an empty updates directory, so that this - # repo doesn't inherit updates from the main repo. - ensure_dirs( - os.path.join( - portdb.getRepositoryPath("dont_apply_updates"), - "profiles", - "updates", + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + ebuilds=ebuilds, + installed=installed, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, ) - ) - - global_noiselimit = portage.util.noiselimit - portage.util.noiselimit = -2 - try: - _do_global_updates(trees, {}) - finally: - portage.util.noiselimit = global_noiselimit - - # Workaround for cache validation not working - # correctly when filesystem has timestamp precision - # of 1 second. - vardb._clear_cache() - - # A -> A-moved - self.assertRaises(KeyError, vardb.aux_get, "dev-libs/A-1", ["EAPI"]) - vardb.aux_get("dev-libs/A-moved-1", ["EAPI"]) - # The original package should still exist because a binary - # package move is a copy on write operation. - bindb.aux_get("dev-libs/A-1", ["EAPI"]) - bindb.aux_get("dev-libs/A-moved-1", ["EAPI"]) - - # dont_apply_updates - self.assertRaises(KeyError, vardb.aux_get, "dev-libs/A-moved-2", ["EAPI"]) - vardb.aux_get("dev-libs/A-2", ["EAPI"]) - self.assertRaises(KeyError, bindb.aux_get, "dev-libs/A-moved-2", ["EAPI"]) - bindb.aux_get("dev-libs/A-2", ["EAPI"]) - - finally: - playground.cleanup() + + settings = playground.settings + trees = playground.trees + eroot = settings["EROOT"] + test_repo_location = settings.repositories["test_repo"].location + portdb = trees[eroot]["porttree"].dbapi + vardb = trees[eroot]["vartree"].dbapi + bindb = trees[eroot]["bintree"].dbapi + + updates_dir = os.path.join(test_repo_location, "profiles", "updates") + + try: + ensure_dirs(updates_dir) + with open(os.path.join(updates_dir, "1Q-2010"), "w") as f: + f.write(updates) + + # Create an empty updates directory, so that this + # repo doesn't inherit updates from the main repo. + ensure_dirs( + os.path.join( + portdb.getRepositoryPath("dont_apply_updates"), + "profiles", + "updates", + ) + ) + + global_noiselimit = portage.util.noiselimit + portage.util.noiselimit = -2 + try: + _do_global_updates(trees, {}) + finally: + portage.util.noiselimit = global_noiselimit + + # Workaround for cache validation not working + # correctly when filesystem has timestamp precision + # of 1 second. + vardb._clear_cache() + + # A -> A-moved + self.assertRaises(KeyError, vardb.aux_get, "dev-libs/A-1", ["EAPI"]) + vardb.aux_get("dev-libs/A-moved-1", ["EAPI"]) + # The original package should still exist because a binary + # package move is a copy on write operation. + bindb.aux_get("dev-libs/A-1", ["EAPI"]) + bindb.aux_get("dev-libs/A-moved-1", ["EAPI"]) + + # dont_apply_updates + self.assertRaises( + KeyError, vardb.aux_get, "dev-libs/A-moved-2", ["EAPI"] + ) + vardb.aux_get("dev-libs/A-2", ["EAPI"]) + self.assertRaises( + KeyError, bindb.aux_get, "dev-libs/A-moved-2", ["EAPI"] + ) + bindb.aux_get("dev-libs/A-2", ["EAPI"]) + + finally: + playground.cleanup() diff --git a/lib/portage/tests/update/test_move_slot_ent.py b/lib/portage/tests/update/test_move_slot_ent.py index 87e38f97e..27d51fbb1 100644 --- a/lib/portage/tests/update/test_move_slot_ent.py +++ b/lib/portage/tests/update/test_move_slot_ent.py @@ -1,14 +1,18 @@ # Copyright 2012-2019 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys import textwrap import portage from portage import os +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ResolverPlayground from portage.util import ensure_dirs from portage._global_updates import _do_global_updates +from portage.output import colorize class MoveSlotEntTestCase(TestCase): @@ -75,63 +79,80 @@ class MoveSlotEntTestCase(TestCase): """ ) - playground = ResolverPlayground( - binpkgs=binpkgs, ebuilds=ebuilds, installed=installed - ) - - settings = playground.settings - trees = playground.trees - eroot = settings["EROOT"] - test_repo_location = settings.repositories["test_repo"].location - portdb = trees[eroot]["porttree"].dbapi - vardb = trees[eroot]["vartree"].dbapi - bindb = trees[eroot]["bintree"].dbapi - - updates_dir = os.path.join(test_repo_location, "profiles", "updates") - - try: - ensure_dirs(updates_dir) - with open(os.path.join(updates_dir, "1Q-2010"), "w") as f: - f.write(updates) - - # Create an empty updates directory, so that this - # repo doesn't inherit updates from the main repo. - ensure_dirs( - os.path.join( - portdb.getRepositoryPath("dont_apply_updates"), - "profiles", - "updates", + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + ebuilds=ebuilds, + installed=installed, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, ) - ) - - global_noiselimit = portage.util.noiselimit - portage.util.noiselimit = -2 - try: - _do_global_updates(trees, {}) - finally: - portage.util.noiselimit = global_noiselimit - - # Workaround for cache validation not working - # correctly when filesystem has timestamp precision - # of 1 second. - vardb._clear_cache() - - # 0/2.30 -> 2/2.30 - self.assertEqual("2/2.30", vardb.aux_get("dev-libs/A-1", ["SLOT"])[0]) - self.assertEqual("2/2.30", bindb.aux_get("dev-libs/A-1", ["SLOT"])[0]) - - # 0 -> 1 - self.assertEqual("1", vardb.aux_get("dev-libs/B-1", ["SLOT"])[0]) - self.assertEqual("1", bindb.aux_get("dev-libs/B-1", ["SLOT"])[0]) - - # 0/1 -> 1 (equivalent to 1/1) - self.assertEqual("1", vardb.aux_get("dev-libs/C-1", ["SLOT"])[0]) - self.assertEqual("1", bindb.aux_get("dev-libs/C-1", ["SLOT"])[0]) - - # dont_apply_updates - self.assertEqual("0/2.30", bindb.aux_get("dev-libs/A-2", ["SLOT"])[0]) - self.assertEqual("0", bindb.aux_get("dev-libs/B-2", ["SLOT"])[0]) - self.assertEqual("0/2.1", bindb.aux_get("dev-libs/C-2.1", ["SLOT"])[0]) - - finally: - playground.cleanup() + + settings = playground.settings + trees = playground.trees + eroot = settings["EROOT"] + test_repo_location = settings.repositories["test_repo"].location + portdb = trees[eroot]["porttree"].dbapi + vardb = trees[eroot]["vartree"].dbapi + bindb = trees[eroot]["bintree"].dbapi + + updates_dir = os.path.join(test_repo_location, "profiles", "updates") + + try: + ensure_dirs(updates_dir) + with open(os.path.join(updates_dir, "1Q-2010"), "w") as f: + f.write(updates) + + # Create an empty updates directory, so that this + # repo doesn't inherit updates from the main repo. + ensure_dirs( + os.path.join( + portdb.getRepositoryPath("dont_apply_updates"), + "profiles", + "updates", + ) + ) + + global_noiselimit = portage.util.noiselimit + portage.util.noiselimit = -2 + try: + _do_global_updates(trees, {}) + finally: + portage.util.noiselimit = global_noiselimit + + # Workaround for cache validation not working + # correctly when filesystem has timestamp precision + # of 1 second. + vardb._clear_cache() + + # 0/2.30 -> 2/2.30 + self.assertEqual( + "2/2.30", vardb.aux_get("dev-libs/A-1", ["SLOT"])[0] + ) + self.assertEqual( + "2/2.30", bindb.aux_get("dev-libs/A-1", ["SLOT"])[0] + ) + + # 0 -> 1 + self.assertEqual("1", vardb.aux_get("dev-libs/B-1", ["SLOT"])[0]) + self.assertEqual("1", bindb.aux_get("dev-libs/B-1", ["SLOT"])[0]) + + # 0/1 -> 1 (equivalent to 1/1) + self.assertEqual("1", vardb.aux_get("dev-libs/C-1", ["SLOT"])[0]) + self.assertEqual("1", bindb.aux_get("dev-libs/C-1", ["SLOT"])[0]) + + # dont_apply_updates + self.assertEqual( + "0/2.30", bindb.aux_get("dev-libs/A-2", ["SLOT"])[0] + ) + self.assertEqual("0", bindb.aux_get("dev-libs/B-2", ["SLOT"])[0]) + self.assertEqual( + "0/2.1", bindb.aux_get("dev-libs/C-2.1", ["SLOT"])[0] + ) + + finally: + playground.cleanup() diff --git a/lib/portage/tests/update/test_update_dbentry.py b/lib/portage/tests/update/test_update_dbentry.py index bed0f4b7c..3b49cba9d 100644 --- a/lib/portage/tests/update/test_update_dbentry.py +++ b/lib/portage/tests/update/test_update_dbentry.py @@ -1,11 +1,14 @@ # Copyright 2012-2013 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +from __future__ import print_function +import sys import re import textwrap import portage from portage import os +from portage.const import SUPPORTED_GENTOO_BINPKG_FORMATS from portage.dep import Atom from portage.tests import TestCase from portage.tests.resolver.ResolverPlayground import ResolverPlayground @@ -13,6 +16,7 @@ from portage.update import update_dbentry from portage.util import ensure_dirs from portage.versions import _pkg_str from portage._global_updates import _do_global_updates +from portage.output import colorize class UpdateDbentryTestCase(TestCase): @@ -224,98 +228,110 @@ class UpdateDbentryTestCase(TestCase): """ ) - playground = ResolverPlayground( - binpkgs=binpkgs, ebuilds=ebuilds, installed=installed, world=world - ) + for binpkg_format in SUPPORTED_GENTOO_BINPKG_FORMATS: + with self.subTest(binpkg_format=binpkg_format): + print(colorize("HILITE", binpkg_format), end=" ... ") + sys.stdout.flush() + playground = ResolverPlayground( + binpkgs=binpkgs, + ebuilds=ebuilds, + installed=installed, + world=world, + user_config={ + "make.conf": ('BINPKG_FORMAT="%s"' % binpkg_format,), + }, + ) - settings = playground.settings - trees = playground.trees - eroot = settings["EROOT"] - test_repo_location = settings.repositories["test_repo"].location - portdb = trees[eroot]["porttree"].dbapi - vardb = trees[eroot]["vartree"].dbapi - bindb = trees[eroot]["bintree"].dbapi - setconfig = trees[eroot]["root_config"].setconfig - selected_set = setconfig.getSets()["selected"] + settings = playground.settings + trees = playground.trees + eroot = settings["EROOT"] + test_repo_location = settings.repositories["test_repo"].location + portdb = trees[eroot]["porttree"].dbapi + vardb = trees[eroot]["vartree"].dbapi + bindb = trees[eroot]["bintree"].dbapi + setconfig = trees[eroot]["root_config"].setconfig + selected_set = setconfig.getSets()["selected"] - updates_dir = os.path.join(test_repo_location, "profiles", "updates") + updates_dir = os.path.join(test_repo_location, "profiles", "updates") - try: - ensure_dirs(updates_dir) - with open(os.path.join(updates_dir, "1Q-2010"), "w") as f: - f.write(updates) + try: + ensure_dirs(updates_dir) + with open(os.path.join(updates_dir, "1Q-2010"), "w") as f: + f.write(updates) - # Create an empty updates directory, so that this - # repo doesn't inherit updates from the main repo. - ensure_dirs( - os.path.join( - portdb.getRepositoryPath("dont_apply_updates"), - "profiles", - "updates", - ) - ) + # Create an empty updates directory, so that this + # repo doesn't inherit updates from the main repo. + ensure_dirs( + os.path.join( + portdb.getRepositoryPath("dont_apply_updates"), + "profiles", + "updates", + ) + ) - global_noiselimit = portage.util.noiselimit - portage.util.noiselimit = -2 - try: - _do_global_updates(trees, {}) - finally: - portage.util.noiselimit = global_noiselimit + global_noiselimit = portage.util.noiselimit + portage.util.noiselimit = -2 + try: + _do_global_updates(trees, {}) + finally: + portage.util.noiselimit = global_noiselimit - # Workaround for cache validation not working - # correctly when filesystem has timestamp precision - # of 1 second. - vardb._clear_cache() + # Workaround for cache validation not working + # correctly when filesystem has timestamp precision + # of 1 second. + vardb._clear_cache() - # M -> M-moved - old_pattern = re.compile(r"\bdev-libs/M(\s|$)") - rdepend = vardb.aux_get("dev-libs/A-1", ["RDEPEND"])[0] - self.assertTrue(old_pattern.search(rdepend) is None) - self.assertTrue("dev-libs/M-moved" in rdepend) - rdepend = bindb.aux_get("dev-libs/A-1", ["RDEPEND"])[0] - self.assertTrue(old_pattern.search(rdepend) is None) - self.assertTrue("dev-libs/M-moved" in rdepend) - rdepend = vardb.aux_get("dev-libs/B-1", ["RDEPEND"])[0] - self.assertTrue(old_pattern.search(rdepend) is None) - self.assertTrue("dev-libs/M-moved" in rdepend) - rdepend = vardb.aux_get("dev-libs/B-1", ["RDEPEND"])[0] - self.assertTrue(old_pattern.search(rdepend) is None) - self.assertTrue("dev-libs/M-moved" in rdepend) + # M -> M-moved + old_pattern = re.compile(r"\bdev-libs/M(\s|$)") + rdepend = vardb.aux_get("dev-libs/A-1", ["RDEPEND"])[0] + self.assertTrue(old_pattern.search(rdepend) is None) + self.assertTrue("dev-libs/M-moved" in rdepend) + rdepend = bindb.aux_get("dev-libs/A-1", ["RDEPEND"])[0] + self.assertTrue(old_pattern.search(rdepend) is None) + self.assertTrue("dev-libs/M-moved" in rdepend) + rdepend = vardb.aux_get("dev-libs/B-1", ["RDEPEND"])[0] + self.assertTrue(old_pattern.search(rdepend) is None) + self.assertTrue("dev-libs/M-moved" in rdepend) + rdepend = vardb.aux_get("dev-libs/B-1", ["RDEPEND"])[0] + self.assertTrue(old_pattern.search(rdepend) is None) + self.assertTrue("dev-libs/M-moved" in rdepend) - # EAPI 4-python/*-progress N -> N.moved - rdepend = vardb.aux_get("dev-libs/B-1", ["RDEPEND"])[0] - old_pattern = re.compile(r"\bdev-libs/N(\s|$)") - self.assertTrue(old_pattern.search(rdepend) is None) - self.assertTrue("dev-libs/N.moved" in rdepend) - rdepend = bindb.aux_get("dev-libs/B-1", ["RDEPEND"])[0] - self.assertTrue(old_pattern.search(rdepend) is None) - self.assertTrue("dev-libs/N.moved" in rdepend) - self.assertRaises(KeyError, vardb.aux_get, "dev-libs/N-2", ["EAPI"]) - vardb.aux_get("dev-libs/N.moved-2", ["RDEPEND"])[0] + # EAPI 4-python/*-progress N -> N.moved + rdepend = vardb.aux_get("dev-libs/B-1", ["RDEPEND"])[0] + old_pattern = re.compile(r"\bdev-libs/N(\s|$)") + self.assertTrue(old_pattern.search(rdepend) is None) + self.assertTrue("dev-libs/N.moved" in rdepend) + rdepend = bindb.aux_get("dev-libs/B-1", ["RDEPEND"])[0] + self.assertTrue(old_pattern.search(rdepend) is None) + self.assertTrue("dev-libs/N.moved" in rdepend) + self.assertRaises(KeyError, vardb.aux_get, "dev-libs/N-2", ["EAPI"]) + vardb.aux_get("dev-libs/N.moved-2", ["RDEPEND"])[0] - # EAPI 4 does not allow dots in package names for N -> N.moved - rdepend = vardb.aux_get("dev-libs/A-1", ["RDEPEND"])[0] - self.assertTrue("dev-libs/N" in rdepend) - self.assertTrue("dev-libs/N.moved" not in rdepend) - rdepend = bindb.aux_get("dev-libs/A-1", ["RDEPEND"])[0] - self.assertTrue("dev-libs/N" in rdepend) - self.assertTrue("dev-libs/N.moved" not in rdepend) - vardb.aux_get("dev-libs/N-1", ["RDEPEND"])[0] - self.assertRaises(KeyError, vardb.aux_get, "dev-libs/N.moved-1", ["EAPI"]) + # EAPI 4 does not allow dots in package names for N -> N.moved + rdepend = vardb.aux_get("dev-libs/A-1", ["RDEPEND"])[0] + self.assertTrue("dev-libs/N" in rdepend) + self.assertTrue("dev-libs/N.moved" not in rdepend) + rdepend = bindb.aux_get("dev-libs/A-1", ["RDEPEND"])[0] + self.assertTrue("dev-libs/N" in rdepend) + self.assertTrue("dev-libs/N.moved" not in rdepend) + vardb.aux_get("dev-libs/N-1", ["RDEPEND"])[0] + self.assertRaises( + KeyError, vardb.aux_get, "dev-libs/N.moved-1", ["EAPI"] + ) - # dont_apply_updates - rdepend = vardb.aux_get("dev-libs/A-2", ["RDEPEND"])[0] - self.assertTrue("dev-libs/M" in rdepend) - self.assertTrue("dev-libs/M-moved" not in rdepend) - rdepend = bindb.aux_get("dev-libs/A-2", ["RDEPEND"])[0] - self.assertTrue("dev-libs/M" in rdepend) - self.assertTrue("dev-libs/M-moved" not in rdepend) + # dont_apply_updates + rdepend = vardb.aux_get("dev-libs/A-2", ["RDEPEND"])[0] + self.assertTrue("dev-libs/M" in rdepend) + self.assertTrue("dev-libs/M-moved" not in rdepend) + rdepend = bindb.aux_get("dev-libs/A-2", ["RDEPEND"])[0] + self.assertTrue("dev-libs/M" in rdepend) + self.assertTrue("dev-libs/M-moved" not in rdepend) - selected_set.load() - self.assertTrue("dev-libs/M" not in selected_set) - self.assertTrue("dev-libs/M-moved" in selected_set) - self.assertTrue("dev-libs/N" not in selected_set) - self.assertTrue("dev-libs/N.moved" in selected_set) + selected_set.load() + self.assertTrue("dev-libs/M" not in selected_set) + self.assertTrue("dev-libs/M-moved" in selected_set) + self.assertTrue("dev-libs/N" not in selected_set) + self.assertTrue("dev-libs/N.moved" in selected_set) - finally: - playground.cleanup() + finally: + playground.cleanup() diff --git a/lib/portage/util/_urlopen.py b/lib/portage/util/_urlopen.py index 70440c3e1..22f0e08df 100644 --- a/lib/portage/util/_urlopen.py +++ b/lib/portage/util/_urlopen.py @@ -26,7 +26,7 @@ def have_pep_476(): return hasattr(__import__("ssl"), "_create_unverified_context") -def urlopen(url, if_modified_since=None, proxies=None): +def urlopen(url, if_modified_since=None, headers={}, proxies=None): parse_result = urllib_parse.urlparse(url) if parse_result.scheme not in ("http", "https"): return _urlopen(url) @@ -45,6 +45,8 @@ def urlopen(url, if_modified_since=None, proxies=None): password_manager = urllib_request.HTTPPasswordMgrWithDefaultRealm() request = urllib_request.Request(url) request.add_header("User-Agent", "Gentoo Portage") + for key in headers: + request.add_header(key, headers[key]) if if_modified_since: request.add_header("If-Modified-Since", _timestamp_to_http(if_modified_since)) if parse_result.username is not None: diff --git a/lib/portage/versions.py b/lib/portage/versions.py index fe1ff6ce0..086ada555 100644 --- a/lib/portage/versions.py +++ b/lib/portage/versions.py @@ -529,6 +529,22 @@ class _pkg_str(str): self.__dict__["_stable"] = stable return stable + @property + def binpkg_format(self): + """ + Returns the BINPKG_FORMAT metadata. A return value of None means + that the format is unset. If there is no metadata available or the + BINPKG_FORMAT key is missing from the metadata, then raise + AttributeError. + + @rtype: str or None + @return: a non-empty BINPKG_FORMAT string, or None + """ + try: + return self._metadata["BINPKG_FORMAT"] or None + except (AttributeError, KeyError): + raise AttributeError("binpkg_format") + def pkgsplit(mypkg, silent=1, eapi=None): """ diff --git a/man/make.conf.5 b/man/make.conf.5 index 868a2ca50..8afde4a00 100644 --- a/man/make.conf.5 +++ b/man/make.conf.5 @@ -137,6 +137,55 @@ Defaults to "". BINPKG_COMPRESS_FLAGS="-9" .fi .TP +\fBBINPKG_GPG_SIGNING_BASE_COMMAND\fR = \fI"GPG command and arguments \ +[PORTAGE_CONFIG]"\fR +The base command will be used for all signing operations. +Portage will replace \fB[PORTAGE_CONFIG]\fR under different operations. +Please do not add arguments that can be configured independently. +.br +Defaults to "/usr/bin/flock /run/lock/portage-binpkg-gpg.lock /usr/bin/gpg \ +--sign --armor [PORTAGE_CONFIG]". +.br +.TP +\fBBINPKG_GPG_SIGNING_DIGEST=\fR = \fI"GPG supported digest"\fR +The digest that will be used for signature. +.br +Defaults to "SHA512" +.br +.TP +\fBBINPKG_GPG_SIGNING_GPG_HOME\fR = \fI[path]\fR +The GPG home where the signing private key located. +.br +Defaults to "/root/.gnupg" +.br +.TP +\fBBINPKG_GPG_SIGNING_KEY\fR = \fI"GPG key ID"\fR +GPG key ID used to sign binary packages, must exists in \ +\fBBINPKG_GPG_SIGNING_GPG_HOME\fR. +.br +Defaults to "" +.br +Example: "0x40DCF18E97150795!" +.br +.TP +\fBBINPKG_GPG_VERIFY_BASE_COMMAND=\fR = \fI"GPG command and arguments"\fR +The base command will be used for all verify operations. +Portage will replace \fB[PORTAGE_CONFIG]\fR and \fB[SIGNATURE]\fR under \ +different operations. +Please do not add arguments that can be configured independently. +.br +Defaults to "/usr/bin/gpg --verify --batch --no-tty --no-auto-check-trustdb \ +--status-fd 2 [PORTAGE_CONFIG] [SIGNATURE]" +.br +.TP +\fBBINPKG_GPG_VERIFY_GPG_HOME\fR = \fI[path]\fR +The GPG home where the trusted keys located. Please make sure the target \ +directory is globally readable, as the user will be dropped to \fBnobody\fR \ +during verification. +.br +Defaults to "/etc/portage/gnupg" +.br +.TP .B CBUILD This variable is passed by the \fIebuild scripts\fR to the \fIconfigure\fR as \fI\-\-build=${CBUILD}\fR only if it is defined. Do not set this yourself @@ -286,6 +335,11 @@ strips (or splits) them before installing. \fBbinpkg\-dostrip\fR must be enabled for \fBinstallsources\fR to work. .TP +.B binpkg-ignore-signature +This will disable GPG signature check for all binary packages. Enable this +could be dangerous if you get binary packages from remote site or use third +party packages. +.TP .B binpkg\-logs Keep logs from successful binary package merges. This is relevant only when \fBPORTAGE_LOGDIR\fR is set. @@ -318,6 +372,16 @@ It is also possible to remove packages manually, and then run \(aqemaint \-\-fix binhost' to update the ${PKGDIR}/Packages index. This feature is enabled by default. .TP +.B binpkg-request-signature +Binary packages are requested to be signed by trusted GPG signature. +Portage will reject to process any binary package without a valid GPG +signature. The verify command is defined in +\fBBINPKG_GPG_VERIFY_COMMAND\fR variable. +.TP +.B binpkg-signing +Binary packages will be signed by given GPG command. The signing command +is defined in \fBBINPKG_GPG_SIGNING_COMMAND\fR variable. +.TP .B buildpkg Binary packages will be created for all packages that are merged. Also see \fBquickpkg\fR(1) and \fBemerge\fR(1) \fB\-\-buildpkg\fR and @@ -451,6 +515,10 @@ for all EAPIs (for obvious reasons). Force emerges to always try to fetch files from the \fIPORTAGE_BINHOST\fR. See \fBmake.conf\fR(5) for more information. .TP +.B gpg-keepalive +Run GPG unlock command every 5 mins to avoid the passphrase expired. +If your GPG is auto unlocked on login, you do not need this. +.TP .B icecream Enable portage support for the icecream package. .TP @@ -781,6 +849,18 @@ the \fIebuild scripts\fR. Merging 'mirrorselect' can help. Entries in this variable that have no protocol and simply start with a '/' path separator may be used to specify mounted filesystem mirrors. .TP +\fBGPG_VERIFY_GROUP_DROP\fR = \fI[group]\fR +The group name used to drop root privileges during verification. +.br +Defaults to "nogroup" +.br +.TP +\fBGPG_VERIFY_USER_DROP\fR = \fI[user]\fR +The user name used to drop root privileges during verification. +.br +Defaults to "nobody" +.br +.TP \fBhttp_proxy ftp_proxy RSYNC_PROXY\fR = \fI[protocol://host:port]\fR These variables are used by network clients such as \fBwget\fR(1) and \fBrsync\fR(1). They are only required if you use a diff --git a/repoman/lib/repoman/tests/.gnupg/openpgp-revocs.d/06B3A311BD775C280D22A9305D90EA06352177F6.rev b/repoman/lib/repoman/tests/.gnupg/openpgp-revocs.d/06B3A311BD775C280D22A9305D90EA06352177F6.rev new file mode 100644 index 000000000..a6752fd30 --- /dev/null +++ b/repoman/lib/repoman/tests/.gnupg/openpgp-revocs.d/06B3A311BD775C280D22A9305D90EA06352177F6.rev @@ -0,0 +1,37 @@ +This is a revocation certificate for the OpenPGP key: + +pub rsa4096 2020-07-14 [S] + 06B3A311BD775C280D22A9305D90EA06352177F6 +uid Gentoo Portage Test Trusted Key (Test Only, Do NOT Trust!!!) (Gentoo Test Key) <test@example.org> + +A revocation certificate is a kind of "kill switch" to publicly +declare that a key shall not anymore be used. It is not possible +to retract such a revocation certificate once it has been published. + +Use it to revoke this key in case of a compromise or loss of +the secret key. However, if the secret key is still accessible, +it is better to generate a new revocation certificate and give +a reason for the revocation. For details see the description of +of the gpg command "--generate-revocation" in the GnuPG manual. + +To avoid an accidental use of this file, a colon has been inserted +before the 5 dashes below. Remove this colon with a text editor +before importing and publishing this revocation certificate. + +:-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iQI2BCABCAAgFiEEBrOjEb13XCgNIqkwXZDqBjUhd/YFAl8OFTwCHQAACgkQXZDq +BjUhd/aXCA/+OgzosMDaDe5DNwkSi2yKdC2X18v8JcaYnXBUR93nXA0LVN7iVWkR +WEH3NuVspQZ5vK+3AHTKabqZFC/buA5oQOH01Ncd4lQISfOOhFiBn5DIPX31BVT0 +iPmVkcxHAD4031ptP4oat6EFclT13SRchtlnAO04JofeHnzQIw3SozQGzXpAA1g4 +BogQ0HWA88HzuEYYE+e/yzZL4D496X1DTaXksg0Py5c4SS6u5pND6lcUtAGxAwa9 +sJFPs+coeURaRV99CrJfdh4u2OkvINTfrKOS6NFBQq6HVH5mLsRXZlcE4Oo4d+fN +XoPrTZnRUqpJADUdjHFvO/lr0fArJTS5IQCVBNFeCMlvgmUPeKWJ1r6Uiwe/UHor +9OP/tK97EqpsaXmHbo0jOUkn5iiUwy784+JBSSu/Q2NxqcBr74aaRdfxvs62dmv7 +droCDQi3ebqTdnlDSaeCIWHyVlSroOhZ+ZETVy193K1X7VXFX3hYKiJ3G8QZwy3e +AlsVGjIHWfC+K+enIn+uwSUvOWPN3upK8kqMRuXvAOppFCE4sTqNbxUnHHXaqo/r +s1q6zVsWVILBk97BHlJph2IaqhV7iIgPU97/r4U/BT11VqDFdVSHcXcs4PDNs5vh +6qttaDiyDqZjwMr+0iDoouHxFpqY8e+3M2gycUgGr2XV6ML0pXE6BqA= +=nIjC +-----END PGP PUBLIC KEY BLOCK----- diff --git a/repoman/lib/repoman/tests/.gnupg/openpgp-revocs.d/8DEDA2CDED49C8809287B89D8812797DDF1DD192.rev b/repoman/lib/repoman/tests/.gnupg/openpgp-revocs.d/8DEDA2CDED49C8809287B89D8812797DDF1DD192.rev new file mode 100644 index 000000000..456e0aa50 --- /dev/null +++ b/repoman/lib/repoman/tests/.gnupg/openpgp-revocs.d/8DEDA2CDED49C8809287B89D8812797DDF1DD192.rev @@ -0,0 +1,37 @@ +This is a revocation certificate for the OpenPGP key: + +pub rsa4096 2020-07-14 [S] + 8DEDA2CDED49C8809287B89D8812797DDF1DD192 +uid Gentoo Portage Test Untrusted Key (Test Only, Do NOT Trust!!!) (Gentoo Test Key) <test@example.org> + +A revocation certificate is a kind of "kill switch" to publicly +declare that a key shall not anymore be used. It is not possible +to retract such a revocation certificate once it has been published. + +Use it to revoke this key in case of a compromise or loss of +the secret key. However, if the secret key is still accessible, +it is better to generate a new revocation certificate and give +a reason for the revocation. For details see the description of +of the gpg command "--generate-revocation" in the GnuPG manual. + +To avoid an accidental use of this file, a colon has been inserted +before the 5 dashes below. Remove this colon with a text editor +before importing and publishing this revocation certificate. + +:-----BEGIN PGP PUBLIC KEY BLOCK----- +Comment: This is a revocation certificate + +iQI2BCABCAAgFiEEje2ize1JyICSh7idiBJ5fd8d0ZIFAl8OFXUCHQAACgkQiBJ5 +fd8d0ZKdwxAAhmkC0V+OLyOU9PCV6ogD9/3b3nVqNIreoc+gxHTLmEvxiMSItqmq +DkcW9RJKAduA/HiLZQ8Yzxw+ldC6kuWqYEjNpSM54VDkrgOePi8W1bVDTCoSp7bo +0JOG4frieqIxA6lhAA2UppH7EPRXoODPLYqooNxWAs3xxVrR6eGAb5l8NXzrymvN +acFfOZ0s5FgADQskQHWVq6TaJn9DrcZxd+b+plSwPYDXqzTChKQ5jw7uMAPUvDkG +JUWgoKiKSrK64bslUq8aEDEZQ4uxjyEi6G0vO/wPL/ysGhS7KkPgCZsEfNjWjajb +jAsdvl1raoHxK/O7llMNr9uRAZtC56pJ//SRDc3kylZrkAo0RNoXQFowT739HWei +2UkCFDfz488VKKrOI8TzTyUvLFEo14ZAXGg1wdHaGnbYMzxpKjP15alOFo6fKIcS +Kz1f/Mab4wf4Sg0XAjQ9pnai1/U9ZF3/NSnRtYgJkLCrIEtRLrgSHJsLDPxjCfGV +jWszAbIk167aA0yKsSmuwkpc5bZqqBaTo904r857fxyt5Les6SOHsV7iNXt7F+am +03Y6u6m2eROba7M67l115vTyYcw5EZVp5j0nI81PXsC9X2DD1ci5xrNmPyEeupC4 +7y7mcGbUYPJAJHJ0kHG4ZYLnNMl42ZYr1ssEeasDwUsLWgVqvx9RkKI= +=kVUQ +-----END PGP PUBLIC KEY BLOCK----- diff --git a/repoman/lib/repoman/tests/.gnupg/private-keys-v1.d/273B030399E7BEA66A9AD42216DE7CA17BA5D42E.key b/repoman/lib/repoman/tests/.gnupg/private-keys-v1.d/273B030399E7BEA66A9AD42216DE7CA17BA5D42E.key Binary files differnew file mode 100644 index 000000000..0bd1026ad --- /dev/null +++ b/repoman/lib/repoman/tests/.gnupg/private-keys-v1.d/273B030399E7BEA66A9AD42216DE7CA17BA5D42E.key diff --git a/repoman/lib/repoman/tests/.gnupg/private-keys-v1.d/C99796FB85B0C3DF03314A11B5850C51167D6282.key b/repoman/lib/repoman/tests/.gnupg/private-keys-v1.d/C99796FB85B0C3DF03314A11B5850C51167D6282.key Binary files differnew file mode 100644 index 000000000..8e29ef43c --- /dev/null +++ b/repoman/lib/repoman/tests/.gnupg/private-keys-v1.d/C99796FB85B0C3DF03314A11B5850C51167D6282.key diff --git a/repoman/lib/repoman/tests/.gnupg/pubring.kbx b/repoman/lib/repoman/tests/.gnupg/pubring.kbx Binary files differnew file mode 100644 index 000000000..f6367f83b --- /dev/null +++ b/repoman/lib/repoman/tests/.gnupg/pubring.kbx diff --git a/repoman/lib/repoman/tests/.gnupg/trustdb.gpg b/repoman/lib/repoman/tests/.gnupg/trustdb.gpg Binary files differnew file mode 100644 index 000000000..db5b1023b --- /dev/null +++ b/repoman/lib/repoman/tests/.gnupg/trustdb.gpg diff --git a/repoman/lib/repoman/tests/runTests.py b/repoman/lib/repoman/tests/runTests.py index e23ded192..4a081551e 100644 --- a/repoman/lib/repoman/tests/runTests.py +++ b/repoman/lib/repoman/tests/runTests.py @@ -10,6 +10,8 @@ import grp import platform import pwd import signal +import tempfile +from distutils.dir_util import copy_tree def debug_signal(signum, frame): @@ -68,6 +70,14 @@ if insert_bin_path: path.insert(0, PORTAGE_BIN_PATH) os.environ["PATH"] = ":".join(path) +# Copy GPG test keys to temporary directory +gpg_path = tempfile.mkdtemp(prefix="gpg_") + +copy_tree(os.path.join(os.path.dirname(os.path.realpath(__file__)), ".gnupg"), gpg_path) + +os.chmod(gpg_path, 0o700) +os.environ["PORTAGE_GNUPGHOME"] = gpg_path + if __name__ == "__main__": try: sys.exit(tests.main()) diff --git a/repoman/setup.py b/repoman/setup.py index 17c9c5e8a..8231c8f75 100755 --- a/repoman/setup.py +++ b/repoman/setup.py @@ -13,7 +13,7 @@ try: from setuptools.command.install_scripts import install_scripts from setuptools.command.sdist import sdist from setuptools.dep_util import newer - from setuptools.dir_util import mkpath, remove_tree + from setuptools.dir_util import mkpath, remove_tree, copy_tree from setuptools.util import change_root, subst_vars except ImportError: from distutils.core import setup, Command @@ -26,7 +26,7 @@ except ImportError: from distutils.command.install_scripts import install_scripts from distutils.command.sdist import sdist from distutils.dep_util import newer - from distutils.dir_util import mkpath, remove_tree + from distutils.dir_util import mkpath, remove_tree, copy_tree from distutils.util import change_root, subst_vars import codecs @@ -425,6 +425,14 @@ class test(Command): def run(self): self.run_command("build_tests") + # copy GPG test keys + copy_tree( + os.path.join( + self.build_lib, "..", "..", "lib", "repoman", "tests", ".gnupg" + ), + os.path.join(self.build_lib, "repoman", "tests", ".gnupg"), + ) + os.chmod(os.path.join(self.build_lib, "repoman", "tests", ".gnupg"), 0o700) subprocess.check_call( [ sys.executable, @@ -14,7 +14,7 @@ try: from setuptools.command.install_scripts import install_scripts from setuptools.command.sdist import sdist from setuptools.dep_util import newer - from setuptools.dir_util import mkpath, remove_tree + from setuptools.dir_util import mkpath, remove_tree, copy_tree from setuptools.util import change_root, subst_vars except ImportError: from distutils.core import setup, Command, Extension @@ -28,7 +28,7 @@ except ImportError: from distutils.command.install_scripts import install_scripts from distutils.command.sdist import sdist from distutils.dep_util import newer - from distutils.dir_util import mkpath, remove_tree + from distutils.dir_util import mkpath, remove_tree, copy_tree from distutils.util import change_root, subst_vars import codecs @@ -700,6 +700,16 @@ class test(Command): def run(self): self.run_command("build_tests") + + # copy GPG test keys + copy_tree( + os.path.join( + self.build_lib, "..", "..", "lib", "portage", "tests", ".gnupg" + ), + os.path.join(self.build_lib, "portage", "tests", ".gnupg"), + ) + os.chmod(os.path.join(self.build_lib, "portage", "tests", ".gnupg"), 0o700) + subprocess.check_call( [ sys.executable, |