Utilisateur:Phe/Python/page cache.py

La bibliothèque libre.
# -*- coding: utf-8 -*-

# Copyright 2006, Philippe Elie.

import botpywi
import wikipedia
import pagegenerators
import catlib
import time
import datetime

def diff_time(a, b):
    """Return the difference ``a - b`` expressed in days, as a float.

    ``a`` and ``b`` must support subtraction yielding an object with
    ``days`` and ``seconds`` attributes (e.g. two ``datetime`` objects).
    Only those two fields are used; microseconds are not counted.
    """
    delta = a - b
    elapsed_seconds = delta.days * 86400.0 + delta.seconds
    return elapsed_seconds / 86400

def catlist(cat):
    """Split the contents of category ``cat`` into three lists by tag.

    ``cat._getContentsNaive()`` must yield ``(tag, member)`` pairs where
    ``tag`` is used as an index into the result.  Members tagged 0 or 1
    are stored by their ``title()``; members with any other tag are
    stored as the object itself.

    Returns a tuple of three lists, indexed by tag.
    """
    tagged_0, tagged_1, tagged_2 = [], [], []
    buckets = (tagged_0, tagged_1, tagged_2)
    for tag, member in cat._getContentsNaive():
        entry = member.title() if tag in (0, 1) else member
        buckets[tag].append(entry)
    return buckets

class dictionary_cache:
    """Dictionary stored under a name via ``botpywi.load_obj``/``save_obj``.

    The dictionary itself is exposed as ``self.cache``.
    """

    def __init__(self, name):
        """Load the dictionary saved as ``name``, or start with an empty one."""
        self.name = name
        try:
            self.cache = botpywi.load_obj(self.name)
        except Exception:
            # Best effort: any failure to load simply means we start with
            # an empty cache.  Unlike a bare ``except:``, this lets
            # KeyboardInterrupt and SystemExit propagate.
            self.cache = {}

    def save(self):
        """Store ``self.cache`` under ``self.name`` with ``botpywi.save_obj``."""
        botpywi.save_obj(self.name, self.cache)

class page_object(object):
    """Cache entry for one wiki page.

    ``self.page`` is a ``wikipedia.Page`` (or a ``catlib.Category`` when the
    page is a category); ``self.refs`` memoises reference lists per set of
    query flags.  All other data is stored in ``self.page`` itself.
    """

    def __init__(self, site, page_name):
        page = wikipedia.Page(site, page_name)
        if page.isCategory():
            page = catlib.Category(site, page_name)
        self.page = page
        self.refs = {}

    def get_references(self, follow_redirects, withTemplateInclusion,
                        onlyTemplateInclusion, redirects_only):
        """Return the list of pages referencing this page.

        The four flags are forwarded to ``self.page.getReferences()``
        (``redirects_only`` as ``redirectsOnly``).  The result is computed
        once per distinct combination of flags and the same list object is
        returned on later calls.
        """
        key = (follow_redirects, withTemplateInclusion,
               onlyTemplateInclusion, redirects_only)
        if key not in self.refs:
            self.refs[key] = list(self.page.getReferences(
                follow_redirects=follow_redirects,
                withTemplateInclusion=withTemplateInclusion,
                onlyTemplateInclusion=onlyTemplateInclusion,
                redirectsOnly=redirects_only))
        return self.refs[key]

    def get_linked_page(self, nms):
        """Return the set of pages linked from this page.

        ``nms`` is a container of namespaces to keep, or ``None`` to keep
        every linked page.  Linked pages are not stored: the page is
        queried through ``linkedPages(True)`` on every call.
        """
        pages = self.page.linkedPages(True)
        if nms is None:
            return set(pages)
        return set(x for x in pages if x.namespace() in nms)

class page_cache(dictionary_cache):
    """Cache of ``page_object`` entries keyed by ``(site, page title)``.

    Every accessor takes a page name and an optional ``site``; when
    ``site`` is ``None`` the default ``wikipedia.getSite()`` is used.
    """

    def __init__(self, cache_name = 'page_cache'):
        dictionary_cache.__init__(self, cache_name)

    def _cache_key(self, page_name, site):
        """Return the ``(site, title)`` key used to store ``page_name``.

        The title is normalised through ``wikipedia.Page(...).title()`` so
        that every spelling of a page name maps to the same entry.
        """
        if site is None:
            site = wikipedia.getSite()
        #page_name = botpywi.capitalize(page_name)
        return (site, wikipedia.Page(site, page_name).title())

    def get_page(self, page_name, site = None):
        """Return the ``page_object`` for ``page_name``, creating it if needed."""
        key = self._cache_key(page_name, site)
        if key not in self.cache:
            self.cache[key] = page_object(key[0], key[1])
        return self.cache[key]

    def purge(self, page_name, site = None):
        """Drop the cached entry for ``page_name``, if there is one.

        The key is normalised exactly as in ``get_page()``; otherwise a
        name that is not already in canonical form would never match the
        stored entry and the purge would silently do nothing.
        """
        self.cache.pop(self._cache_key(page_name, site), None)

    def get_redirect(self, page_name, site = None):
        """Return the redirect target of ``page_name``, or ``None``.

        The page is fetched with ``get(get_redirect = True)``; when it is a
        redirect page its ``_redirarg`` attribute is returned.
        """
        page = self.get_page(page_name, site).page
        page.get(get_redirect = True)
        if page.isRedirectPage():
            return page._redirarg
        return None

    def get(self, page_name, follow_redirect = True, site = None):
        """Return the text of ``page_name``.

        With ``follow_redirect`` set, a redirect page is replaced by its
        target (one level only) before the text is read.
        """
        if follow_redirect:
            new_page_name = self.get_redirect(page_name, site)
            if new_page_name:
                page_name = new_page_name
        page = self.get_page(page_name, site).page
        return page.get(get_redirect = True)

    # convenience for backward compatibility with older code using cache.py
    def read_page(self, page_name, site = None, follow_redirect = True):
        """Same as ``get()`` with the historical argument order."""
        return self.get(page_name, follow_redirect, site)

    def write_page(self, page_name, text, comment, minor_edit = True, site = None):
        """Queue ``text`` to be written to ``page_name`` via ``put_async``.

        The third positional argument of ``put_async`` is always ``False``
        (presumably the watch flag -- TODO confirm against pywikipedia).
        """
        page = self.get_page(page_name, site).page
        page.put_async(text, comment, False, minor_edit)
        # FIXME: put() doesn't update contents, for now remove the cached
        # text so that the next read fetches it again.
        if hasattr(page, '_contents'):
            delattr(page, '_contents')

    def get_references(self, page_name, follow_redirects = True,
                       withTemplateInclusion = True,
                       onlyTemplateInclusion = False,
                       site = None,
                       redirects_only = False):
        """Return the set of pages referencing ``page_name``.

        The flags are forwarded to ``page_object.get_references()``, which
        memoises the underlying list; a new set is built on every call.
        """
        page = self.get_page(page_name, site)
        refs = page.get_references(follow_redirects=follow_redirects,
                                   withTemplateInclusion=withTemplateInclusion,
                                   onlyTemplateInclusion=onlyTemplateInclusion,
                                   redirects_only=redirects_only)
        return set(refs)

    def get_linked_page(self, page_name, namespace = (0,), site = None):
        """Return the set of pages linked from ``page_name``.

        ``namespace`` is a container of namespaces to keep; passing
        ``None`` keeps every linked page.
        """
        page = self.get_page(page_name, site)
        return page.get_linked_page(namespace)

    def category(self, page_name, recurse, site, filtered_cat = ()):
        """Collect the contents of category ``page_name``.

        ``filtered_cat`` lists categories (unicode titles or page objects)
        that are neither reported as sub/super categories nor descended
        into.  With ``recurse`` set, sub-categories are walked as well.

        Returns a list of three sets of page objects, in the order produced
        by ``catlist()``: articles, sub-categories, super-categories.
        """
        if site is None:
            site = wikipedia.getSite()
        cats_done = set()
        for p in filtered_cat:
            if isinstance(p, type(u'')):
                p = self.get_page(p, site).page
            cats_done.add(p)

        root = self.get_page(page_name, site).page
        cats_todo = [ root ]
        found_articles, found_sub_cats, found_super_cats = set(), set(), set()
        while cats_todo:
            cat = cats_todo.pop()
            (articles, sub_cats, super_cats) = catlist(cat)
            for title in articles:
                found_articles.add(self.get_page(title, site).page)
            # Resolve each sub-category once; the pages are needed both for
            # the result and for the recursion below.
            sub_pages = [self.get_page(title, site).page for title in sub_cats]
            for sub in sub_pages:
                if sub not in cats_done:
                    found_sub_cats.add(sub)
            for p in super_cats:
                sup = self.get_page(p, site).page
                if sup not in cats_done:
                    found_super_cats.add(sup)
            cats_done.add(cat)
            if recurse:
                for sub in sub_pages:
                    # Skip categories already processed or already queued so
                    # that no category is fetched twice.
                    if sub not in cats_done and sub not in cats_todo:
                        cats_todo.append(sub)
        return [ found_articles, found_sub_cats, found_super_cats ]

    def categorized_articles(self, page_name, recurse = False, site = None, filtered_cat = ()):
        """Return the set of articles found by ``category()``."""
        return self.category(page_name, recurse, site, filtered_cat)[0]

    def sub_cat(self, page_name, recurse = False, site = None, filtered_cat = ()):
        """Return the set of sub-categories found by ``category()``."""
        return self.category(page_name, recurse, site, filtered_cat)[1]

    def super_cat(self, page_name, recurse = False, site = None, filtered_cat = ()):
        """Return the set of super-categories found by ``category()``."""
        return self.category(page_name, recurse, site, filtered_cat)[2]

    def last_modified(self, page_name, site = None):
        """Return a ``time.struct_time`` parsed from the page history.

        The first field of the first entry of ``getVersionHistory()`` is
        parsed with the French format ``'%d %B %Y à %H:%M'`` (presumably
        that entry is the latest revision -- TODO confirm).
        """
        page = self.get_page(page_name, site).page
        text = page.getVersionHistory()

        t = time.strptime(text[0][0].encode('utf-8'), '%d %B %Y à %H:%M')
        #print '"' + text[0].encode('utf-8') + '"'
        # FIXME: using utf-8 as encoding doesn't work on debian stable
        # (2006/08) when the date contains diacritics; it looks like a bug
        # in that python build since it works elsewhere.  The workaround is
        # to use iso-8859-1 as encoding, which means that many languages
        # can't be handled.
        #t = time.strptime(text[0][0].encode('iso-8859-1'), '%d %B %Y à %H:%M')
        return t

    def last_modified_since(self, page_name, site = None):
        """Return the number of days (float) since ``last_modified()``.

        The parsed date is compared with the current UTC time
        (NOTE(review): assumes the history timestamps are UTC -- confirm).
        """
        t = self.last_modified(page_name, site)
        now = datetime.datetime.utcnow()
        # Only year..second are date fields; index 6 of a struct_time is the
        # weekday and must not be passed on as microseconds.
        d = datetime.datetime(*t[:6])
        return diff_time(now, d)

    def history(self, page_name, site = None):
        """Return ``getVersionHistory()`` of ``page_name``."""
        page = self.get_page(page_name, site).page
        return page.getVersionHistory()

    # FIXME : use a generator object ? (we are likely to iterate on contents)
    # it's intended that all page in pages belong to the same site.
    def mass_load(self, pages, follow_redirect = False):
        """Preload the text of every page of ``pages`` not yet loaded.

        ``pages`` is an iterable of page objects (``title()`` and ``site()``
        are used).  With ``follow_redirect`` set, the targets of the
        redirect pages among them are preloaded as well (one level only).
        """
        # ``pages`` is walked twice below; materialise it so that one-shot
        # iterables (generators) work too.
        pages = list(pages)
        page_list = []
        for p in pages:
            p = self.get_page(p.title(), p.site()).page
            if not hasattr(p, '_contents') and not hasattr(p, '_getexception'):
                page_list.append(p)
        if page_list:
            # The generator is lazy: it has to be consumed for the pages to
            # be fetched.
            for _ in pagegenerators.PreloadingGenerator(page_list):
                pass
        if follow_redirect:
            new_pages = []
            # FIXME: equivalent to for p in page_list but safer ?
            for p in pages:
                try:
                    new_p = self.get_redirect(p.title(), p.site())
                    if new_p:
                        new_pages.append(self.get_page(new_p, p.site()).page)
                except wikipedia.NoPage:
                    pass
                except wikipedia.SectionError:
                    pass
            print("mass_load() recurse %d" % len(new_pages))
            self.mass_load(new_pages, False)

    def exists(self, page_name, site = None):
        """Return ``exists()`` of ``page_name``."""
        page = self.get_page(page_name, site).page
        return page.exists()

    def _notify_update(self, user_page, text1):
        """Append ``text1`` to ``user_page`` after re-reading it."""
        self.purge(user_page)
        text = self.read_page(user_page)
        text += text1
        self.write_page(user_page, text, u'Notification de mise à jour', True)

    def notify_update(self, page_name, user_page):
        """Append a signed "updated" section about ``page_name`` to ``user_page``."""
        text = u'\n\n== [[' + page_name + u']]==\n'
        text += u'Mise à jour. ~~~~'
        self._notify_update(user_page, text)

    def true_page_name(self, page_name, site = None):
        """Return the redirect target of ``page_name``, or ``page_name`` itself."""
        new_page_name = self.get_redirect(page_name, site)
        if new_page_name:
            page_name = new_page_name
        return page_name

if __name__ == "__main__":
    def test_ref_cache(title):
        """Print the references of ``title`` for all eight flag combinations.

        The three flags are, in positional order, ``follow_redirects``,
        ``withTemplateInclusion`` and ``onlyTemplateInclusion``.
        """
        for follow_redirects in (False, True):
            for with_inclusion in (False, True):
                for only_inclusion in (False, True):
                    print(cache.get_references(title, follow_redirects,
                                               with_inclusion, only_inclusion))

    # Create the cache before entering the try block: if construction fails
    # there is nothing to save, and the cleanup below must not hide the real
    # error behind a NameError on ``cache``.
    cache = page_cache()
    try:
        # Other manual checks, kept for reference:
        #print cache.get(u'L\'art de la guerre', False)
        #print cache.get_redirect(u'L\'art de la guerre')
        #test_ref_cache(u'L\'art de la guerre')
        #test_ref_cache(u'L\'Art de la guerre')
        #test_ref_cache(u'Modèle:CatalogueMessier')
        #data = cache.get_linked_page(u'utilisateur:Phe')
        #print len(data)
        #print data
        #cache.purge(u'utilisateur:Phe')
        #data = cache.get_linked_page(u'utilisateur:Phe', [0, 1, 2, 3, 4, 6])
        #print len(data)
        #print data
        #print cache.get(u'utilisateur:Phe')
        data = cache.categorized_articles(u'Catégorie:Type de guerre', True)
        print(len(data))
        print(data)
        data = cache.categorized_articles(u'Catégorie:Guerre contre le terrorisme', True)
        print(len(data))
        print(data)
        cache.mass_load(data)
    finally:
        # Always shut pywikipedia down, even when saving the cache fails.
        try:
            cache.save()
        finally:
            wikipedia.stopme()