Ménage.
[Plinn.git] / Products / Plinn / catalog.py
1 # -*- coding: utf-8 -*-
2 from App.class_init import InitializeClass
3 from AccessControl import ClassSecurityInfo
4 from Products.CMFCore.interfaces import IIndexableObject
5 from Products.CMFCore.CatalogTool import CatalogTool as BaseCatalogTool
6 from Products.CMFCore.CatalogTool import IndexableObjectWrapper
7 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
8 from Products.CMFCore.permissions import ModifyPortalContent, ManagePortal
9 from zope.component import queryMultiAdapter
10 from Products.ZCatalog.Catalog import Catalog
11 import transaction
12 from solr import *
13
14 # imports for Catalog class
15 from Products.PluginIndexes.interfaces import ILimitedResultIndex
16 from Products.ZCatalog.Lazy import LazyMap, LazyCat, LazyValues
17 from BTrees.IIBTree import intersection, IISet
18 from BTrees.IIBTree import weightedIntersection
19 import warnings
20
# Name of the volatile (non-persisted) attribute that holds the Solr
# connection opened for the current transaction.
_VOLATILE_SOLR_NAME = '_v_solrConnection'


class SolrTransactionHook(object):
    """After-commit hook coupling the Solr commit to the ZODB commit.

    The hook is called with the commit status: on success the pending Solr
    changes are committed, otherwise they are left uncommitted. In both
    cases the connection is closed and the volatile reference stored on
    ``context`` is removed.
    """

    def __init__(self, context, con):
        # context: object carrying the volatile connection attribute.
        # con: the Solr connection to commit and close.
        self.context = context
        self.con = con

    def __call__(self, status):
        """Finish the Solr side of the transaction.

        ``status`` is true when the ZODB commit succeeded.
        """
        try:
            if status:
                self.con.commit()
        finally:
            # Always release the connection and forget it, even when the
            # Solr commit itself failed, so that a broken connection is
            # never reused.
            try:
                self.con.close()
            finally:
                try:
                    delattr(self.context, _VOLATILE_SOLR_NAME)
                except AttributeError:
                    pass
39
class CatalogTool(BaseCatalogTool):
    """Portal catalog that mirrors a configurable set of indexes into Solr.

    Objects are cataloged in the regular ZCatalog and, in addition, the
    values of the delegated indexes are sent to the Solr server configured
    by ``solr_url``.
    """

    meta_type = 'Plinn Catalog'
    security = ClassSecurityInfo()
    # Insert a "Solr" management tab after the first five inherited tabs.
    manage_options = (BaseCatalogTool.manage_options[:5] +
                      ({'label': 'Solr', 'action': 'manage_solr'},) +
                      BaseCatalogTool.manage_options[5:])
    manage_solr = PageTemplateFile('www/manage_solr.pt', globals(),
                                   __name__='manage_solr')

    def __init__(self, idxs=None):
        # ``idxs`` is accepted for signature compatibility; it is not used.
        super(CatalogTool, self).__init__()
        self._catalog = DelegatedCatalog(self)
        self.solr_url = 'http://localhost:8983/solr'
        self.delegatedIndexes = ('Title', 'Description', 'SearchableText')

    security.declarePublic('getDelegatedIndexes')
    def getDelegatedIndexes(self):
        """Return the tuple of index names delegated to Solr."""
        return self.delegatedIndexes

    security.declareProtected(ManagePortal, 'setSolrProperties')
    def setSolrProperties(self, url, indexes, REQUEST=None):
        """Set the Solr server url and the delegated index names.

        ``indexes`` is an iterable of names; entries are stripped and empty
        ones are dropped. When ``REQUEST`` is given, the response is
        redirected to the Solr management tab.
        """
        self.solr_url = url
        self.delegatedIndexes = tuple(
            [i.strip() for i in indexes if i.strip()])
        if REQUEST:
            REQUEST.RESPONSE.redirect(
                self.absolute_url() +
                '/manage_solr?manage_tabs_message=Saved changes.')

    def _getSolrConnection(self):
        """Return the Solr connection bound to the current transaction.

        The connection is created on first use, kept in a volatile
        attribute and registered with an after-commit hook that commits
        (or discards) and closes it.
        """
        con = getattr(self, _VOLATILE_SOLR_NAME, None)
        if con is None:
            con = SolrConnection(self.solr_url)
            setattr(self, _VOLATILE_SOLR_NAME, con)
            txn = transaction.get()
            txn.addAfterCommitHook(SolrTransactionHook(self, con))
        return con

    security.declarePrivate('solrAdd')
    def solrAdd(self, w, uid, idxs):
        """Send the delegated index values of ``w`` to Solr under ``uid``.

        ``idxs`` is the list of indexes being updated (empty or None means
        all of them). If it names no delegated index, nothing is sent.
        """
        delegated = self.delegatedIndexes
        if idxs and not any(i in delegated for i in idxs):
            # Partial reindex that touches no delegated index. Posting a
            # document holding only the id would replace the existing Solr
            # document and wipe its indexed fields.
            return
        # An add replaces the whole Solr document for this id, so every
        # delegated field is sent even when only some of them were
        # requested; otherwise the other fields would be lost.
        data = {'id': uid}
        for name in delegated:
            attr = getattr(w, name, '')
            data[name] = attr() if callable(attr) else attr
        c = self._getSolrConnection()
        c.add(**data)

    # PortalCatalog api overloads
    def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1,
                       pghandler=None):
        # Wraps the object with workflow and accessibility
        # information just before cataloging.
        if IIndexableObject.providedBy(obj):
            w = obj
        else:
            w = queryMultiAdapter((obj, self), IIndexableObject)
            if w is None:
                # BBB
                w = IndexableObjectWrapper(obj, self)

        idxs_ = idxs
        if idxs:
            # Filter out invalid indexes.
            valid_indexes = self._catalog.indexes.keys()
            idxs_ = [i for i in idxs if i in valid_indexes]

        super(CatalogTool, self).catalog_object(w, uid, idxs_,
                                                update_metadata, pghandler)
        # The unfiltered list is passed on purpose: solrAdd does its own
        # filtering against the delegated indexes.
        self.solrAdd(w, uid, idxs)

    security.declarePrivate('reindexObject')
    def reindexObject(self, object, idxs=None, update_metadata=1, uid=None):
        """Update catalog after object data has changed.

        The optional idxs argument is a list of specific indexes
        to update (all of them by default).

        The update_metadata flag controls whether the object's
        metadata record is updated as well.

        If a non-None uid is passed, it will be used as the catalog uid
        for the object instead of its physical path.
        """
        if idxs is None:
            # Same value the former mutable default provided.
            idxs = []
        if uid is None:
            # Name-mangled to _CatalogTool__url; presumably resolves to the
            # private helper of the base class, which is also named
            # CatalogTool -- verify if either class is ever renamed.
            uid = self.__url(object)

        self.catalog_object(object, uid, idxs, update_metadata)

    security.declarePrivate('unindexObject')
    def unindexObject(self, object):
        """Remove from catalog.
        """
        super(CatalogTool, self).unindexObject(object)
        c = self._getSolrConnection()
        url = self.__url(object)
        c.delete(id=url)


InitializeClass(CatalogTool)
139
140
class DelegatedCatalog(Catalog):
    """Catalog whose searches on the delegated indexes are done by Solr."""

    def __init__(self, zcat, brains=None):
        # zcat: the owning catalog tool; it provides ``solr_url`` and
        # ``delegatedIndexes``.
        Catalog.__init__(self, brains=brains)
        self.zcat = zcat

    @staticmethod
    def _escapeSolrPhrase(value):
        """Return ``value`` as text safe to embed in a quoted Solr phrase."""
        text = '%s' % (value,)
        # Backslash first, so the escapes added for quotes are not doubled.
        return text.replace('\\', '\\\\').replace('"', '\\"')

    def delegateSearch(self, query, plan):
        """Run the delegated part of ``query`` against Solr.

        The delegated keys are removed from ``query`` and from ``plan``
        (both are modified in place).

        Two distinct false return values:
        None means no delegation happened, the other indexes must still be
        queried. An empty IISet means the delegated search matched nothing,
        so the whole search can stop.
        """
        delegated = set(query.keys()).intersection(
            self.zcat.delegatedIndexes)
        if not delegated:
            return None
        delegatedQuery = {}
        for name in delegated:
            delegatedQuery[name] = query.pop(name)
            try:
                plan.remove(name)
            except ValueError:
                pass
        q = ' AND '.join(['%s:"%s"' % (name, self._escapeSolrPhrase(value))
                          for name, value in delegatedQuery.items()])
        c = SolrConnection(self.zcat.solr_url)
        try:
            resp = c.query(q, fields='id', rows=len(self))
        finally:
            # Release the connection even when the query fails.
            c.close()
        rids = [self.uids.get(r['id']) for r in resp.results]
        # Drop only the ids unknown to this catalog; a record id of 0 is
        # falsy but valid and must be kept.
        return IISet([rid for rid in rids if rid is not None])

    def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
        """Iterate through the indexes, applying the query to each one. If
        merge is true then return a lazy result set (sorted if appropriate)
        otherwise return the raw (possibly scored) results for later merging.
        Limit is used in conjuntion with sorting or scored results to inform
        the catalog how many results you are really interested in. The catalog
        can then use optimizations to save time and memory. The number of
        results is not guaranteed to fall within the limit however, you should
        still slice or batch the results as usual.

        The delegated indexes are queried through Solr first; their result
        seeds the result set that the remaining indexes then narrow down.
        """

        rs = None  # resultset

        # Indexes fulfill a fairly large contract here. We hand each
        # index the query mapping we are given (which may be composed
        # of some combination of web request, kw mappings or plain old dicts)
        # and the index decides what to do with it. If the index finds work
        # for itself in the query, it returns the results and a tuple of
        # the attributes that were used. If the index finds nothing for it
        # to do then it returns None.

        # Canonicalize the request into a sensible query before passing it on
        query = self.make_query(query)

        cr = self.getCatalogPlan(query)
        cr.start()

        plan = cr.plan()
        if not plan:
            plan = self._sorted_search_indexes(query)

        # delegation
        rs = self.delegateSearch(query, plan)
        if rs is not None and not rs:
            return LazyCat([])

        indexes = self.indexes.keys()
        for i in plan:
            if i not in indexes:
                # We can have bogus keys or the plan can contain index names
                # that have been removed in the meantime
                continue

            index = self.getIndex(i)
            _apply_index = getattr(index, "_apply_index", None)
            if _apply_index is None:
                continue

            cr.start_split(i)
            limit_result = ILimitedResultIndex.providedBy(index)
            if limit_result:
                r = _apply_index(query, rs)
            else:
                r = _apply_index(query)

            if r is not None:
                r, _used = r
                # Short circuit if empty result
                # BBB: We can remove the "r is not None" check in Zope 2.14
                # once we don't need to support the "return everything" case
                # anymore
                if r is not None and not r:
                    cr.stop_split(i, result=None, limit=limit_result)
                    return LazyCat([])

                # provide detailed info about the pure intersection time
                intersect_id = i + '#intersection'
                cr.start_split(intersect_id)
                # weightedIntersection preserves the values from any mappings
                # we get, as some indexes don't return simple sets
                if hasattr(rs, 'items') or hasattr(r, 'items'):
                    _, rs = weightedIntersection(rs, r)
                else:
                    rs = intersection(rs, r)

                cr.stop_split(intersect_id)

                # consider the time it takes to intersect the index result
                # with the total resultset to be part of the index time
                cr.stop_split(i, result=r, limit=limit_result)
                if not rs:
                    break
            else:
                cr.stop_split(i, result=None, limit=limit_result)

        # Try to deduce the sort limit from batching arguments
        b_start = int(query.get('b_start', 0))
        b_size = query.get('b_size', None)
        if b_size is not None:
            b_size = int(b_size)

        if b_size is not None:
            limit = b_start + b_size
        elif limit and b_size is None:
            b_size = limit

        if rs is None:
            # None of the indexes found anything to do with the query
            # We take this to mean that the query was empty (an empty filter)
            # and so we return everything in the catalog
            warnings.warn('Your query %s produced no query restriction. '
                          'Currently the entire catalog content is returned. '
                          'In Zope 2.14 this will result in an empty LazyCat '
                          'to be returned.' % repr(cr.make_key(query)),
                          DeprecationWarning, stacklevel=3)

            rlen = len(self)
            if sort_index is None:
                sequence, slen = self._limit_sequence(self.data.items(), rlen,
                                                      b_start, b_size)
                result = LazyMap(self.instantiate, sequence, slen,
                                 actual_result_count=rlen)
            else:
                cr.start_split('sort_on')
                result = self.sortResults(
                    self.data, sort_index, reverse, limit, merge,
                    actual_result_count=rlen, b_start=b_start,
                    b_size=b_size)
                cr.stop_split('sort_on', None)
        elif rs:
            # We got some results from the indexes.
            # Sort and convert to sequences.
            # XXX: The check for 'values' is really stupid since we call
            # items() and *not* values()
            rlen = len(rs)
            if sort_index is None and hasattr(rs, 'items'):
                # having a 'items' means we have a data structure with
                # scores. Build a new result set, sort it by score, reverse
                # it, compute the normalized score, and Lazify it.

                if not merge:
                    # Don't bother to sort here, return a list of
                    # three tuples to be passed later to mergeResults
                    # note that data_record_normalized_score_ cannot be
                    # calculated and will always be 1 in this case
                    getitem = self.__getitem__
                    result = [(score, (1, score, rid), getitem)
                              for rid, score in rs.items()]
                else:
                    # aq_parent is not imported at module level in this
                    # file; import it here so this branch does not fail
                    # with a NameError.
                    from Acquisition import aq_parent

                    cr.start_split('sort_on')

                    rs = rs.byValue(0)  # sort it by score
                    max_score = float(rs[0][0])

                    # Here we define our getter function inline so that
                    # we can conveniently store the max value as a default
                    # arg and make the normalized score computation lazy
                    def getScoredResult(item, max_score=max_score, self=self):
                        """
                        Returns instances of self._v_brains, or whatever is
                        passed into self.useBrains.
                        """
                        score, key = item
                        r = self._v_result_class(self.data[key])\
                            .__of__(aq_parent(self))
                        r.data_record_id_ = key
                        r.data_record_score_ = score
                        r.data_record_normalized_score_ = int(
                            100. * score / max_score)
                        return r

                    sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                          b_size)
                    result = LazyMap(getScoredResult, sequence, slen,
                                     actual_result_count=rlen)
                    cr.stop_split('sort_on', None)

            elif sort_index is None and not hasattr(rs, 'values'):
                # no scores
                if hasattr(rs, 'keys'):
                    rs = rs.keys()
                sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                      b_size)
                result = LazyMap(self.__getitem__, sequence, slen,
                                 actual_result_count=rlen)
            else:
                # sort. If there are scores, then this block is not
                # reached, therefore 'sort-on' does not happen in the
                # context of a text index query. This should probably
                # sort by relevance first, then the 'sort-on' attribute.
                cr.start_split('sort_on')
                result = self.sortResults(rs, sort_index, reverse, limit,
                                          merge, actual_result_count=rlen,
                                          b_start=b_start, b_size=b_size)
                cr.stop_split('sort_on', None)
        else:
            # Empty result set
            result = LazyCat([])
        cr.stop()
        return result