Diffstat (limited to 'pypy/objspace/std/multimethod.py')
-rw-r--r-- | pypy/objspace/std/multimethod.py | 507
1 file changed, 0 insertions, 507 deletions
diff --git a/pypy/objspace/std/multimethod.py b/pypy/objspace/std/multimethod.py
deleted file mode 100644
index 04a35e1e05..0000000000
--- a/pypy/objspace/std/multimethod.py
+++ /dev/null
@@ -1,507 +0,0 @@
-from pypy.interpreter.baseobjspace import OperationError
-from pypy.tool.cache import Cache
-
-class FailedToImplement(Exception):
-    "Signals the dispatcher to try harder."
-
-class W_ANY:
-    "Catch-all in case multimethods don't find anything else."
-    typedef = None
-
-
-# This file defines three major classes:
-#
-# MultiMethod is the class you instantiate explicitly.
-# It is essentially just a collection of registered functions.
-# If xxx is a MultiMethod, StdObjSpace.xxx is xxx again,
-# and space.xxx is a BoundMultiMethod.
-#
-# UnboundMultiMethod is a MultiMethod on which one argument
-# (typically the first one) has been restricted to be of some
-# statically known type.  It is obtained by the syntax
-# W_XxxType.yyy, where W_XxxType is the static type class,
-# or explicitly by calling 'xxx.slice(typeclass, arg_position)'.
-# Its dispatch table is always a subset of the original MultiMethod's
-# dispatch table.
-#
-# The registration of a new function to a MultiMethod will be propagated
-# to all of its matching UnboundMultiMethod instances.  The registration of
-# a function directly to an UnboundMultiMethod will register the function
-# to its base MultiMethod and invoke the same behavior.
-#
-# BoundMultiMethod is a MultiMethod or UnboundMultiMethod which
-# has been bound to a specific object space.  It is obtained by
-# 'space.xxx' or explicitly by calling 'xxx.get(space)'.
-
-class AbstractMultiMethod(object):
-    """Abstract base class for MultiMethod and UnboundMultiMethod,
-    i.e. the classes that are not bound to a specific space instance."""
-
-    def __init__(self, operatorsymbol, arity):
-        "NOT_RPYTHON: cannot create new multimethods dynamically"
-        self.arity = arity
-        self.operatorsymbol = operatorsymbol
-        self.dispatch_table = {}
-        self.cache_table = Cache()
-        self.compilation_cache_table = {}
-        self.cache_delegator_key = None
-        self.dispatch_arity = 0
-
-    def __repr__(self):
-        return '<%s %s>' % (self.__class__.__name__, self.operatorsymbol)
-
-    def register(self, function, *types):
-        """NOT_RPYTHON: cannot register new multimethod
-        implementations dynamically"""
-        assert len(types) == self.arity
-        functions = self.dispatch_table.setdefault(types, [])
-        if function in functions:
-            return False
-        functions.append(function)
-        self.cache_table.clear()
-        self.compilation_cache_table.clear()
-        self.adjust_dispatch_arity(types)
-        return True
-
-    def adjust_dispatch_arity(self, types):
-        "NOT_RPYTHON"
-        width = len(types)
-        while width > self.dispatch_arity and types[width-1] is W_ANY:
-            width -= 1
-        self.dispatch_arity = width
-
-    def compile_calllist(self, argclasses, delegate):
-        """Compile a list of calls to try for the given classes of the
-        arguments.  Return a function that should be called with the
-        actual arguments."""
-        if delegate.key is not self.cache_delegator_key:
-            self.cache_table.clear()
-            self.compilation_cache_table.clear()
-            self.cache_delegator_key = delegate.key
-        return self.cache_table.getorbuild(argclasses,
-                                           self.do_compile_calllist,
-                                           delegate)
-
-    def do_compile_calllist(self, argclasses, delegate):
-        "NOT_RPYTHON"
-        calllist = []
-        self.internal_buildcalllist(argclasses, delegate, calllist)
-        calllist = tuple(calllist)
-        try:
-            result = self.compilation_cache_table[calllist]
-        except KeyError:
-            result = self.internal_compilecalllist(calllist)
-            self.compilation_cache_table[calllist] = result
-        return result
-
-    def internal_compilecalllist(self, calllist):
-        "NOT_RPYTHON"
-        source, glob = self.internal_sourcecalllist(calllist)
-        # compile the function
-        exec source in glob
-        return glob['do']
-
-    def internal_sourcecalllist(self, calllist):
-        """NOT_RPYTHON
-        Translate a call list into the source of a Python function
-        which is optimized and doesn't do repeated conversions on the
-        same arguments."""
-        if len(calllist) == 1:
-            fn, conversions = calllist[0]
-            if conversions == ((),) * len(conversions):
-                # no conversion, just calling a single function: return
-                # that function directly
-                return '', {'do': fn}
-
-        #print '**** compile **** ', self.operatorsymbol, [
-        #    t.__name__ for t in argclasses]
-        arglist = ['a%d'%i for i in range(self.dispatch_arity)] + ['*extraargs']
-        source = ['def do(space,%s):' % ','.join(arglist)]
-        converted = [{(): ('a%d'%i, False)} for i in range(self.dispatch_arity)]
-
-        def make_conversion(argi, convtuple):
-            if convtuple in converted[argi]:
-                return converted[argi][convtuple]
-            else:
-                prev, can_fail = make_conversion(argi, convtuple[:-1])
-                new = '%s_%d' % (prev, len(converted[argi]))
-                fname = all_functions.setdefault(convtuple[-1],
-                                                 'd%d' % len(all_functions))
-                if can_fail:
-                    source.append('    if isinstance(%s, FailedToImplement):' % prev)
-                    source.append('        %s = %s' % (new, prev))
-                    source.append('    else:')
-                    indent = '        '
-                else:
-                    indent = '    '
-                source.append('%s%s = %s(space,%s)' % (indent, new, fname, prev))
-                can_fail = can_fail or getattr(convtuple[-1], 'can_fail', False)
-                converted[argi][convtuple] = new, can_fail
-                return new, can_fail
-
-        all_functions = {}
-        has_firstfailure = False
-        for fn, conversions in calllist:
-            # make the required conversions
-            fname = all_functions.setdefault(fn, 'f%d' % len(all_functions))
-            arglist = []
-            failcheck = []
-            for i in range(self.dispatch_arity):
-                argname, can_fail = make_conversion(i, conversions[i])
-                arglist.append(argname)
-                if can_fail:
-                    failcheck.append(argname)
-            arglist.append('*extraargs')
-            source.append( '    try:')
-            for argname in failcheck:
-                source.append('        if isinstance(%s, FailedToImplement):' % argname)
-                source.append('            raise %s' % argname)
-            source.append( '        return %s(space,%s)' % (
-                fname, ','.join(arglist)))
-            if has_firstfailure:
-                source.append('    except FailedToImplement:')
-            else:
-                source.append('    except FailedToImplement, firstfailure:')
-                has_firstfailure = True
-            source.append( '        pass')
-
-        # complete exhaustion
-        if has_firstfailure:
-            source.append('    raise firstfailure')
-        else:
-            source.append('    raise FailedToImplement()')
-        source.append('')
-
-        glob = {'FailedToImplement': FailedToImplement}
-        for fn, fname in all_functions.items():
-            glob[fname] = fn
-        return '\n'.join(source), glob
-
-    def internal_buildcalllist(self, argclasses, delegate, calllist):
-        """NOT_RPYTHON
-        Build a list of calls to try for the given classes of the
-        arguments.  The list contains 'calls' of the following form:
-        (function-to-call, list-of-list-of-converters)
-        The list of converters contains a list of converter functions per
-        argument, with [] meaning that no conversion is needed for
-        that argument."""
-        # look for an exact match first
-        arity = self.dispatch_arity
-        assert arity == len(argclasses)
-        dispatchclasses = tuple([(c,) for c in argclasses])
-        choicelist = self.buildchoices(dispatchclasses)
-        seen_functions = {}
-        no_conversion = [()] * arity
-        for signature, function in choicelist:
-            calllist.append((function, tuple(no_conversion)))
-            seen_functions[function] = 1
-
-        # proceed by expanding the last argument by delegation, step by step
-        # until no longer possible, and then the previous argument, and so on.
-        expanded_args = ()
-        expanded_dispcls = ()
-
-        for argi in range(arity-1, -1, -1):
-            # growing tuple of dispatch classes we can delegate to
-            curdispcls = dispatchclasses[argi]
-            assert len(curdispcls) == 1
-            # maps each dispatch class to a list of converters
-            curargs = {curdispcls[0]: []}
-            # reduce dispatchclasses to the arguments before this one
-            # (on which no delegation has been tried yet)
-            dispatchclasses = dispatchclasses[:argi]
-            no_conversion = no_conversion[:argi]
-
-            while 1:
-                choicelist = delegate.buildchoices((curdispcls,))
-                # the list is sorted by priority
-                progress = False
-                for (t,), function in choicelist:
-                    if function is None:
-                        # this marks a decrease in the priority.
-                        # Don't try delegators with lower priority if
-                        # we have already progressed.
-                        if progress:
-                            break
-                    else:
-                        assert hasattr(function, 'result_class'), (
-                            "delegator %r must have a result_class" % function)
-                        nt = function.result_class
-                        if nt not in curargs:
-                            curdispcls += (nt,)
-                            srcconvs = curargs[t]
-                            if not getattr(function, 'trivial_delegation', False):
-                                srcconvs = srcconvs + [function]
-                            curargs[nt] = srcconvs
-                            progress = True
-                else:
-                    if not progress:
-                        break  # no progress, and delegators list exhausted
-
-                # progress: try again to dispatch with this new set of types
-                choicelist = self.buildchoices(
-                    dispatchclasses + (curdispcls,) + expanded_dispcls)
-                for signature, function in choicelist:
-                    if function not in seen_functions:
-                        seen_functions[function] = 1
-                        # collect arguments: arguments after position argi...
-                        after_argi = [tuple(expanded_args[j][signature[j]])
-                                      for j in range(argi+1-arity, 0)]  # nb. j<0
-                        # collect arguments: argument argi...
-                        arg_argi = tuple(curargs[signature[argi]])
-                        # collect all arguments
-                        newargs = no_conversion + [arg_argi] + after_argi
-                        # record the call
-                        calllist.append((function, tuple(newargs)))
-            # end of while 1: try on delegating the same argument i
-
-            # proceed to the next argument
-            expanded_args = (curargs,) + expanded_args
-            expanded_dispcls = (curdispcls,) + expanded_dispcls
-
-    def buildchoices(self, allowedtypes):
-        """NOT_RPYTHON
-        Build a list of all possible implementations we can dispatch to,
-        sorted best-first, ignoring delegation."""
-        # 'allowedtypes' is a tuple of tuples of classes, one tuple of classes
-        # per argument.  (After delegation, we need to call buildchoices() with
-        # more than one possible class for a single argument.)
-        result = []
-        self.internal_buildchoices(allowedtypes, (), result)
-        self.postprocessresult(allowedtypes, result)
-        #print self.operatorsymbol, allowedtypes, result
-        # the result is a list of tuples (signature, function).
-        return result
-
-    def postprocessresult(self, allowedtypes, result):
-        "NOT_RPYTHON"
-
-    def internal_buildchoices(self, initialtypes, currenttypes, result):
-        "NOT_RPYTHON"
-        if len(currenttypes) == self.dispatch_arity:
-            currenttypes += (W_ANY,) * (self.arity - self.dispatch_arity)
-            for func in self.dispatch_table.get(currenttypes, []):
-                if func not in result:  # ignore duplicates
-                    result.append((currenttypes, func))
-        else:
-            classtuple = initialtypes[len(currenttypes)]
-            for nexttype in classtuple:
-                self.internal_buildchoices(initialtypes,
-                                           currenttypes + (nexttype,),
-                                           result)
-
-    def is_empty(self):
-        return not self.dispatch_table
-
-
-class MultiMethod(AbstractMultiMethod):
-
-    def __init__(self, operatorsymbol, arity, specialnames=None, **extras):
-        """NOT_RPYTHON: cannot create new multimethods dynamically.
-        MultiMethod dispatching on the first 'arity' arguments.
-        """
-        AbstractMultiMethod.__init__(self, operatorsymbol, arity)
-        if arity < 1:
-            raise ValueError, "multimethods cannot dispatch on nothing"
-        if specialnames is None:
-            specialnames = [operatorsymbol]
-        self.specialnames = specialnames  # e.g. ['__xxx__', '__rxxx__']
-        self.extras = extras
-        self.unbound_versions = {}
-
-
-    def __get__(self, space, cls=None):
-        if space is None:
-            return self
-        else:
-            return BoundMultiMethod(space, self)
-
-    get = __get__
-
-    def slice(self, typeclass, bound_position=0):
-        "NOT_RPYTHON"
-        try:
-            return self.unbound_versions[typeclass, bound_position]
-        except KeyError:
-            m = UnboundMultiMethod(self, typeclass, bound_position)
-            self.unbound_versions[typeclass, bound_position] = m
-            return m
-
-    def register(self, function, *types):
-        """NOT_RPYTHON: cannot register new multimethod
-        implementations dynamically"""
-        if not AbstractMultiMethod.register(self, function, *types):
-            return False
-        # register the function into unbound versions that match
-        for m in self.unbound_versions.values():
-            if m.match(types):
-                AbstractMultiMethod.register(m, function, *types)
-        return True
-
-
-class DelegateMultiMethod(MultiMethod):
-
-    def __init__(self):
-        "NOT_RPYTHON: cannot create new multimethods dynamically."
-        MultiMethod.__init__(self, 'delegate', 1, [])
-        self.key = object()
-
-    def register(self, function, *types):
-        """NOT_RPYTHON: cannot register new multimethod
-        implementations dynamically"""
-        if not AbstractMultiMethod.register(self, function, *types):
-            return False
-        self.key = object()  # change the key to force recomputation
-        return True
-
-    def postprocessresult(self, allowedtypes, result):
-        "NOT_RPYTHON"
-        # add delegation from a class to the *first* immediate parent class
-        # and to W_ANY
-        arg1types, = allowedtypes
-        for t in arg1types:
-            if t.__bases__:
-                base = t.__bases__[0]
-                def delegate_to_parent_class(space, a):
-                    return a
-                delegate_to_parent_class.trivial_delegation = True
-                delegate_to_parent_class.result_class = base
-                delegate_to_parent_class.priority = 0
-                # hard-wire it at priority 0
-                result.append(((t,), delegate_to_parent_class))
-
-            def delegate_to_any(space, a):
-                return a
-            delegate_to_any.trivial_delegation = True
-            delegate_to_any.result_class = W_ANY
-            delegate_to_any.priority = -999
-            # hard-wire it at priority -999
-            result.append(((t,), delegate_to_any))
-
-        # sort the results in priority order, and insert None marks
-        # between jumps in the priority values.  Higher priority values
-        # first.
-        by_priority = {}  # classify delegators by priority
-        for signature, function in result:
-            assert hasattr(function, 'priority'), (
-                "delegator %r must have a priority" % function)
-            sublist = by_priority.setdefault(function.priority, [])
-            sublist.append((signature, function))
-        delegators = by_priority.items()
-        delegators.sort()
-        delegators.reverse()
-        del result[:]
-        for priority, sublist in delegators:
-            if result:
-                result.append(((None,), None))
-            result += sublist
-
-
-class UnboundMultiMethod(AbstractMultiMethod):
-
-    def __init__(self, basemultimethod, typeclass, bound_position=0):
-        "NOT_RPYTHON: cannot create new multimethods dynamically."
-        AbstractMultiMethod.__init__(self,
-                                     basemultimethod.operatorsymbol,
-                                     basemultimethod.arity)
-        self.basemultimethod = basemultimethod
-        self.typeclass = typeclass
-        self.bound_position = bound_position
-        # get all the already-registered matching functions from parent
-        for types, functions in basemultimethod.dispatch_table.iteritems():
-            if self.match(types):
-                self.dispatch_table[types] = functions
-                self.adjust_dispatch_arity(types)
-        #print basemultimethod.operatorsymbol, typeclass, self.dispatch_table
-
-    def register(self, function, *types):
-        """NOT_RPYTHON: cannot register new multimethod
-        implementations dynamically"""
-        if not AbstractMultiMethod.register(self, function, *types):
-            return False
-        # propagate the function registration to the base multimethod
-        # and possibly other UnboundMultiMethods
-        self.basemultimethod.register(function, *types)
-        return True
-
-    def match(self, types):
-        "NOT_RPYTHON"
-        # check if the 'types' signature statically corresponds to the
-        # restriction of the present UnboundMultiMethod.
-        # Only accept an exact match; having merely a subclass should
-        # be taken care of by the general look-up rules.
-        t = types[self.bound_position].typedef
-        return t is self.typeclass or (
-            getattr(t, 'could_also_match', None) is self.typeclass)
-
-    def __get__(self, space, cls=None):
-        if space is None:
-            return self
-        else:
-            return BoundMultiMethod(space, self)
-
-    get = __get__
-
-
-class BoundMultiMethod:
-    ASSERT_BASE_TYPE = object
-
-    def __init__(self, space, multimethod):
-        self.space = space
-        self.multimethod = multimethod
-
-    def __eq__(self, other):
-        return (self.__class__ == other.__class__ and
-                self.space == other.space and
-                self.multimethod == other.multimethod)
-
-    def __ne__(self, other):
-        return not self == other
-
-    def __hash__(self):
-        return hash((self.__class__, self.space, self.multimethod))
-
-    def __call__(self, *args):
-        if len(args) < self.multimethod.arity:
-            raise TypeError, ("multimethod needs at least %d arguments" %
-                              self.multimethod.arity)
-        try:
-            return self.perform_call(*args)
-        except FailedToImplement, e:
-            if e.args:
-                raise OperationError(e.args[0], e.args[1])
-            else:
-                # raise a TypeError for a FailedToImplement
-                initialtypes = [a.__class__
-                                for a in args[:self.multimethod.dispatch_arity]]
-                if len(initialtypes) <= 1:
-                    plural = ""
-                else:
-                    plural = "s"
-                debugtypenames = [t.__name__ for t in initialtypes]
-                message = "unsupported operand type%s for %s (%s)" % (
-                    plural, self.multimethod.operatorsymbol,
-                    ', '.join(debugtypenames))
-                w_value = self.space.wrap(message)
-                raise OperationError(self.space.w_TypeError, w_value)
-
-    def perform_call(self, *args):
-        for a in args:
-            assert isinstance(a, self.ASSERT_BASE_TYPE), (
-                "'%s' multimethod got a non-wrapped argument: %r" % (
-                    self.multimethod.operatorsymbol, a))
-        arity = self.multimethod.dispatch_arity
-        argclasses = tuple([a.__class__ for a in args[:arity]])
-        delegate = self.space.delegate.multimethod
-        fn = self.multimethod.compile_calllist(argclasses, delegate)
-        return fn(self.space, *args)
-
-    def is_empty(self):
-        return self.multimethod.is_empty()
-
-    def __repr__(self):
-        return '<Bound %r>' % (self.multimethod,)
-
-
-class error(Exception):
-    "Thrown to you when you do something wrong with multimethods."
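
The header comment of the deleted file describes a MultiMethod as essentially a collection of registered functions, keyed by the classes of its arguments and tried until one of them stops raising FailedToImplement. The following is a minimal, self-contained sketch of just that dispatch-table idea; the names ToyMultiMethod, W_Int, add_small_ints and add_any_ints are invented for illustration, and none of the object-space binding, caching or call-list compilation from the deleted file is reproduced.

class FailedToImplement(Exception):
    "Signals the dispatcher to try harder."


class ToyMultiMethod(object):
    """A toy dispatcher: implementations are registered per tuple of
    argument classes and tried in registration order."""

    def __init__(self, operatorsymbol, arity):
        self.operatorsymbol = operatorsymbol
        self.arity = arity
        self.dispatch_table = {}    # (class, class, ...) -> [functions]

    def register(self, function, *types):
        assert len(types) == self.arity
        self.dispatch_table.setdefault(types, []).append(function)

    def __call__(self, *args):
        types = tuple(a.__class__ for a in args[:self.arity])
        for function in self.dispatch_table.get(types, []):
            try:
                return function(*args)
            except FailedToImplement:
                pass                # "try harder": move on to the next one
        raise TypeError("unsupported operand type(s) for %s: %r"
                        % (self.operatorsymbol, types))


class W_Int(object):
    def __init__(self, value):
        self.value = value


add = ToyMultiMethod('+', 2)

def add_small_ints(w_a, w_b):
    # a specialized implementation that gives up on large values
    if w_a.value > 100 or w_b.value > 100:
        raise FailedToImplement
    return W_Int(w_a.value + w_b.value)

def add_any_ints(w_a, w_b):
    return W_Int(w_a.value + w_b.value)

add.register(add_small_ints, W_Int, W_Int)
add.register(add_any_ints, W_Int, W_Int)

print(add(W_Int(2), W_Int(3)).value)        # handled by add_small_ints: 5
print(add(W_Int(500), W_Int(500)).value)    # falls through to add_any_ints: 1000

In the real file, the equivalent lookup is not performed per call: internal_sourcecalllist() compiles a specialized function for each tuple of argument classes and caches it in cache_table.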
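
The delegation fallback implemented by internal_buildcalllist() together with DelegateMultiMethod (convert an argument to another class, parent class before the W_ANY catch-all, higher priority first, then retry the lookup) can be sketched in a similarly simplified way. ToyDelegatingMultiMethod, W_Bool and bool_to_int below are hypothetical names; the real code builds and compiles a cached call list per combination of classes instead of converting arguments eagerly in a loop.

class FailedToImplement(Exception):
    pass


class ToyDelegatingMultiMethod(object):
    """A toy dispatcher with a delegation fallback: if no implementation
    matches the exact argument classes, the last argument is converted by
    the best-priority delegator registered for its class and the lookup
    is retried."""

    def __init__(self):
        self.dispatch_table = {}    # (class, ...) -> function
        self.delegators = {}        # class -> [delegator functions]

    def register(self, function, *types):
        self.dispatch_table[types] = function

    def register_delegator(self, cls, delegator, result_class, priority):
        delegator.result_class = result_class
        delegator.priority = priority
        self.delegators.setdefault(cls, []).append(delegator)
        self.delegators[cls].sort(key=lambda d: -d.priority)   # best first

    def __call__(self, *args):
        args = list(args)
        seen = set()    # classes already tried for the last argument
        while True:
            types = tuple(a.__class__ for a in args)
            if types in self.dispatch_table:
                return self.dispatch_table[types](*args)
            candidates = [d for d in self.delegators.get(types[-1], [])
                          if d.result_class not in seen]
            if not candidates:
                raise FailedToImplement("no implementation for %r" % (types,))
            seen.add(types[-1])
            args[-1] = candidates[0](args[-1])


class W_Int(object):
    def __init__(self, value):
        self.value = value

class W_Bool(W_Int):
    pass


add = ToyDelegatingMultiMethod()
add.register(lambda a, b: W_Int(a.value + b.value), W_Int, W_Int)

# delegate W_Bool to its parent class W_Int, much like delegate_to_parent_class
def bool_to_int(w):
    return W_Int(w.value)
add.register_delegator(W_Bool, bool_to_int, W_Int, priority=0)

print(add(W_Int(1), W_Bool(1)).value)       # delegates W_Bool -> W_Int, prints 2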