aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'cvs2svn_lib/common.py')
-rw-r--r--cvs2svn_lib/common.py409
1 files changed, 0 insertions, 409 deletions
diff --git a/cvs2svn_lib/common.py b/cvs2svn_lib/common.py
deleted file mode 100644
index 8400907..0000000
--- a/cvs2svn_lib/common.py
+++ /dev/null
@@ -1,409 +0,0 @@
-# (Be in -*- python -*- mode.)
-#
-# ====================================================================
-# Copyright (c) 2000-2009 CollabNet. All rights reserved.
-#
-# This software is licensed as described in the file COPYING, which
-# you should have received as part of this distribution. The terms
-# are also available at http://subversion.tigris.org/license-1.html.
-# If newer versions of this license are posted there, you may use a
-# newer version instead, at your option.
-#
-# This software consists of voluntary contributions made by many
-# individuals. For exact contribution history, see the revision
-# history and logs, available at http://cvs2svn.tigris.org/.
-# ====================================================================
-
-"""This module contains common facilities used by cvs2svn."""
-
-
-import re
-import time
-import codecs
-
-from cvs2svn_lib.log import Log
-
-
-# Always use these constants for opening databases.
-DB_OPEN_READ = 'r'
-DB_OPEN_WRITE = 'w'
-DB_OPEN_NEW = 'n'
-
-
-SVN_INVALID_REVNUM = -1
-
-
-# Warnings and errors start with these strings. They are typically
-# followed by a colon and a space, as in "%s: " ==> "WARNING: ".
-warning_prefix = "WARNING"
-error_prefix = "ERROR"
-
-
-class FatalException(Exception):
- """Exception thrown on a non-recoverable error.
-
- If this exception is thrown by main(), it is caught by the global
- layer of the program, its string representation is printed (followed
- by a newline), and the program is ended with an exit code of 1."""
-
- pass
-
-
-class InternalError(Exception):
- """Exception thrown in the case of a cvs2svn internal error (aka, bug)."""
-
- pass
-
-
-class FatalError(FatalException):
- """A FatalException that prepends error_prefix to the message."""
-
- def __init__(self, msg):
- """Use (error_prefix + ': ' + MSG) as the error message."""
-
- FatalException.__init__(self, '%s: %s' % (error_prefix, msg,))
-
-
-class CommandError(FatalError):
- """A FatalError caused by a failed command invocation.
-
- The error message includes the command name, exit code, and output."""
-
- def __init__(self, command, exit_status, error_output=''):
- self.command = command
- self.exit_status = exit_status
- self.error_output = error_output
- if error_output.rstrip():
- FatalError.__init__(
- self,
- 'The command %r failed with exit status=%s\n'
- 'and the following output:\n'
- '%s'
- % (self.command, self.exit_status, self.error_output.rstrip()))
- else:
- FatalError.__init__(
- self,
- 'The command %r failed with exit status=%s and no output'
- % (self.command, self.exit_status))
-
-
-def path_join(*components):
- """Join two or more pathname COMPONENTS, inserting '/' as needed.
- Empty component are skipped."""
-
- return '/'.join(filter(None, components))
-
-
-def path_split(path):
- """Split the svn pathname PATH into a pair, (HEAD, TAIL).
-
- This is similar to os.path.split(), but always uses '/' as path
- separator. PATH is an svn path, which should not start with a '/'.
- HEAD is everything before the last slash, and TAIL is everything
- after. If PATH ends in a slash, TAIL will be empty. If there is no
- slash in PATH, HEAD will be empty. If PATH is empty, both HEAD and
- TAIL are empty."""
-
- pos = path.rfind('/')
- if pos == -1:
- return ('', path,)
- else:
- return (path[:pos], path[pos+1:],)
-
-
-class IllegalSVNPathError(FatalException):
- pass
-
-
-# Control characters (characters not allowed in Subversion filenames):
-ctrl_characters_regexp = re.compile('[\\\x00-\\\x1f\\\x7f]')
-
-
-def verify_svn_filename_legal(filename):
- """Verify that FILENAME is a legal filename.
-
- FILENAME is a path component of a CVS path. Check that it won't
- choke SVN:
-
- - Check that it is not empty.
-
- - Check that it is not equal to '.' or '..'.
-
- - Check that the filename does not include any control characters.
-
- If any of these tests fail, raise an IllegalSVNPathError."""
-
- if filename == '':
- raise IllegalSVNPathError("Empty filename component.")
-
- if filename in ['.', '..']:
- raise IllegalSVNPathError("Illegal filename component %r." % (filename,))
-
- m = ctrl_characters_regexp.search(filename)
- if m:
- raise IllegalSVNPathError(
- "Character %r in filename %r is not supported by Subversion."
- % (m.group(), filename,)
- )
-
-
-def verify_svn_path_legal(path):
- """Verify that PATH is a legitimate SVN path.
-
- If not, raise an IllegalSVNPathError."""
-
- if path.startswith('/'):
- raise IllegalSVNPathError("Path %r must not start with '/'." % (path,))
- head = path
- while head != '':
- (head,tail) = path_split(head)
- try:
- verify_svn_filename_legal(tail)
- except IllegalSVNPathError, e:
- raise IllegalSVNPathError('Problem with path %r: %s' % (path, e,))
-
-
-def normalize_svn_path(path, allow_empty=False):
- """Normalize an SVN path (e.g., one supplied by a user).
-
- 1. Strip leading, trailing, and duplicated '/'.
- 2. If ALLOW_EMPTY is not set, verify that PATH is not empty.
-
- Return the normalized path.
-
- If the path is invalid, raise an IllegalSVNPathError."""
-
- norm_path = path_join(*path.split('/'))
- if not allow_empty and not norm_path:
- raise IllegalSVNPathError("Path is empty")
- return norm_path
-
-
-class PathRepeatedException(Exception):
- def __init__(self, path, count):
- self.path = path
- self.count = count
- Exception.__init__(
- self, 'Path %s is repeated %d times' % (self.path, self.count,)
- )
-
-
-class PathsNestedException(Exception):
- def __init__(self, nest, nestlings):
- self.nest = nest
- self.nestlings = nestlings
- Exception.__init__(
- self,
- 'Path %s contains the following other paths: %s'
- % (self.nest, ', '.join(self.nestlings),)
- )
-
-
-class PathsNotDisjointException(FatalException):
- """An exception that collects multiple other disjointness exceptions."""
-
- def __init__(self, problems):
- self.problems = problems
- Exception.__init__(
- self,
- 'The following paths are not disjoint:\n'
- ' %s\n'
- % ('\n '.join([str(problem) for problem in self.problems]),)
- )
-
-
-def verify_paths_disjoint(*paths):
- """Verify that all of the paths in the argument list are disjoint.
-
- If any of the paths is nested in another one (i.e., in the sense
- that 'a/b/c/d' is nested in 'a/b'), or any two paths are identical,
- raise a PathsNotDisjointException containing exceptions detailing
- the individual problems."""
-
- def split(path):
- if not path:
- return []
- else:
- return path.split('/')
-
- def contains(split_path1, split_path2):
- """Return True iff SPLIT_PATH1 contains SPLIT_PATH2."""
-
- return (
- len(split_path1) < len(split_path2)
- and split_path2[:len(split_path1)] == split_path1
- )
-
- paths = [(split(path), path) for path in paths]
- # If all overlapping elements are equal, a shorter list is
- # considered "less than" a longer one. Therefore if any paths are
- # nested, this sort will leave at least one such pair adjacent, in
- # the order [nest,nestling].
- paths.sort()
-
- problems = []
-
- # Create exceptions for any repeated paths, and delete the repeats
- # from the paths array:
- i = 0
- while i < len(paths):
- split_path, path = paths[i]
- j = i + 1
- while j < len(paths) and split_path == paths[j][0]:
- j += 1
- if j - i > 1:
- problems.append(PathRepeatedException(path, j - i))
- # Delete all but the first copy:
- del paths[i + 1:j]
- i += 1
-
- # Create exceptions for paths nested in each other:
- i = 0
- while i < len(paths):
- split_path, path = paths[i]
- j = i + 1
- while j < len(paths) and contains(split_path, paths[j][0]):
- j += 1
- if j - i > 1:
- problems.append(PathsNestedException(
- path, [path2 for (split_path2, path2) in paths[i + 1:j]]
- ))
- i += 1
-
- if problems:
- raise PathsNotDisjointException(problems)
-
-
-def format_date(date):
- """Return an svn-compatible date string for DATE (seconds since epoch).
-
- A Subversion date looks like '2002-09-29T14:44:59.000000Z'."""
-
- return time.strftime("%Y-%m-%dT%H:%M:%S.000000Z", time.gmtime(date))
-
-
-class CVSTextDecoder:
- """Callable that decodes CVS strings into Unicode."""
-
- def __init__(self, encodings, fallback_encoding=None):
- """Create a CVSTextDecoder instance.
-
- ENCODINGS is a list containing the names of encodings that are
- attempted to be used as source encodings in 'strict' mode.
-
- FALLBACK_ENCODING, if specified, is the name of an encoding that
- should be used as a source encoding in lossy 'replace' mode if all
- of ENCODINGS failed.
-
- Raise LookupError if any of the specified encodings is unknown."""
-
- self.decoders = [
- (encoding, codecs.lookup(encoding)[1])
- for encoding in encodings]
-
- if fallback_encoding is None:
- self.fallback_decoder = None
- else:
- self.fallback_decoder = (
- fallback_encoding, codecs.lookup(fallback_encoding)[1]
- )
-
- def add_encoding(self, encoding):
- """Add an encoding to be tried in 'strict' mode.
-
- ENCODING is the name of an encoding. If it is unknown, raise a
- LookupError."""
-
- for (name, decoder) in self.decoders:
- if name == encoding:
- return
- else:
- self.decoders.append( (encoding, codecs.lookup(encoding)[1]) )
-
- def set_fallback_encoding(self, encoding):
- """Set the fallback encoding, to be tried in 'replace' mode.
-
- ENCODING is the name of an encoding. If it is unknown, raise a
- LookupError."""
-
- if encoding is None:
- self.fallback_decoder = None
- else:
- self.fallback_decoder = (encoding, codecs.lookup(encoding)[1])
-
- def __call__(self, s):
- """Try to decode string S using our configured source encodings.
-
- Return the string as a Unicode string. If S is already a unicode
- string, do nothing.
-
- Raise UnicodeError if the string cannot be decoded using any of
- the source encodings and no fallback encoding was specified."""
-
- if isinstance(s, unicode):
- return s
- for (name, decoder) in self.decoders:
- try:
- return decoder(s)[0]
- except ValueError:
- Log().verbose("Encoding '%s' failed for string %r" % (name, s))
-
- if self.fallback_decoder is not None:
- (name, decoder) = self.fallback_decoder
- return decoder(s, 'replace')[0]
- else:
- raise UnicodeError
-
-
-class Timestamper:
- """Return monotonic timestamps derived from changeset timestamps."""
-
- def __init__(self):
- # The last timestamp that has been returned:
- self.timestamp = 0.0
-
- # The maximum timestamp that is considered reasonable:
- self.max_timestamp = time.time() + 24.0 * 60.0 * 60.0
-
- def get(self, timestamp, change_expected):
- """Return a reasonable timestamp derived from TIMESTAMP.
-
- Push TIMESTAMP into the future if necessary to ensure that it is
- at least one second later than every other timestamp that has been
- returned by previous calls to this method.
-
- If CHANGE_EXPECTED is not True, then log a message if the
- timestamp has to be changed."""
-
- if timestamp > self.max_timestamp:
- # If a timestamp is in the future, it is assumed that it is
- # bogus. Shift it backwards in time to prevent it forcing other
- # timestamps to be pushed even further in the future.
-
- # Note that this is not nearly a complete solution to the bogus
- # timestamp problem. A timestamp in the future still affects
- # the ordering of changesets, and a changeset having such a
- # timestamp will not be committed until all changesets with
- # earlier timestamps have been committed, even if other
- # changesets with even earlier timestamps depend on this one.
- self.timestamp = self.timestamp + 1.0
- if not change_expected:
- Log().warn(
- 'Timestamp "%s" is in the future; changed to "%s".'
- % (time.asctime(time.gmtime(timestamp)),
- time.asctime(time.gmtime(self.timestamp)),)
- )
- elif timestamp < self.timestamp + 1.0:
- self.timestamp = self.timestamp + 1.0
- if not change_expected and Log().is_on(Log.VERBOSE):
- Log().verbose(
- 'Timestamp "%s" adjusted to "%s" to ensure monotonicity.'
- % (time.asctime(time.gmtime(timestamp)),
- time.asctime(time.gmtime(self.timestamp)),)
- )
- else:
- self.timestamp = timestamp
-
- return self.timestamp
-
-