Utilisateur:Phe/Scripts/page cache.py
# -*- coding: utf-8 -*-
# Copyright 2006, Philippe Elie.
import botpywi
import wikipedia
import pagegenerators
import catlib
import time
import datetime
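
# wikipedia, pagegenerators and catlib are modules of the old pywikipedia
# framework; botpywi is assumed to be the author's private helper module
# providing load_obj()/save_obj() object persistence.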
def diff_time(a, b):
    d = a - b
    diff = d.days * 86400.0
    diff += d.seconds
    diff /= 86400
    return diff
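
# A worked example with hypothetical values: for a delta of a day and a half,
# d.days == 1 and d.seconds == 43200, so diff_time() returns
# (86400.0 + 43200) / 86400 == 1.5 days.
#   diff_time(datetime.datetime(2006, 1, 2, 12, 0),
#             datetime.datetime(2006, 1, 1, 0, 0))   # -> 1.5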
def catlist(cat):
    # articles (tag 0) and sub-categories (tag 1) are stored as titles,
    # super-categories (tag 2) as page objects
    result = ( [], [], [], )
    for tag, article in cat._getContentsNaive():
        if tag == 0 or tag == 1:
            result[tag].append(article.title())
        else:
            result[tag].append(article)
    return result
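
# Base class: a named dictionary whose contents survive between runs
# (persistence is assumed to be pickling through botpywi).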
class dictionary_cache:
    def __init__(self, name):
        self.name = name
        try:
            self.cache = botpywi.load_obj(self.name)
        except:
            # no saved cache (or an unreadable one), start from scratch
            self.cache = {}

    def save(self):
        botpywi.save_obj(self.name, self.cache)
class page_object(object):
    def __init__(self, site, page_name):
        self.page = wikipedia.Page(site, page_name)
        if self.page.isCategory():
            self.page = catlib.Category(site, page_name)
        self.refs = {}
        # all other data are already stored in self.page
    def get_references(self, follow_redirects, withTemplateInclusion,
                       onlyTemplateInclusion, redirects_only):
        key = (follow_redirects, withTemplateInclusion,
               onlyTemplateInclusion, redirects_only)
        if key not in self.refs:
            self.refs[key] = [x for x in self.page.getReferences(
                follow_redirects=follow_redirects,
                withTemplateInclusion=withTemplateInclusion,
                onlyTemplateInclusion=onlyTemplateInclusion,
                redirectsOnly=redirects_only)]
        return self.refs[key]
    # Linked pages are not cached; the page contents are re-parsed on each
    # request.
    def get_linked_page(self, nms):
        pages = self.page.linkedPages(True)
        return set([x for x in pages if nms is None or x.namespace() in nms])
class page_cache(dictionary_cache):
    def __init__(self, cache_name = 'page_cache'):
        dictionary_cache.__init__(self, cache_name)

    def get_page(self, page_name, site = None):
        if site is None:
            site = wikipedia.getSite()
        #page_name = botpywi.capitalize(page_name)
        page_name = wikipedia.Page(site, page_name).title()
        key = (site, page_name)
        if key not in self.cache:
            self.cache[key] = page_object(site, page_name)
        return self.cache[key]
    def purge(self, page_name, site = None):
        if site is None:
            site = wikipedia.getSite()
        key = (site, page_name)
        if key in self.cache:
            del self.cache[key]
    def get_redirect(self, page_name, site = None):
        if site is None:
            site = wikipedia.getSite()
        page = self.get_page(page_name, site).page
        page.get(get_redirect = True)
        if page.isRedirectPage():
            return page._redirarg
        return None
    def get(self, page_name, follow_redirect = True, site = None):
        if site is None:
            site = wikipedia.getSite()
        if follow_redirect:
            new_page_name = self.get_redirect(page_name, site)
            if new_page_name:
                page_name = new_page_name
        page = self.get_page(page_name, site).page
        return page.get(get_redirect = True)
    # convenience wrappers for backward compatibility with older code using
    # cache.py
    def read_page(self, page_name, site = None, follow_redirect = True):
        return self.get(page_name, follow_redirect, site)
    def write_page(self, page_name, text, comment, minor_edit = True, site = None):
        if site is None:
            site = wikipedia.getSite()
        page = self.get_page(page_name, site).page
        page.put_async(text, comment, False, minor_edit)
        # FIXME: put() doesn't update the cached contents, so drop them for now
        if hasattr(page, '_contents'):
            delattr(page, '_contents')
    def get_references(self, page_name, follow_redirects = True,
                       withTemplateInclusion = True,
                       onlyTemplateInclusion = False,
                       site = None,
                       redirects_only = False):
        page = self.get_page(page_name, site)
        refs = page.get_references(follow_redirects=follow_redirects,
                                   withTemplateInclusion=withTemplateInclusion,
                                   onlyTemplateInclusion=onlyTemplateInclusion,
                                   redirects_only=redirects_only)
        return set(refs)
    def get_linked_page(self, page_name, namespace = [ 0 ], site = None):
        page = self.get_page(page_name, site)
        return page.get_linked_page(namespace)
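
    # Walk the category graph starting at page_name and return three sets:
    # result[0] = member articles, result[1] = sub-categories,
    # result[2] = super-categories. Entries of filtered_cat (unicode titles
    # or page objects) are treated as already visited and are skipped.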
    def category(self, page_name, recurse, site, filtered_cat = []):
        if site is None:
            site = wikipedia.getSite()
        cats_done = set()
        for p in filtered_cat:
            if isinstance(p, unicode):
                p = self.get_page(p, site).page
            cats_done.add(p)
        page = self.get_page(page_name, site).page
        cats_todo = [ page ]
        result = [ set(), set(), set() ]
        while cats_todo:
            cat = cats_todo.pop()
            (articles, sub_cats, super_cats) = catlist(cat)
            for p in articles:
                result[0].add(self.get_page(p, site).page)
            for p in sub_cats:
                if self.get_page(p, site).page not in cats_done:
                    result[1].add(self.get_page(p, site).page)
            for p in super_cats:
                if self.get_page(p, site).page not in cats_done:
                    result[2].add(self.get_page(p, site).page)
            cats_done.add(cat)
            if recurse:
                for cat in sub_cats:
                    cat = self.get_page(cat, site).page
                    if cat not in cats_done:
                        cats_todo.append(cat)
        return result
    def categorized_articles(self, page_name, recurse = False, site = None, filtered_cat = []):
        return self.category(page_name, recurse, site, filtered_cat)[0]

    def sub_cat(self, page_name, recurse = False, site = None, filtered_cat = []):
        return self.category(page_name, recurse, site, filtered_cat)[1]

    def super_cat(self, page_name, recurse = False, site = None, filtered_cat = []):
        return self.category(page_name, recurse, site, filtered_cat)[2]
    # returns the date of the last modification as a time.struct_time
    def last_modified(self, page_name, site = None):
        page = self.get_page(page_name, site).page
        text = page.getVersionHistory()
        t = time.strptime(text[0][0].encode('utf-8'), '%d %B %Y à %H:%M')
        #print '"' + text[0].encode('utf-8') + '"'
        # FIXME: using utf-8 as the encoding doesn't work on Debian stable
        # (2006/08) when the date contains diacritics; it looks like a bug in
        # Debian stable's Python, since it works even on an old Slackware. The
        # workaround is to use iso-8859-1 as the encoding (commented line
        # below), but that means many languages can't be handled.
        #t = time.strptime(text[0][0].encode('iso-8859-1'), '%d %B %Y à %H:%M')
        return t
    # number of days elapsed since the last modification
    def last_modified_since(self, page_name, site = None):
        t = self.last_modified(page_name, site)
        now = datetime.datetime.utcnow()
        # t[:6] is (year, month, day, hour, minute, second); t[6] is the
        # week day, not a datetime field
        d = datetime.datetime(*t[:6])
        return diff_time(now, d)
    def history(self, page_name, site = None):
        page = self.get_page(page_name, site).page
        return page.getVersionHistory()
    # FIXME: use a generator object? (we are likely to iterate over the contents)
    # It is intended that all pages in 'pages' belong to the same site.
    def mass_load(self, pages, follow_redirect = False):
        page_list = []
        for p in pages:
            p = self.get_page(p.title(), p.site()).page
            if not hasattr(p, '_contents') and not hasattr(p, '_getexception'):
                page_list.append(p)
        if page_list:
            gen = pagegenerators.PreloadingGenerator(page_list)
            # we need to force the iteration
            [p for p in gen]
        if follow_redirect:
            new_pages = []
            # FIXME: equivalent to iterating over page_list, but safer?
            for p in pages:
                try:
                    new_p = self.get_redirect(p.title(), p.site())
                    if new_p:
                        new_pages.append(self.get_page(new_p, p.site()).page)
                    #if new_p != None:
                    #    print p.title(), new_p
                except wikipedia.NoPage:
                    pass
                except wikipedia.SectionError:
                    pass
            print "mass_load() recurse", len(new_pages)
            self.mass_load(new_pages, False)
    def exists(self, page_name, site = None):
        page = self.get_page(page_name, site).page
        return page.exists()
    def _notify_update(self, user_page, text1):
        self.purge(user_page)
        text = self.read_page(user_page)
        text += text1
        self.write_page(user_page, text, u'Notification de mise à jour', True)

    def notify_update(self, page_name, user_page):
        text = u'\n\n== [[' + page_name + u']]==\n'
        text += u'Mise à jour. ~~~~'
        self._notify_update(user_page, text)
    def true_page_name(self, page_name, site = None):
        new_page_name = self.get_redirect(page_name, site)
        if new_page_name:
            page_name = new_page_name
        return page_name
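
# A minimal usage sketch (hypothetical page title; assumes a configured
# pywikipedia installation), mirroring the test driver below:
#   cache = page_cache()
#   try:
#       text = cache.read_page(u'Accueil')
#       refs = cache.get_references(u'Accueil')
#   finally:
#       cache.save()
#       wikipedia.stopme()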
if __name__ == "__main__":
    def test_ref_cache(title):
        print cache.get_references(title, False, False, False)
        print cache.get_references(title, False, False, True)
        print cache.get_references(title, False, True, False)
        print cache.get_references(title, False, True, True)
        print cache.get_references(title, True, False, False)
        print cache.get_references(title, True, False, True)
        print cache.get_references(title, True, True, False)
        print cache.get_references(title, True, True, True)

    try:
        cache = page_cache()
        #print cache.get(u'L\'art de la guerre', False)
        #print cache.get_redirect(u'L\'art de la guerre')
        #test_ref_cache(u'L\'art de la guerre')
        #test_ref_cache(u'L\'Art de la guerre')
        #test_ref_cache(u'Modèle:CatalogueMessier')
        #data = cache.get_linked_page(u'utilisateur:Phe')
        #print len(data)
        #print data
        #cache.purge(u'utilisateur:Phe')
        #data = cache.get_linked_page(u'utilisateur:Phe', [0, 1, 2, 3, 4, 6])
        #print len(data)
        #print data
        #print cache.get(u'utilisateur:Phe')
        data = cache.categorized_articles(u'Catégorie:Type de guerre', True)
        print len(data)
        print data
        data = cache.categorized_articles(u'Catégorie:Guerre contre le terrorisme', True)
        print len(data)
        print data
        cache.mass_load(data)
    finally:
        cache.save()
        wikipedia.stopme()