diff options
Diffstat (limited to 'cvs2svn_lib/common.py')
-rw-r--r-- | cvs2svn_lib/common.py | 409 |
1 files changed, 0 insertions, 409 deletions
diff --git a/cvs2svn_lib/common.py b/cvs2svn_lib/common.py deleted file mode 100644 index 8400907..0000000 --- a/cvs2svn_lib/common.py +++ /dev/null @@ -1,409 +0,0 @@ -# (Be in -*- python -*- mode.) -# -# ==================================================================== -# Copyright (c) 2000-2009 CollabNet. All rights reserved. -# -# This software is licensed as described in the file COPYING, which -# you should have received as part of this distribution. The terms -# are also available at http://subversion.tigris.org/license-1.html. -# If newer versions of this license are posted there, you may use a -# newer version instead, at your option. -# -# This software consists of voluntary contributions made by many -# individuals. For exact contribution history, see the revision -# history and logs, available at http://cvs2svn.tigris.org/. -# ==================================================================== - -"""This module contains common facilities used by cvs2svn.""" - - -import re -import time -import codecs - -from cvs2svn_lib.log import Log - - -# Always use these constants for opening databases. -DB_OPEN_READ = 'r' -DB_OPEN_WRITE = 'w' -DB_OPEN_NEW = 'n' - - -SVN_INVALID_REVNUM = -1 - - -# Warnings and errors start with these strings. They are typically -# followed by a colon and a space, as in "%s: " ==> "WARNING: ". -warning_prefix = "WARNING" -error_prefix = "ERROR" - - -class FatalException(Exception): - """Exception thrown on a non-recoverable error. - - If this exception is thrown by main(), it is caught by the global - layer of the program, its string representation is printed (followed - by a newline), and the program is ended with an exit code of 1.""" - - pass - - -class InternalError(Exception): - """Exception thrown in the case of a cvs2svn internal error (aka, bug).""" - - pass - - -class FatalError(FatalException): - """A FatalException that prepends error_prefix to the message.""" - - def __init__(self, msg): - """Use (error_prefix + ': ' + MSG) as the error message.""" - - FatalException.__init__(self, '%s: %s' % (error_prefix, msg,)) - - -class CommandError(FatalError): - """A FatalError caused by a failed command invocation. - - The error message includes the command name, exit code, and output.""" - - def __init__(self, command, exit_status, error_output=''): - self.command = command - self.exit_status = exit_status - self.error_output = error_output - if error_output.rstrip(): - FatalError.__init__( - self, - 'The command %r failed with exit status=%s\n' - 'and the following output:\n' - '%s' - % (self.command, self.exit_status, self.error_output.rstrip())) - else: - FatalError.__init__( - self, - 'The command %r failed with exit status=%s and no output' - % (self.command, self.exit_status)) - - -def path_join(*components): - """Join two or more pathname COMPONENTS, inserting '/' as needed. - Empty component are skipped.""" - - return '/'.join(filter(None, components)) - - -def path_split(path): - """Split the svn pathname PATH into a pair, (HEAD, TAIL). - - This is similar to os.path.split(), but always uses '/' as path - separator. PATH is an svn path, which should not start with a '/'. - HEAD is everything before the last slash, and TAIL is everything - after. If PATH ends in a slash, TAIL will be empty. If there is no - slash in PATH, HEAD will be empty. If PATH is empty, both HEAD and - TAIL are empty.""" - - pos = path.rfind('/') - if pos == -1: - return ('', path,) - else: - return (path[:pos], path[pos+1:],) - - -class IllegalSVNPathError(FatalException): - pass - - -# Control characters (characters not allowed in Subversion filenames): -ctrl_characters_regexp = re.compile('[\\\x00-\\\x1f\\\x7f]') - - -def verify_svn_filename_legal(filename): - """Verify that FILENAME is a legal filename. - - FILENAME is a path component of a CVS path. Check that it won't - choke SVN: - - - Check that it is not empty. - - - Check that it is not equal to '.' or '..'. - - - Check that the filename does not include any control characters. - - If any of these tests fail, raise an IllegalSVNPathError.""" - - if filename == '': - raise IllegalSVNPathError("Empty filename component.") - - if filename in ['.', '..']: - raise IllegalSVNPathError("Illegal filename component %r." % (filename,)) - - m = ctrl_characters_regexp.search(filename) - if m: - raise IllegalSVNPathError( - "Character %r in filename %r is not supported by Subversion." - % (m.group(), filename,) - ) - - -def verify_svn_path_legal(path): - """Verify that PATH is a legitimate SVN path. - - If not, raise an IllegalSVNPathError.""" - - if path.startswith('/'): - raise IllegalSVNPathError("Path %r must not start with '/'." % (path,)) - head = path - while head != '': - (head,tail) = path_split(head) - try: - verify_svn_filename_legal(tail) - except IllegalSVNPathError, e: - raise IllegalSVNPathError('Problem with path %r: %s' % (path, e,)) - - -def normalize_svn_path(path, allow_empty=False): - """Normalize an SVN path (e.g., one supplied by a user). - - 1. Strip leading, trailing, and duplicated '/'. - 2. If ALLOW_EMPTY is not set, verify that PATH is not empty. - - Return the normalized path. - - If the path is invalid, raise an IllegalSVNPathError.""" - - norm_path = path_join(*path.split('/')) - if not allow_empty and not norm_path: - raise IllegalSVNPathError("Path is empty") - return norm_path - - -class PathRepeatedException(Exception): - def __init__(self, path, count): - self.path = path - self.count = count - Exception.__init__( - self, 'Path %s is repeated %d times' % (self.path, self.count,) - ) - - -class PathsNestedException(Exception): - def __init__(self, nest, nestlings): - self.nest = nest - self.nestlings = nestlings - Exception.__init__( - self, - 'Path %s contains the following other paths: %s' - % (self.nest, ', '.join(self.nestlings),) - ) - - -class PathsNotDisjointException(FatalException): - """An exception that collects multiple other disjointness exceptions.""" - - def __init__(self, problems): - self.problems = problems - Exception.__init__( - self, - 'The following paths are not disjoint:\n' - ' %s\n' - % ('\n '.join([str(problem) for problem in self.problems]),) - ) - - -def verify_paths_disjoint(*paths): - """Verify that all of the paths in the argument list are disjoint. - - If any of the paths is nested in another one (i.e., in the sense - that 'a/b/c/d' is nested in 'a/b'), or any two paths are identical, - raise a PathsNotDisjointException containing exceptions detailing - the individual problems.""" - - def split(path): - if not path: - return [] - else: - return path.split('/') - - def contains(split_path1, split_path2): - """Return True iff SPLIT_PATH1 contains SPLIT_PATH2.""" - - return ( - len(split_path1) < len(split_path2) - and split_path2[:len(split_path1)] == split_path1 - ) - - paths = [(split(path), path) for path in paths] - # If all overlapping elements are equal, a shorter list is - # considered "less than" a longer one. Therefore if any paths are - # nested, this sort will leave at least one such pair adjacent, in - # the order [nest,nestling]. - paths.sort() - - problems = [] - - # Create exceptions for any repeated paths, and delete the repeats - # from the paths array: - i = 0 - while i < len(paths): - split_path, path = paths[i] - j = i + 1 - while j < len(paths) and split_path == paths[j][0]: - j += 1 - if j - i > 1: - problems.append(PathRepeatedException(path, j - i)) - # Delete all but the first copy: - del paths[i + 1:j] - i += 1 - - # Create exceptions for paths nested in each other: - i = 0 - while i < len(paths): - split_path, path = paths[i] - j = i + 1 - while j < len(paths) and contains(split_path, paths[j][0]): - j += 1 - if j - i > 1: - problems.append(PathsNestedException( - path, [path2 for (split_path2, path2) in paths[i + 1:j]] - )) - i += 1 - - if problems: - raise PathsNotDisjointException(problems) - - -def format_date(date): - """Return an svn-compatible date string for DATE (seconds since epoch). - - A Subversion date looks like '2002-09-29T14:44:59.000000Z'.""" - - return time.strftime("%Y-%m-%dT%H:%M:%S.000000Z", time.gmtime(date)) - - -class CVSTextDecoder: - """Callable that decodes CVS strings into Unicode.""" - - def __init__(self, encodings, fallback_encoding=None): - """Create a CVSTextDecoder instance. - - ENCODINGS is a list containing the names of encodings that are - attempted to be used as source encodings in 'strict' mode. - - FALLBACK_ENCODING, if specified, is the name of an encoding that - should be used as a source encoding in lossy 'replace' mode if all - of ENCODINGS failed. - - Raise LookupError if any of the specified encodings is unknown.""" - - self.decoders = [ - (encoding, codecs.lookup(encoding)[1]) - for encoding in encodings] - - if fallback_encoding is None: - self.fallback_decoder = None - else: - self.fallback_decoder = ( - fallback_encoding, codecs.lookup(fallback_encoding)[1] - ) - - def add_encoding(self, encoding): - """Add an encoding to be tried in 'strict' mode. - - ENCODING is the name of an encoding. If it is unknown, raise a - LookupError.""" - - for (name, decoder) in self.decoders: - if name == encoding: - return - else: - self.decoders.append( (encoding, codecs.lookup(encoding)[1]) ) - - def set_fallback_encoding(self, encoding): - """Set the fallback encoding, to be tried in 'replace' mode. - - ENCODING is the name of an encoding. If it is unknown, raise a - LookupError.""" - - if encoding is None: - self.fallback_decoder = None - else: - self.fallback_decoder = (encoding, codecs.lookup(encoding)[1]) - - def __call__(self, s): - """Try to decode string S using our configured source encodings. - - Return the string as a Unicode string. If S is already a unicode - string, do nothing. - - Raise UnicodeError if the string cannot be decoded using any of - the source encodings and no fallback encoding was specified.""" - - if isinstance(s, unicode): - return s - for (name, decoder) in self.decoders: - try: - return decoder(s)[0] - except ValueError: - Log().verbose("Encoding '%s' failed for string %r" % (name, s)) - - if self.fallback_decoder is not None: - (name, decoder) = self.fallback_decoder - return decoder(s, 'replace')[0] - else: - raise UnicodeError - - -class Timestamper: - """Return monotonic timestamps derived from changeset timestamps.""" - - def __init__(self): - # The last timestamp that has been returned: - self.timestamp = 0.0 - - # The maximum timestamp that is considered reasonable: - self.max_timestamp = time.time() + 24.0 * 60.0 * 60.0 - - def get(self, timestamp, change_expected): - """Return a reasonable timestamp derived from TIMESTAMP. - - Push TIMESTAMP into the future if necessary to ensure that it is - at least one second later than every other timestamp that has been - returned by previous calls to this method. - - If CHANGE_EXPECTED is not True, then log a message if the - timestamp has to be changed.""" - - if timestamp > self.max_timestamp: - # If a timestamp is in the future, it is assumed that it is - # bogus. Shift it backwards in time to prevent it forcing other - # timestamps to be pushed even further in the future. - - # Note that this is not nearly a complete solution to the bogus - # timestamp problem. A timestamp in the future still affects - # the ordering of changesets, and a changeset having such a - # timestamp will not be committed until all changesets with - # earlier timestamps have been committed, even if other - # changesets with even earlier timestamps depend on this one. - self.timestamp = self.timestamp + 1.0 - if not change_expected: - Log().warn( - 'Timestamp "%s" is in the future; changed to "%s".' - % (time.asctime(time.gmtime(timestamp)), - time.asctime(time.gmtime(self.timestamp)),) - ) - elif timestamp < self.timestamp + 1.0: - self.timestamp = self.timestamp + 1.0 - if not change_expected and Log().is_on(Log.VERBOSE): - Log().verbose( - 'Timestamp "%s" adjusted to "%s" to ensure monotonicity.' - % (time.asctime(time.gmtime(timestamp)), - time.asctime(time.gmtime(self.timestamp)),) - ) - else: - self.timestamp = timestamp - - return self.timestamp - - |