X-Git-Url: https://svn.cri.ensmp.fr/git/Plinn.git/blobdiff_plain/492bf0fb7e0204ec727c3639af6c5ea4006ef611..f53850f9b9d8071258091d66b4c781f2f4399d72:/utils.py diff --git a/utils.py b/utils.py index 863b94e..de68f22 100755 --- a/utils.py +++ b/utils.py @@ -27,6 +27,8 @@ import re from types import StringType from random import randrange from Acquisition import aq_base +from quopri import encodestring +from zope.globalrequest import getRequest from AccessControl.PermissionRole import rolesForPermissionOn from AccessControl import ModuleSecurityInfo from AccessControl import getSecurityManager @@ -49,234 +51,288 @@ security = ModuleSecurityInfo( 'Products.Plinn.utils' ) security.declarePublic('thisObjectComeFromPortalSkin') def thisObjectComeFromPortalSkin(ob, portal=None): - """ check if ob comes from portal_skins """ - if not portal : - portal = getToolByName(ob, 'portal_url') - portal = portal.getPortalObject() - - if ob.aq_self == portal.aq_self : - return False - - obId = ob.id - if callable(obId) : - obId = obId() - - sob = getattr(portal, obId, None) - - if sob is None : - return False - elif not(sob.aq_inner.aq_self is ob.aq_inner.aq_self) : - return False - else : - try : - portal._checkId(obId) - return True - except BadRequest : - return False + """ check if ob comes from portal_skins """ + if not portal : + portal = getToolByName(ob, 'portal_url') + portal = portal.getPortalObject() + + if ob.aq_self == portal.aq_self : + return False + + obId = ob.id + if callable(obId) : + obId = obId() + + sob = getattr(portal, obId, None) + + if sob is None : + return False + elif not(sob.aq_inner.aq_self is ob.aq_inner.aq_self) : + return False + else : + try : + portal._checkId(obId) + return True + except BadRequest : + return False security.declarePublic('listActionProviders_') def listActionProviders_(context) : - atool = getToolByName(context, 'portal_actions') - return atool.listActionProviders() + atool = getToolByName(context, 'portal_actions') + return atool.listActionProviders() def capitalizeCompoundGivenName(givenName) : - givenName = givenName.strip() - givenNames = ' '.join(givenName.split('-')).split() - givenNameCapitalized = '-'.join(map(string.capitalize, givenNames)) - return givenNameCapitalized - - + givenName = givenName.strip() + givenNames = ' '.join(givenName.split('-')).split() + givenNameCapitalized = '-'.join(map(string.capitalize, givenNames)) + return givenNameCapitalized + + def formatFullName(memberName, memberGivenName, memberId, nameBefore=1) : - memberName = memberName.decode('utf-8') - memberGivenName = memberGivenName.decode('utf-8') - memberFullName = u'' - if memberName and memberGivenName : - if nameBefore : - memberFullName = memberName.upper() + ' ' + capitalizeCompoundGivenName(memberGivenName) - else : - memberFullName = capitalizeCompoundGivenName(memberGivenName) + ' ' + memberName.upper() - - elif memberName and not memberGivenName : - memberFullName = memberName.upper() - - elif not memberName and memberGivenName : - memberFullName = capitalizeCompoundGivenName(memberGivenName) - - else : - memberFullName = memberId - - return memberFullName.encode('utf-8') + memberName = memberName.decode('utf-8') + memberGivenName = memberGivenName.decode('utf-8') + memberFullName = u'' + if memberName and memberGivenName : + if nameBefore : + memberFullName = memberName.capitalize() + ' ' + capitalizeCompoundGivenName(memberGivenName) + else : + memberFullName = capitalizeCompoundGivenName(memberGivenName) + ' ' + memberName.capitalize() + + elif memberName and not memberGivenName : + memberFullName = memberName.capitalize() + + elif not memberName and memberGivenName : + memberFullName = capitalizeCompoundGivenName(memberGivenName) + + else : + memberFullName = memberId + + return memberFullName.encode('utf-8') # from OFS.ObjectManager #63 bad_url_chars = re.compile(r'[^a-zA-Z0-9-_~,.$\(\)@]') security.declarePublic('makeValidId') def makeValidId(self, id, allow_dup=0): - id = Utf8Utils.desacc(id) - id = bad_url_chars.sub('-', id) - # If allow_dup is false, an error will be raised if an object - # with the given id already exists. If allow_dup is true, - # only check that the id string contains no illegal chars; - # check_valid_id() will be called again later with allow_dup - # set to false before the object is added. - - makeRandomId = False - if id in ('.', '..'): - makeRandomId = True - if id.startswith('_'): - id = id.lstrip('_') - if id.startswith('aq_'): - id = id[3:] - - while id.endswith('__') : - id = id[:-1] - if not allow_dup: - obj = getattr(self, id, None) - if obj is not None: - # An object by the given id exists either in this - # ObjectManager or in the acquisition path. - flags = getattr(obj, '__replaceable__', NOT_REPLACEABLE) - if hasattr(aq_base(self), id): - # The object is located in this ObjectManager. - if not flags & REPLACEABLE: - makeRandomId = True - # else the object is replaceable even if the UNIQUE - # flag is set. - elif flags & UNIQUE: - makeRandomId = True - if id == 'REQUEST': - makeRandomId = True - - if makeRandomId is True : - id = str(randrange(2,10000)) + id - return id - - + id = Utf8Utils.desacc(id) + id = bad_url_chars.sub('-', id) + # If allow_dup is false, an error will be raised if an object + # with the given id already exists. If allow_dup is true, + # only check that the id string contains no illegal chars; + # check_valid_id() will be called again later with allow_dup + # set to false before the object is added. + + makeRandomId = False + if id in ('.', '..'): + makeRandomId = True + if id.startswith('_'): + id = id.lstrip('_') + if id.startswith('aq_'): + id = id[3:] + + while id.endswith('__') : + id = id[:-1] + if not allow_dup: + obj = getattr(self, id, None) + if obj is not None: + # An object by the given id exists either in this + # ObjectManager or in the acquisition path. + flags = getattr(obj, '__replaceable__', NOT_REPLACEABLE) + if hasattr(aq_base(self), id): + # The object is located in this ObjectManager. + if not flags & REPLACEABLE: + makeRandomId = True + # else the object is replaceable even if the UNIQUE + # flag is set. + elif flags & UNIQUE: + makeRandomId = True + if id == 'REQUEST': + makeRandomId = True + + if makeRandomId is True : + id = str(randrange(2,10000)) + id + return id + + def _checkMemberPermission(userid, permission, obj, StringType = type('')): - user = obj.aq_inner.acl_users.getUser(userid) - roles = rolesForPermissionOn(permission, obj) - if type(roles) is StringType: - roles=[roles] - if user.allowed( obj, roles ): - return 1 - return 0 - + user = obj.aq_inner.acl_users.getUser(userid) + roles = rolesForPermissionOn(permission, obj) + if type(roles) is StringType: + roles=[roles] + if user.allowed( obj, roles ): + return 1 + return 0 + def getCPInfo(self) : - try: cp = _cb_decode(self.REQUEST['__cp']) - except: return None - return cp + if self.REQUEST.RESPONSE.cookies.has_key('__cp') : + cp = self.REQUEST.RESPONSE.cookies['__cp']['value'] + else : + cp = self.REQUEST.get('__cp') + try: cp = _cb_decode(cp) + except: return None + return cp def popCP(self, indexes=None) : - try: cp = _cb_decode(self.REQUEST['__cp']) - except: return - - paths = list(cp[1]) - if indexes is not None : - indexes = list(indexes) - indexes.sort() - indexes.reverse() - for index in indexes : - paths.pop(index) - else : - paths.pop() - - if not paths : - self.REQUEST.RESPONSE.expireCookie('__cp', path=self.REQUEST['BASEPATH1'] or "/") - else : - paths = tuple(paths) - cp = _cb_encode( (cp[0], paths) ) - resp = self.REQUEST['RESPONSE'] - resp.setCookie('__cp', cp, path='%s' % cookie_path(self.REQUEST)) + try: cp = _cb_decode(self.REQUEST['__cp']) + except: return + + paths = list(cp[1]) + if indexes is not None : + indexes = list(indexes) + indexes.sort() + indexes.reverse() + for index in indexes : + paths.pop(index) + else : + paths.pop() + + if not paths : + self.REQUEST.RESPONSE.expireCookie('__cp', path=self.REQUEST['BASEPATH1'] or "/") + else : + paths = tuple(paths) + cp = _cb_encode( (cp[0], paths) ) + resp = self.REQUEST['RESPONSE'] + resp.setCookie('__cp', cp, path='%s' % cookie_path(self.REQUEST)) security.declarePublic('Message') Message = MessageFactory('plinn') security.declarePublic('translate') -def translate(message, context): - """ Translate i18n message. - """ - if isinstance(message, Exception): - try: - message = message[0] - except (TypeError, IndexError): - pass - return i18ntranslate(message, domain='plinn', context=context.REQUEST) +def translate(message, context=None): + """ Translate i18n message. + """ + if isinstance(message, Exception): + try: + message = message[0] + except (TypeError, IndexError): + pass + if not context : + request = getRequest() + else : + request = context.REQUEST + return i18ntranslate(message, domain='plinn', context=request) security.declarePublic('desacc') desacc = Utf8Utils.desacc security.declarePublic('getPreferredLanguages') def getPreferredLanguages(context): - """ returns browser prefered languages""" - request = getattr(context, 'REQUEST', None) - if request is not None : - adapter = IUserPreferredLanguages(request, None) - if adapter is not None : - return adapter.getPreferredLanguages() - return [] + """ returns browser prefered languages""" + request = getattr(context, 'REQUEST', None) + if request is not None : + adapter = IUserPreferredLanguages(request, None) + if adapter is not None : + return adapter.getPreferredLanguages() + return [] security.declarePublic('getBestTranslationLanguage') def getBestTranslationLanguage(langs, context): - """ returns best translation language according - availables languages (param langs) - and user preferences (retrieves by context) - """ - request = getattr(context, 'REQUEST', None) - if request : - negociator = getUtilityByInterfaceName('zope.i18n.interfaces.INegotiator') - return negociator.getLanguage(langs, request) or langs[0] - else : - return langs[0] + """ returns best translation language according + to available languages (param langs) + and user preferences (retrieves by context) + """ + request = getattr(context, 'REQUEST', None) + if request : + negociator = getUtilityByInterfaceName('zope.i18n.interfaces.INegotiator') + return negociator.getLanguage(langs, request) or langs[0] + else : + return langs[0] security.declarePublic('getAdapterByInterface') def getAdapterByInterface(ob, dotted_name, default=_marker) : - """ Get the adapter which provides the interface on the given object. - """ - try: - iface = resolve_dotted_name(dotted_name) - except ImportError: - if default is _marker: - raise ComponentLookupError, dotted_name - return default + """ Get the adapter which provides the interface on the given object. + """ + try: + iface = resolve_dotted_name(dotted_name) + except ImportError: + if default is _marker: + raise ComponentLookupError, dotted_name + return default - adapter = queryAdapter(ob, iface, default=default) - if adapter is _marker : - raise ComponentLookupError, "no adapter providing %r found on %r" % (dotted_name, ob) + adapter = queryAdapter(ob, iface, default=default) + if adapter is _marker : + raise ComponentLookupError, "no adapter providing %r found on %r" % (dotted_name, ob) + + # the adapter must be wrapped to allow security mahinery to work. + if adapter != default : + return adapter.__of__(ob) + else : + return default + +security.declarePublic('encodeQuopriEmail') +def encodeQuopriEmail(name, email) : + qpName = encodestring(name).replace('=\n', '') + return '''"=?utf-8?q?%s?=" <%s>''' % (qpName, email) + +security.declarePublic('encodeMailHeader') +def encodeMailHeader(content) : + s = encodestring(content).replace('=\n', '') + s = s.replace('_', '=5F') + s = s.replace(' ', '_') + + lines = [] + STEP = 50 + start = 0 + stop = STEP + part = s[start:stop] + lines.append(part) + + while len(part) == STEP: + start = start + STEP + stop = stop + STEP + part = s[start:stop] + lines.append(part) + + lines = [' =?utf-8?Q?%s?=' % part for part in lines] + s = '\n'.join(lines) + s = s.strip() + return s - # the adapter must be wrapped to allow security mahinery to work. - if adapter != default : - return adapter.__of__(ob) - else : - return default def _sudo(func, userid=None) : - """ - execute func or any callable object - without restriction. Used to switch off - security assertions (eg. checkPermission) encountered - during the execution. - """ - - sm = getSecurityManager() - restrictedUser = sm.getUser() - - if not userid : - userid = restrictedUser.getId() - - sm._context.user = UnrestrictedUser(userid, '', (), ()) + """ + execute func or any callable object + without restriction. Used to switch off + security assertions (eg. checkPermission) encountered + during the execution. + """ + + sm = getSecurityManager() + restrictedUser = sm.getUser() + + if not userid : + userid = restrictedUser.getId() + + sm._context.user = UnrestrictedUser(userid, '', (), ()) + + deferedEx = None + try : + ret = func() + except Exception, e : + deferedEx = e + + sm._context.user = restrictedUser + + if deferedEx is not None : + raise e - deferedEx = None - try : - ret = func() - except Exception, e : - deferedEx = e - - sm._context.user = restrictedUser - - if deferedEx is not None : - raise e + return ret - return ret - \ No newline at end of file +security.declarePublic('searchContentsWithLocalRolesForAuthenticatedUser') +def searchContentsWithLocalRolesForAuthenticatedUser(**kw): + mtool = getUtilityByInterfaceName('Products.CMFCore.interfaces.IMembershipTool') + ctool = getUtilityByInterfaceName('Products.CMFCore.interfaces.ICatalogTool') + member = mtool.getAuthenticatedMember() + userid = member.getId() + userAndGroups = ['user:%s' % userid] + + getGroups = getattr(member, 'getGroups', None) + if getGroups is not None : + for group in getGroups(): + userAndGroups.append('user:'+group) + + kw[ 'allowedRolesAndUsers' ] = userAndGroups + + return ctool.unrestrictedSearchResults(**kw)