PK!ankix/__init__.pyPK!!q~XXankix/ankix.pyimport sqlite3 from zipfile import ZipFile from tempfile import TemporaryDirectory import json from tqdm import tqdm import os import re import logging from .config import config from .util import MediaType from . import db def init(database, **kwargs): db.database.init(database, **kwargs) if not os.path.exists(database): db.create_all_tables() def update_config(markdown=False, srs=None): config.update(markdown=markdown, srs=srs) if db.Settings.get_or_none() is None: db.Settings().save() else: config.update(db.Settings.to_dict()) def import_apkg(src_apkg, skip_media=False): """ :param src_apkg: :param bool|list skip_media: :return: """ info = dict() with TemporaryDirectory() as temp_dir: with db.database.atomic(): with ZipFile(src_apkg) as zf: zf.extractall(temp_dir) conn = sqlite3.connect(os.path.join(temp_dir, 'collection.anki2')) conn.row_factory = sqlite3.Row try: d = dict(conn.execute('''SELECT * FROM col LIMIT 1''').fetchone()) for model in tqdm(tuple(json.loads(d['models']).values()), desc='models'): db.Model.create( id=model['id'], name=model['name'], css=model['css'] ) info.setdefault('model', dict())[int(model['id'])] = model for media_name in re.findall(r'url\([\'\"]((?!.*//)[^\'\"]+)[\'\"]\)', model['css']): info.setdefault('media', dict())\ .setdefault(MediaType.font, dict())\ .setdefault(media_name, [])\ .append(model['id']) for template in model['tmpls']: db_template = db.Template.get_or_create( model_id=model['id'], name=template['name'], question=template['qfmt'], answer=template['afmt'], )[0] info.setdefault('template', dict())[db_template.id] = template for deck in tqdm(tuple(json.loads(d['decks']).values()), desc='decks'): db.Deck.create( id=deck['id'], name=deck['name'] ) c = conn.execute('''SELECT * FROM notes''') for note in tqdm(c.fetchall(), desc='notes'): info_model = info['model'][note['mid']] header = [field['name'] for field in info_model['flds']] db_note = db.Note.create( id=note['id'], model_id=note['mid'], data=dict(zip(header, note['flds'].split('\u001f'))) ) for tag in set(t for t in note['tags'].split(' ') if t): db_tag = db.Tag.get_or_create( name=tag )[0] db_tag.notes.add(db_note) info.setdefault('note', dict())[note['id']] = dict(note) for media_name in re.findall(r'src=[\'\"]((?!.*//)[^\'\"]+)[\'\"]', note['flds']): info.setdefault('media', dict())\ .setdefault(MediaType.image, dict())\ .setdefault(media_name, [])\ .append(note['id']) for media_name in re.findall(r'\[sound:[^\]]+\]', note['flds']): info.setdefault('media', dict()) \ .setdefault(MediaType.audio, dict()) \ .setdefault(media_name, []) \ .append(note['id']) c = conn.execute('''SELECT * FROM cards''') for card in tqdm(c.fetchall(), desc='cards'): info_note = info['note'][card['nid']] info_model = info['model'][info_note['mid']] db_model = db.Model.get(id=info_model['id']) db_template = db_model.templates[0] if '{{cloze:' not in db_template.question: db_template = db_model.templates[card['ord']] db.Card.create( id=card['id'], note_id=card['nid'], deck_id=card['did'], template_id=db_template.id ) else: for db_template_n in db_model.templates: db.Card.create( id=card['id'], note_id=card['nid'], deck_id=card['did'], cloze_order=card['ord'] + 1, template_id=db_template_n.id ) for db_deck in db.Deck.select(): if not db_deck.cards: db_deck.delete_instance() finally: conn.close() if not skip_media: if skip_media is False: skip_media = [] with open(os.path.join(temp_dir, 'media')) as f: info_media = info.get('media', dict()) for media_id, media_name in tqdm(json.load(f).items(), desc='media'): with open(os.path.join(temp_dir, media_id), 'rb') as image_f: db_media = db.Media.create( id=int(media_id), name=media_name, data=image_f.read() ) if MediaType.image not in skip_media: for note_id in info_media.get(MediaType.image, dict()).get(media_name, []): db_media.notes.add(db.Note.get(id=note_id)) if MediaType.audio not in skip_media: for note_id in info_media.get(MediaType.audio, dict()).get(media_name, []): db_media.notes.add(db.Note.get(id=note_id)) if MediaType.font not in skip_media: for model_id in info_media.get(MediaType.font, dict()).get(media_name, []): db_media.models.add(db.Model.get(id=model_id)) if not db_media.notes and db_media.models: logging.error('%s not connected to Notes or Models. Deleting...', media_name) db_media.delete_instance() PK!llankix/config.pyfrom datetime import timedelta from .util import parse_srs class Config(dict): DEFAULT = { 'markdown': True, 'srs': [ timedelta(minutes=10), # 0 timedelta(hours=1), # 1 timedelta(hours=4), # 2 timedelta(hours=8), # 3 timedelta(days=1), # 4 timedelta(days=3), # 5 timedelta(weeks=1), # 6 timedelta(weeks=2), # 7 timedelta(weeks=4), # 8 timedelta(weeks=16) # 9 ] } def __init__(self): super(Config, self).__init__(**self.DEFAULT) def to_db(self): d = dict() for k, v in self.items(): d[k] = { 'srs': lambda x: parse_srs(x, self.DEFAULT['srs']), }.get(k, lambda x: x)(v) return d config = Config() PK! B B ankix/db.pyimport peewee as pv from playhouse import sqlite_ext, signals from playhouse.shortcuts import model_to_dict from datetime import datetime, timedelta from pytimeparse.timeparse import timeparse import random import sys import re import json import hashlib import magic from .config import config from .jupyter import HTML from .util import MediaType, parse_srs, do_markdown, build_base64 from .preview import TemplateMaker database = sqlite_ext.SqliteDatabase(None) class BaseModel(signals.Model): viewer_config = dict() def to_viewer(self): return model_to_dict(self) @classmethod def get_viewer(cls, records): from htmlviewer import PagedViewer return PagedViewer([r.to_viewer() for r in records], **cls.viewer_config) class Meta: database = database class Settings(BaseModel): DEFAULT = config.to_db() markdown = pv.BooleanField(default=DEFAULT['markdown']) _srs = pv.TextField(default=DEFAULT['srs']) @classmethod def to_dict(cls): db_settings = cls.get() d = model_to_dict(db_settings) d.pop('_srs') d['srs'] = db_settings.srs return d @property def srs(self): d = dict() for i, s in enumerate(json.loads(str(self._srs))): d[i] = timedelta(seconds=timeparse(s)) return d @srs.setter def srs(self, value): self._srs = parse_srs(value, self.srs) self.save() @signals.post_save(sender=Settings) def auto_update_config(model_class, instance, created): config.update(model_class.to_dict()) class Tag(BaseModel): name = pv.TextField(unique=True, collation='NOCASE') # notes def __repr__(self): return f'' def to_viewer(self): d = model_to_dict(self) d['notes'] = [repr(n) for n in self.notes] return d class Media(BaseModel): name = pv.TextField(unique=True) type_ = pv.TextField(default=MediaType.font) data = pv.BlobField() h = pv.TextField(unique=True) # models (for css) # notes def __repr__(self): return f'' @property def src(self): return build_base64(bytes(self.data)) @property def html(self): if self.type_ == MediaType.font: return f'' elif self.type_ == MediaType.audio: return f'