# tacl/tokenizer.py
"""Module containing the Tokenizer class."""
import re
class Tokenizer:
"""A tokenizer that splits a string using a regular expression.
Based on the RegexpTokenizer from the Natural Language Toolkit.
"""
def __init__ (self, pattern, joiner, flags=re.UNICODE | re.MULTILINE |
re.DOTALL):
try:
self._regexp = re.compile(pattern, flags)
except re.error as err:
raise ValueError('Error in regular expression %r: %s' %
(pattern, err))
self._joiner = joiner
self._pattern = pattern
@property
def joiner (self):
return self._joiner
@property
def pattern (self):
return self._pattern
def tokenize (self, text):
return self._regexp.findall(text)
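# Illustrative usage (a sketch using the CBETA pattern and joiner defined
# in tacl/constants.py; the sample string is made up):
#
#     >>> tokenizer = Tokenizer(r'\[[^]]*\]|\w', '')
#     >>> tokenizer.tokenize('[(禾*尤)]中文')
#     ['[(禾*尤)]', '中', '文']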
# tacl/report.py
"""Module containing the Report class."""
import logging
import re
import pandas as pd
from . import constants
from .text import BaseText
class Report:
def __init__ (self, matches, tokenizer):
self._logger = logging.getLogger(__name__)
self._matches = pd.read_csv(matches, encoding='utf-8', na_filter=False)
# Work around a problem with CSV files produced on Windows
# being read by pandas and creating an empty row for each
# actual row.
self._matches = self._matches.dropna(how='all')
self._tokenizer = tokenizer
def csv (self, fh):
"""Writes the report data to `fh` in CSV format and returns it.
:param fh: file to write data to
:type fh: file object
:rtype: file object
"""
self._matches.to_csv(fh, encoding='utf-8', float_format='%d',
index=False)
return fh
def extend (self, corpus):
self._logger.info('Extending results')
if self._matches.empty:
return
highest_n = self._matches[constants.SIZE_FIELDNAME].max()
if highest_n == 1:
self._logger.warning(
'Extending results that contain only 1-grams is unsupported; '
'the original results will be used')
return
# Supply the extender with only matches on the largest
# n-grams.
matches = self._matches[
self._matches[constants.SIZE_FIELDNAME] == highest_n]
extended_matches = pd.DataFrame()
cols = [constants.NAME_FIELDNAME, constants.SIGLUM_FIELDNAME,
constants.LABEL_FIELDNAME]
for index, (text_name, siglum, label) in \
matches[cols].drop_duplicates().iterrows():
extended_ngrams = self._generate_extended_ngrams(
matches, text_name, siglum, label, corpus, highest_n)
extended_matches = pd.concat(
[extended_matches, self._generate_extended_matches(
extended_ngrams, highest_n, text_name, siglum, label)])
extended_ngrams = None
        extended_matches = extended_matches.reindex(
            columns=constants.QUERY_FIELDNAMES)
extended_matches = self._reciprocal_remove(extended_matches)
        self._matches = pd.concat([self._matches, extended_matches])
def _generate_extended_matches (self, extended_ngrams, highest_n, name,
siglum, label):
"""Returns extended match data derived from `extended_ngrams`.
        This extended match data consists of the counts for all
        intermediate n-grams within each extended n-gram.
:param extended_ngrams: extended n-grams
:type extended_ngrams: `list` of `str`
:param highest_n: the highest degree of n-grams in the original results
:type highest_n: `int`
:param name: name of the text bearing `extended_ngrams`
:type name: `str`
:param siglum: siglum of the text bearing `extended_ngrams`
:type siglum: `str`
:param label: label associated with the text
:type label: `str`
:rtype: `pandas.DataFrame`
"""
# Add data for each n-gram within each extended n-gram. Since
# this treats each extended piece of text separately, the same
# n-gram may be generated more than once, so the complete set
# of new possible matches for this filename needs to combine
# the counts for such.
rows_list = []
for extended_ngram in extended_ngrams:
text = BaseText(extended_ngram, self._tokenizer)
for size, ngrams in text.get_ngrams(highest_n+1,
len(text.get_tokens())):
data = [{constants.NAME_FIELDNAME: name,
constants.SIGLUM_FIELDNAME: siglum,
constants.LABEL_FIELDNAME: label,
constants.SIZE_FIELDNAME: size,
constants.NGRAM_FIELDNAME: ngram,
constants.COUNT_FIELDNAME: count}
for ngram, count in ngrams.items()]
rows_list.extend(data)
self._logger.debug('Number of extended results: {}'.format(
len(rows_list)))
extended_matches = pd.DataFrame(rows_list)
rows_list = None
self._logger.debug('Finished generating intermediate extended matches')
# extended_matches may be an empty DataFrame, in which case
# manipulating it on the basis of non-existing columns is not
# going to go well.
groupby_fields = [constants.NGRAM_FIELDNAME, constants.NAME_FIELDNAME,
constants.SIGLUM_FIELDNAME, constants.SIZE_FIELDNAME,
constants.LABEL_FIELDNAME]
if constants.NGRAM_FIELDNAME in extended_matches:
extended_matches = extended_matches.groupby(
groupby_fields).sum().reset_index()
return extended_matches
def _generate_extended_ngrams (self, matches, name, siglum, label, corpus,
highest_n):
"""Returns the n-grams of the largest size that exist in `siglum`
witness to `name` text under `label`, generated from adding
together overlapping n-grams in `matches`.
:param matches: n-gram matches
:type matches: `pandas.DataFrame`
:param name: name of text whose results are being processed
:type name: `str`
:param siglum: siglum of witness whose results are being processed
:type siglum: `str`
:param label: label of witness whose results are being processed
:type label: `str`
        :param corpus: corpus to which the text `name` belongs
:type corpus: `Corpus`
:param highest_n: highest degree of n-gram in `matches`
:type highest_n: `int`
:rtype: `list` of `str`
"""
# For large result sets, this method may involve a lot of
# processing within the for loop, so optimise even small
# things, such as aliasing dotted calls here and below.
t_join = self._tokenizer.joiner.join
witness_matches = matches[
(matches[constants.NAME_FIELDNAME] == name) &
(matches[constants.SIGLUM_FIELDNAME] == siglum) &
(matches[constants.LABEL_FIELDNAME] == label)]
text = t_join(corpus.get_text(name, siglum).get_tokens())
ngrams = [tuple(self._tokenizer.tokenize(ngram)) for ngram in
list(witness_matches[constants.NGRAM_FIELDNAME])]
# Go through the list of n-grams, and create a list of
# extended n-grams by joining two n-grams together that
# overlap (a[-overlap:] == b[:-1]) and checking that the result
# occurs in text.
working_ngrams = ngrams[:]
extended_ngrams = set(ngrams)
new_working_ngrams = []
overlap = highest_n - 1
# Create an index of n-grams by their overlapping portion,
# pointing to the non-overlapping token.
ngram_index = {}
for ngram in ngrams:
values = ngram_index.setdefault(ngram[:-1], [])
values.append(ngram[-1:])
extended_add = extended_ngrams.add
new_working_append = new_working_ngrams.append
ngram_size = highest_n
while working_ngrams:
removals = set()
ngram_size += 1
self._logger.debug(
'Iterating over {} n-grams to produce {}-grams'.format(
len(working_ngrams), ngram_size))
for base in working_ngrams:
remove_base = False
base_overlap = base[-overlap:]
for next_token in ngram_index.get(base_overlap, []):
extension = base + next_token
if t_join(extension) in text:
extended_add(extension)
new_working_append(extension)
remove_base = True
if remove_base:
# Remove base from extended_ngrams, because it is
# now encompassed by extension.
removals.add(base)
extended_ngrams -= removals
working_ngrams = new_working_ngrams[:]
new_working_ngrams = []
new_working_append = new_working_ngrams.append
extended_ngrams = sorted(extended_ngrams, key=len, reverse=True)
extended_ngrams = [t_join(ngram) for ngram in extended_ngrams]
self._logger.debug('Generated {} extended n-grams'.format(
len(extended_ngrams)))
self._logger.debug('Longest generated n-gram: {}'.format(
extended_ngrams[0]))
# In order to get the counts correct in the next step of the
# process, these n-grams must be overlaid over the text and
# repeated as many times as there are matches. N-grams that do
        # not match (they may not match against parts of the text
        # already consumed by an earlier match) are discarded.
ngrams = []
for ngram in extended_ngrams:
# Remove from the text those parts that match. Replace
# them with a double space, which should prevent any
# incorrect match on the text from each side of the match
# that is now contiguous.
            text, count = re.subn(re.escape(ngram), '  ', text)
ngrams.extend([ngram] * count)
self._logger.debug('Aligned extended n-grams with the text; '
'{} distinct n-grams exist'.format(len(ngrams)))
return ngrams
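    # Worked sketch of the extension (CBETA-style tokens, joiner ''):
    # with highest_n == 2, matches AB and BC, and a witness text "ABC",
    # the overlap B links AB to the 3-gram ABC; AB is removed as
    # encompassed, while BC survives the joining stage but is discarded
    # when the extended n-grams are overlaid on the text (its tokens are
    # already consumed by ABC), so ['ABC'] is returned.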
def _generate_substrings (self, ngram, size):
"""Returns a list of all substrings of `ngram`.
:param ngram: n-gram to generate substrings of
:type ngram: `str`
:param size: size of `ngram`
:type size: `int`
:rtype: `list`
"""
text = BaseText(ngram, self._tokenizer)
substrings = []
for sub_size, ngrams in text.get_ngrams(1, size-1):
for sub_ngram, count in ngrams.items():
substrings.extend([sub_ngram] * count)
return substrings
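    # Illustrative example: for the 3-gram 'ABC' (CBETA tokenizer,
    # joiner ''), the substrings generated are the 1-grams 'A', 'B', 'C'
    # and the 2-grams 'AB', 'BC', each repeated according to its count
    # within the n-gram.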
def prune_by_ngram_count (self, minimum=None, maximum=None):
"""Removes results rows whose total n-gram count (across all
texts bearing this n-gram) is outside the range specified by
`minimum` and `maximum`.
:param minimum: minimum n-gram count
:type minimum: `int`
:param maximum: maximum n-gram count
:type maximum: `int`
"""
self._logger.info('Pruning results by n-gram count')
counts = pd.DataFrame(self._matches.groupby(constants.NGRAM_FIELDNAME)[
constants.COUNT_FIELDNAME].sum())
counts.rename(columns={constants.COUNT_FIELDNAME: 'tmp_count'},
inplace=True)
if minimum:
counts = counts[counts['tmp_count'] >= minimum]
if maximum:
counts = counts[counts['tmp_count'] <= maximum]
self._matches = pd.merge(self._matches, counts,
left_on=constants.NGRAM_FIELDNAME,
right_index=True)
del self._matches['tmp_count']
def prune_by_ngram_size (self, minimum=None, maximum=None):
"""Removes results rows whose n-gram size is outside the
range specified by `minimum` and `maximum`.
:param minimum: minimum n-gram size
:type minimum: `int`
:param maximum: maximum n-gram size
:type maximum: `int`
"""
self._logger.info('Pruning results by n-gram size')
if minimum:
self._matches = self._matches[
self._matches[constants.SIZE_FIELDNAME] >= minimum]
if maximum:
self._matches = self._matches[
self._matches[constants.SIZE_FIELDNAME] <= maximum]
def prune_by_text_count (self, minimum=None, maximum=None):
"""Removes results rows for n-grams that are not attested in a
number of texts in the range specified by `minimum` and
`maximum`.
        Text here encompasses all witnesses, so the same n-gram
        appearing in multiple witnesses of the same text is counted
        as a single text.
:param minimum: minimum number of texts
:type minimum: `int`
:param maximum: maximum number of texts
:type maximum: `int`
"""
self._logger.info('Pruning results by text count')
count_fieldname = 'tmp_count'
filtered = self._matches[self._matches[constants.COUNT_FIELDNAME] > 0]
grouped = filtered.groupby(constants.NGRAM_FIELDNAME)
counts = pd.DataFrame(grouped[constants.NAME_FIELDNAME].nunique())
counts.rename(columns={constants.NAME_FIELDNAME: count_fieldname},
inplace=True)
if minimum:
counts = counts[counts[count_fieldname] >= minimum]
if maximum:
counts = counts[counts[count_fieldname] <= maximum]
self._matches = pd.merge(self._matches, counts,
left_on=constants.NGRAM_FIELDNAME,
right_index=True)
del self._matches[count_fieldname]
def reciprocal_remove (self):
"""Removes results rows for which the n-gram is not present in
at least one text in each labelled set of texts."""
self._logger.info(
'Removing n-grams that are not attested in all labels')
self._matches = self._reciprocal_remove(self._matches)
def _reciprocal_remove (self, matches):
number_labels = matches[constants.LABEL_FIELDNAME].nunique()
filtered = matches[matches[constants.COUNT_FIELDNAME] > 0]
grouped = filtered.groupby(constants.NGRAM_FIELDNAME)
return grouped.filter(
lambda x: x[constants.LABEL_FIELDNAME].nunique() == number_labels)
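    # Illustrative example: with labels A and B, an n-gram whose rows
    # (with count > 0) all carry label A is filtered out, while an
    # n-gram with positive counts under both A and B is kept.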
def reduce (self):
"""Removes results rows whose n-grams are contained in larger
n-grams."""
self._logger.info('Reducing the n-grams')
# This does not make use of any pandas functionality; it
# probably could, and if so ought to.
data = {}
labels = {}
# Derive a convenient data structure from the rows.
for row_index, row in self._matches.iterrows():
name = row[constants.NAME_FIELDNAME]
siglum = row[constants.SIGLUM_FIELDNAME]
labels[name] = row[constants.LABEL_FIELDNAME]
text_data = data.setdefault((name, siglum), {})
text_data[row[constants.NGRAM_FIELDNAME]] = {
'count': int(row[constants.COUNT_FIELDNAME]),
'size': int(row[constants.SIZE_FIELDNAME])}
for text_data in data.values():
ngrams = list(text_data.keys())
ngrams.sort(key=lambda ngram: text_data[ngram]['size'],
reverse=True)
for ngram in ngrams:
if text_data[ngram]['count'] > 0:
self._reduce_by_ngram(text_data, ngram)
# Recreate rows from the modified data structure.
rows = []
for (name, siglum), text_data in data.items():
for ngram, ngram_data in text_data.items():
count = ngram_data['count']
if count > 0:
rows.append(
{constants.NGRAM_FIELDNAME: ngram,
constants.SIZE_FIELDNAME: ngram_data['size'],
constants.NAME_FIELDNAME: name,
constants.SIGLUM_FIELDNAME: siglum,
constants.COUNT_FIELDNAME: count,
constants.LABEL_FIELDNAME: labels[name]})
if rows:
self._matches = pd.DataFrame(
rows, columns=constants.QUERY_FIELDNAMES)
else:
self._matches = pd.DataFrame()
def _reduce_by_ngram (self, data, ngram):
"""Lowers the counts of all n-grams in `data` that are
substrings of `ngram` by `ngram`\'s count.
Modifies `data` in place.
:param data: row data dictionary for the current text
:type data: `dict`
:param ngram: n-gram being reduced
:type ngram: `str`
"""
# Find all substrings of `ngram` and reduce their count by the
# count of `ngram`. Substrings may not exist in `data`.
count = data[ngram]['count']
for substring in self._generate_substrings(ngram, data[ngram]['size']):
try:
substring_data = data[substring]
except KeyError:
continue
else:
substring_data['count'] -= count
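    # Worked sketch: if a witness has 'ABC' with count 1, 'AB' with
    # count 2 and 'BC' with count 1, reducing by 'ABC' lowers 'AB' to 1
    # and 'BC' to 0; the 'BC' row is then dropped when reduce()
    # recreates the rows, since only counts above zero are kept.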
def remove_label (self, label):
self._logger.info('Removing label "{}"'.format(label))
count = self._matches[constants.LABEL_FIELDNAME].value_counts()[label]
self._matches = self._matches[
self._matches[constants.LABEL_FIELDNAME] != label]
self._logger.info('Removed {} labelled results'.format(count))
def sort (self):
self._matches.sort_values(
by=[constants.SIZE_FIELDNAME, constants.NGRAM_FIELDNAME,
constants.COUNT_FIELDNAME, constants.LABEL_FIELDNAME,
constants.NAME_FIELDNAME, constants.SIGLUM_FIELDNAME],
ascending=[False, True, False, True, True, True], inplace=True)
def zero_fill (self, corpus, catalogue):
"""Adds rows to the results to ensure that, for every n-gram that is
attested in at least one witness, every witness for that text
has a row, with added rows having a count of zero.
:param corpus: corpus containing the texts appearing in the results
:type corpus: `Corpus`
:param catalogue: catalogue used in the generation of the results
:type catalogue: `Catalogue`
"""
zero_rows = []
# Get all of the texts, and their witnesses, for each label.
data = {}
for text, label in iter(catalogue.items()):
data.setdefault(label, {})[text] = []
for siglum in corpus.get_sigla(text):
data[label][text].append(siglum)
grouping_cols = [constants.LABEL_FIELDNAME, constants.NGRAM_FIELDNAME,
constants.SIZE_FIELDNAME, constants.NAME_FIELDNAME]
grouped = self._matches.groupby(grouping_cols, sort=False)
for (label, ngram, size, text), group in grouped:
row_data = {
constants.NGRAM_FIELDNAME: ngram,
constants.LABEL_FIELDNAME: label,
constants.SIZE_FIELDNAME: size,
constants.COUNT_FIELDNAME: 0,
constants.NAME_FIELDNAME: text,
}
            for siglum in data[label][text]:
                if group[group[constants.SIGLUM_FIELDNAME] == siglum].empty:
                    # Copy the row data so that each missing witness
                    # gets its own row, rather than every appended row
                    # sharing (and overwriting) a single dict.
                    row = dict(row_data)
                    row[constants.SIGLUM_FIELDNAME] = siglum
                    zero_rows.append(row)
zero_df = pd.DataFrame(zero_rows, columns=constants.QUERY_FIELDNAMES)
self._matches = pd.concat([self._matches, zero_df])
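    # Illustrative example (witness sigla are made up): if the n-gram
    # '天下' is attested in witness 元 of text T0001, but T0001 also has
    # a witness 大 with no matching row, a row for ('天下', T0001, 大)
    # with a count of 0 is appended, so every witness of the text is
    # represented in the results.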
# tacl/text.py
"""Module containing the Text class."""
import collections
import hashlib
import os.path
class BaseText:
def __init__ (self, content, tokenizer):
self._content = content
self._tokenizer = tokenizer
def get_content (self):
"""Returns the content of this text.
:rtype: `str`
"""
return self._content
def get_ngrams (self, minimum, maximum, skip_sizes=None):
"""Returns a generator supplying the n-grams (`minimum` <= n
<= `maximum`) for this text.
Each iteration of the generator supplies a tuple consisting of
the size of the n-grams and a `collections.Counter` of the
n-grams.
:param minimum: minimum n-gram size
:type minimum: `int`
:param maximum: maximum n-gram size
:type maximum: `int`
:rtype: `generator`
"""
skip_sizes = skip_sizes or []
tokens = self.get_tokens()
for size in range(minimum, maximum + 1):
if size not in skip_sizes:
ngrams = collections.Counter(self._ngrams(tokens, size))
yield (size, ngrams)
def get_tokens (self):
"""Returns a list of tokens in this text."""
return self._tokenizer.tokenize(self._content)
def _ngrams (self, sequence, degree):
"""Returns the n-grams generated from `sequence`.
Based on the ngrams function from the Natural Language
Toolkit.
Each n-gram in the returned list is a string with whitespace
removed.
:param sequence: the source data to be converted into n-grams
:type sequence: sequence
:param degree: the degree of the n-grams
:type degree: int
:rtype: `list` of `str`
"""
count = max(0, len(sequence) - degree + 1)
# The extra split and join are due to having to handle
# whitespace within a CBETA token (eg, [(禾*尤)\n/上/日]).
return [self._tokenizer.joiner.join(
self._tokenizer.joiner.join(sequence[i:i+degree]).split())
for i in range(count)]
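    # Illustrative examples of _ngrams (using the tokenizers defined in
    # tacl/constants.py; sample tokens are made up): with the CBETA
    # tokenizer (joiner ''), _ngrams(['天', '下', '人'], 2) gives
    # ['天下', '下人']; with the Pagel tokenizer (joiner ' '),
    # _ngrams(["bka'", "'gyur", 'gyi'], 2) gives ["bka' 'gyur",
    # "'gyur gyi"].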
class Text (BaseText):
def __init__ (self, name, siglum, content, tokenizer):
super().__init__(content, tokenizer)
self._name = name
self._siglum = siglum
self._filename = self.assemble_filename(name, siglum)
@staticmethod
def assemble_filename (name, siglum):
return os.path.join(name, siglum + '.txt')
def get_checksum (self):
"""Returns the checksum for the content of this text.
:rtype: `str`
"""
return hashlib.md5(self._content.encode('utf-8')).hexdigest()
def get_filename (self):
"""Returns the filename of this text.
:rtype: `str`
"""
return self._filename
def get_names (self):
"""Returns the name and siglum of this text.
:rtype: `tuple`
"""
return self._name, self._siglum
# tacl/constants.py
"""Module containing constants."""
TOKENIZER_CHOICE_CBETA = 'cbeta'
TOKENIZER_CHOICE_PAGEL = 'pagel'
TOKENIZER_CHOICES = [TOKENIZER_CHOICE_CBETA, TOKENIZER_CHOICE_PAGEL]
# For the CBETA (Chinese) tokenizer, a token is either a workaround
# (anything in square brackets, as a whole), or a single word
# character. Tokens are grouped together (when constituted into
# n-grams) by an empty string.
TOKENIZER_PATTERN_CBETA = r'\[[^]]*\]|\w'
TOKENIZER_JOINER_CBETA = ''
# For the Pagel (Tibetan) tokenizer, a token is a continuous set of
# word (plus some punctuation) characters. Tokens are grouped together
# (when constituted into n-grams) by a space.
TOKENIZER_PATTERN_PAGEL = r"[\w'\-+?~]+"
TOKENIZER_JOINER_PAGEL = ' '
TOKENIZERS = {
TOKENIZER_CHOICE_CBETA: [TOKENIZER_PATTERN_CBETA, TOKENIZER_JOINER_CBETA],
TOKENIZER_CHOICE_PAGEL: [TOKENIZER_PATTERN_PAGEL, TOKENIZER_JOINER_PAGEL],
}
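# Illustrative examples of the two patterns (sample strings are made up):
# the CBETA pattern tokenizes '[(禾*尤)]中文' as ['[(禾*尤)]', '中', '文'],
# and the Pagel pattern tokenizes "bka' 'gyur" as ["bka'", "'gyur"].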
# Sequencer scoring values.
IDENTICAL_CHARACTER_SCORE = 1
DIFFERENT_CHARACTER_SCORE = -1
OPEN_GAP_PENALTY = -0.5
EXTEND_GAP_PENALTY = -0.1
# The threshold is the ratio between the alignment score and the
# length of the text being aligned below which the alignment is used
# as is, rather than further expanded.
SCORE_THRESHOLD = 0.75
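# For example, an alignment of length 20 scoring 16 (ratio 0.8) is
# expanded with further context, while one scoring 14 (ratio 0.7) is
# used as is.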
# CSV field names.
COUNT_FIELDNAME = 'count'
COUNT_TOKENS_FIELDNAME = 'matching tokens'
LABEL_FIELDNAME = 'label'
NAME_FIELDNAME = 'text name'
NGRAM_FIELDNAME = 'ngram'
NGRAMS_FIELDNAME = 'ngrams'
NUMBER_FIELDNAME = 'number'
PERCENTAGE_FIELDNAME = 'percentage'
SIGLUM_FIELDNAME = 'siglum'
SIZE_FIELDNAME = 'size'
TOTAL_NGRAMS_FIELDNAME = 'total ngrams'
TOTAL_TOKENS_FIELDNAME = 'total tokens'
UNIQUE_NGRAMS_FIELDNAME = 'unique ngrams'
QUERY_FIELDNAMES = [NGRAM_FIELDNAME, SIZE_FIELDNAME, NAME_FIELDNAME,
SIGLUM_FIELDNAME, COUNT_FIELDNAME, LABEL_FIELDNAME]
COUNTS_FIELDNAMES = [NAME_FIELDNAME, SIGLUM_FIELDNAME, SIZE_FIELDNAME,
UNIQUE_NGRAMS_FIELDNAME, TOTAL_NGRAMS_FIELDNAME,
TOTAL_TOKENS_FIELDNAME, LABEL_FIELDNAME]
SEARCH_FIELDNAMES = [NAME_FIELDNAME, SIGLUM_FIELDNAME, COUNT_FIELDNAME,
LABEL_FIELDNAME, NGRAMS_FIELDNAME, NUMBER_FIELDNAME]
STATISTICS_FIELDNAMES = [NAME_FIELDNAME, SIGLUM_FIELDNAME,
COUNT_TOKENS_FIELDNAME, TOTAL_TOKENS_FIELDNAME,
PERCENTAGE_FIELDNAME, LABEL_FIELDNAME]
# Command-line documentation strings.
ENCODING_EPILOG = '''\
Due to encoding issues, you may need to set the environment
variable PYTHONIOENCODING to "utf-8".'''
ALIGN_DESCRIPTION = '''\
Generates an HTML report giving tables showing aligned sequences
of text between each text within each label and all of the texts
in the other labels, within a set of results. This functionality
is only appropriate for intersect results.'''
ALIGN_EPILOG = ENCODING_EPILOG + '''\
\n\nThis function requires the Biopython suite of software to be
installed. It is extremely slow and resource-hungry when the
overlap between two texts is large.'''
ALIGN_HELP = 'Show aligned sets of matches between two texts side by side.'
ALIGN_MINIMUM_SIZE_HELP = 'Minimum size of n-gram to base sequences around.'
ALIGN_OUTPUT_HELP = 'Directory to output alignment files to.'
ASYMMETRIC_HELP = 'Label of sub-corpus to restrict results to.'
CATALOGUE_CATALOGUE_HELP = 'Path to catalogue file.'
CATALOGUE_DESCRIPTION = 'Generate a catalogue file.'
CATALOGUE_EPILOG = '''\
This command is just a convenience for generating a base catalogue
file to then be customised manually.'''
CATALOGUE_HELP = 'Generate a catalogue file.'
CATALOGUE_LABEL_HELP = 'Label to use for all texts.'
COUNTS_DESCRIPTION = 'List counts of n-grams in each labelled text.'
COUNTS_EPILOG = ENCODING_EPILOG
COUNTS_HELP = 'List counts of n-grams in each labelled text.'
DB_CORPUS_HELP = 'Path to corpus.'
DB_DATABASE_HELP = 'Path to database file.'
DB_MEMORY_HELP = '''\
Use RAM for temporary database storage.
This may cause an out of memory error, in which case run the
command without this switch.'''
DB_RAM_HELP = 'Number of gigabytes of RAM to use.'
DB_TOKENIZER_HELP = '''\
Type of tokenizer to use. The "cbeta" tokenizer is suitable for
the Chinese CBETA texts (tokens are single characters or
workaround clusters within square brackets). The "pagel" tokenizer
is for use with the transliterated Tibetan corpus (tokens are sets
of word characters plus some punctuation used to transliterate
characters).'''
DIFF_DESCRIPTION = '''\
List n-grams unique to each sub-corpus (as defined by the labels
in the specified catalogue file).'''
DIFF_EPILOG = ENCODING_EPILOG
DIFF_HELP = 'List n-grams unique to each sub-corpus.'
HIGHLIGHT_BASE_NAME_HELP = 'Name of text to display.'
HIGHLIGHT_BASE_SIGLUM_HELP = 'Siglum of text to display.'
HIGHLIGHT_DESCRIPTION = '''\
Output an HTML document showing a text with its matches visually
highlighted.'''
HIGHLIGHT_EPILOG = '''\
The scope of the supplied results may have a dramatic influence on
the amount of highlighting. Results containing 1-grams are very
likely to be almost entirely highlighted. Results may be
restricted by using the tacl report command.
Example:
tacl highlight corpus/stripped/ intersect.csv T0001 元'''
HIGHLIGHT_HELP = 'Output a text with its matches visually highlighted.'
INTERSECT_DESCRIPTION = '''\
List n-grams common to all sub-corpora (as defined by the labels
in the specified catalogue file).'''
INTERSECT_EPILOG = ENCODING_EPILOG
INTERSECT_HELP = 'List n-grams common to all sub-corpora.'
NGRAMS_DESCRIPTION = 'Generate n-grams from a corpus.'
NGRAMS_HELP = 'Generate n-grams from a corpus.'
NGRAMS_MAXIMUM_HELP = 'Maximum size of n-gram to generate (integer).'
NGRAMS_MINIMUM_HELP = 'Minimum size of n-gram to generate (integer).'
PREPARE_DESCRIPTION = '''\
Convert CBETA TEI XML files (which may have multiple files per
text) into XML suitable for processing via the tacl strip
command.'''
PREPARE_HELP = 'Convert CBETA TEI XML files into an XML form suitable for stripping.'
PREPARE_INPUT_HELP = 'Directory containing XML files to prepare.'
PREPARE_OUTPUT_HELP = 'Directory to output prepared files to.'
REPORT_CATALOGUE_HELP = '''\
Path to the catalogue file used to generate the results'''
REPORT_DESCRIPTION = '''\
Modify a query results file by removing certain results. Outputs
the new set of results.'''
REPORT_EXTEND_HELP = '''\
Extend the results to list the largest n-grams that also count
as matches, going beyond the maximum size recorded in the
database. This has no effect on the results of a diff query, or if
the results contain only 1-grams.'''
REPORT_EPILOG = '''\
If more than one modifier is specified, they are applied in the
following order: --extend, --reduce, --reciprocal, --zero-fill,
--min/max-texts, --min/max-size, --min/max-count, --remove.
It is important to be careful with the use of --reduce. Coupled
with --max-size, many results may be discarded without trace
(since the reduce occurs first). Note too that performing "reduce"
on a set of results more than once will make the results
inaccurate!
Since this command always outputs a valid results file, its output
can be used as input for a subsequent tacl report command. To
chain commands together without creating an intermediate file,
pipe the commands together and use - instead of a filename, as:
tacl report --reciprocal results.csv | tacl report --reduce -\n\n''' \
+ ENCODING_EPILOG
REPORT_HELP = 'Modify a query results file.'
REPORT_MINIMUM_COUNT_HELP = 'Minimum total count of n-gram to include.'
REPORT_MAXIMUM_COUNT_HELP = 'Maximum total count of n-gram to include.'
REPORT_MINIMUM_SIZE_HELP = 'Minimum size of n-grams to include.'
REPORT_MAXIMUM_SIZE_HELP = 'Maximum size of n-grams to include.'
REPORT_MINIMUM_TEXT_HELP = 'Minimum count of texts containing n-gram to include.'
REPORT_MAXIMUM_TEXT_HELP = 'Maximum count of texts containing n-gram to include.'
REPORT_RECIPROCAL_HELP = '''\
Remove n-grams that are not attested by at least one text in each
labelled set of texts. This can be useful after reducing a set of
intersection results.'''
REPORT_REDUCE_HELP = 'Remove n-grams that are contained in larger n-grams.'
REPORT_REMOVE_HELP = 'Remove labelled results.'
REPORT_RESULTS_HELP = 'Path to CSV results; use - for stdin.'
REPORT_SORT_HELP = 'Sort the results.'
REPORT_ZERO_FILL_HELP = '''\
Add rows with a count of 0 for each n-gram in each witness of a
text that has at least one witness bearing that n-gram. The
catalogue used to generate the results must also be specified with
the -c option.'''
SEARCH_DESCRIPTION = '''\
List texts containing at least one of the supplied n-grams, along
with a total count of how many occurrences of the n-grams are
present in each text, and the number of n-grams that match in each
text.
Specifying a catalogue file will not restrict the search to only
those labelled texts, but rather adds the labels to any
appropriate texts in the results.'''
SEARCH_HELP = 'List texts containing at least one of the supplied n-grams.'
SEARCH_NGRAMS_HELP = '''\
Path to file containing list of n-grams to search for, with one
n-gram per line.'''
STATISTICS_DESCRIPTION = '''
Generate summary statistics for a set of results. This gives the
counts of all tokens and matching tokens in each witness and the
percentage of the witness that is encompassed by the matches.'''
STATISTICS_HELP = 'Generate summary statistics for a set of results.'
STATISTICS_RESULTS_HELP = 'Path to CSV results.'
STRIP_DESCRIPTION = '''\
Preprocess a corpus by stripping unwanted material from each
text.'''
STRIP_EPILOG = '''\
The CBETA texts are in TEI XML that needs to have the markup and
metadata removed. If the TEI specifies textual variants, plain
text versions based on these are also created.'''
STRIP_HELP = 'Generate texts for use with TACL from a corpus of TEI XML.'
STRIP_INPUT_HELP = 'Directory containing files to strip.'
STRIP_OUTPUT_HELP = 'Directory to output stripped files to.'
SUPPLIED_DIFF_DESCRIPTION = '''\
List n-grams unique to each set of results (as defined by the
specified results files).'''
SUPPLIED_DIFF_HELP = 'List n-grams unique to each results file.'
SUPPLIED_EPILOG = '''\
The number of labels supplied must match the number of results
files. The first label is assigned to all results in the first
results file, the second label to all results in the second
results file, etc. The labels specified in the results files are
replaced with the supplied labels in the output.'''
SUPPLIED_DIFF_EPILOG = SUPPLIED_EPILOG.format('sdiff')
SUPPLIED_INTERSECT_EPILOG = SUPPLIED_EPILOG.format('sintersect')
SUPPLIED_INTERSECT_DESCRIPTION = '''\
List n-grams common to all sets of results (as defined by the
specified results files).'''
SUPPLIED_INTERSECT_HELP = 'List n-grams common to all results files.'
SUPPLIED_LABELS_HELP = 'Labels to be assigned in order to the supplied results.'
SUPPLIED_RESULTS_HELP = 'Paths to results files to be used in the query.'
TACL_DESCRIPTION = 'Analyse the text of corpora in various simple ways.'
TACL_HELPER_DESCRIPTION = '''\
Perform helpful but non-essential tacl-related functions.'''
TACL_HELPER_AGAINST_DESCRIPTION = '''\
Generate a script to compare each text of a corpus against all the
texts in another corpus.'''
TACL_HELPER_AGAINST_HELP = '''\
Generate a script to compare each text of a corpus against all the
texts in another corpus.'''
TACL_HELPER_AGAINST_A_HELP = '''\
File containing text names to compare (one per line).'''
TACL_HELPER_AGAINST_B_HELP = '''\
File containing corpus text names to be compared against (one per
line).'''
TACL_HELPER_COLLAPSE_DESCRIPTION = '''
Collapse result rows for multiple witnesses having the same count
for an n-gram. Instead of the "siglum" column, all of the
witnesses (per text) with the same n-gram count are listed, space
separated, in the "sigla" column.'''
TACL_HELPER_COLLAPSE_HELP = 'Collapse result rows for multiple witnesses having the same count for an n-gram'
TACL_HELPER_IN_DESCRIPTION = '''\
Generate a script to compare each text of a corpus with all the
other texts of that corpus.'''
TACL_HELPER_IN_HELP = '''\
Generate a script to compare each text of a corpus with all the
other texts of that corpus.'''
TACL_HELPER_IN_TEXTS_HELP = '''\
File containing text names to examine (one per line).'''
TACL_HELPER_OUTPUT = 'Output directory for script and catalogue files.'
TACL_HELPER_RESULTS_HELP = 'Path to CSV results'
VERBOSE_HELP = '''\
Display debug information; multiple -v options increase the verbosity.'''
# Error messages.
CATALOGUE_TEXT_RELABELLED_ERROR = 'Catalogue file labels "{}" more than once'
INSUFFICIENT_LABELS_QUERY_ERROR = 'Not running query with fewer than two defined labels'
LABEL_NOT_IN_CATALOGUE_ERROR = 'Supplied label is not present in the supplied catalogue'
SUPPLIED_ARGS_LENGTH_MISMATCH_ERROR = 'The number of labels supplied does not match the number of results files.'
# SQL statements.
ANALYSE_SQL = 'ANALYZE {}'
CREATE_INDEX_INPUT_RESULTS_SQL = 'CREATE INDEX IF NOT EXISTS ' \
'temp.InputResultsLabel ON InputResults (ngram)'
CREATE_INDEX_TEXT_SQL = 'CREATE INDEX IF NOT EXISTS TextIndexLabel ' \
'ON Text (label)'
CREATE_INDEX_TEXTHASNGRAM_SQL = 'CREATE UNIQUE INDEX IF NOT EXISTS ' \
'TextHasNGramIndex ON TextHasNGram (text, size)'
CREATE_INDEX_TEXTNGRAM_SQL = 'CREATE INDEX IF NOT EXISTS ' \
'TextNGramIndexTextNGram ON TextNGram (text, ngram)'
CREATE_TABLE_TEXT_SQL = 'CREATE TABLE IF NOT EXISTS Text (' \
'id INTEGER PRIMARY KEY ASC, ' \
'name TEXT NOT NULL, ' \
'siglum TEXT NOT NULL, ' \
'checksum TEXT NOT NULL, ' \
'token_count INTEGER NOT NULL, ' \
'label TEXT NOT NULL, ' \
'UNIQUE (name, siglum))'
CREATE_TABLE_TEXTNGRAM_SQL = 'CREATE TABLE IF NOT EXISTS TextNGram (' \
'text INTEGER NOT NULL REFERENCES Text (id), ' \
'ngram TEXT NOT NULL, ' \
'size INTEGER NOT NULL, ' \
'count INTEGER NOT NULL)'
CREATE_TABLE_TEXTHASNGRAM_SQL = 'CREATE TABLE IF NOT EXISTS TextHasNGram (' \
'text INTEGER NOT NULL REFERENCES Text (id), ' \
'size INTEGER NOT NULL, ' \
'count INTEGER NOT NULL)'
CREATE_TEMPORARY_NGRAMS_TABLE_SQL = 'CREATE TEMPORARY TABLE InputNGram (' \
'ngram TEXT)'
CREATE_TEMPORARY_RESULTS_TABLE_SQL = 'CREATE TEMPORARY TABLE InputResults (' \
'ngram TEXT NOT NULL, ' \
'size INTEGER NOT NULL, ' \
'name TEXT NOT NULL, ' \
'siglum TEXT NOT NULL, ' \
'count INTEGER NOT NULL, ' \
'label TEXT NOT NULL)'
DELETE_TEXT_HAS_NGRAMS_SQL = 'DELETE FROM TextHasNGram WHERE text = ?'
DELETE_TEXT_NGRAMS_SQL = 'DELETE FROM TextNGram WHERE text = ?'
DROP_TEMPORARY_NGRAMS_TABLE_SQL = 'DROP TABLE IF EXISTS InputNGram'
DROP_TEMPORARY_RESULTS_TABLE_SQL = 'DROP TABLE IF EXISTS InputResults'
DROP_TEXTNGRAM_INDEX_SQL = 'DROP INDEX IF EXISTS TextNGramIndexTextNGram'
INSERT_NGRAM_SQL = 'INSERT INTO TextNGram (text, ngram, size, count) ' \
'VALUES (?, ?, ?, ?)'
INSERT_TEXT_HAS_NGRAM_SQL = 'INSERT INTO TextHasNGram (text, size, count) ' \
'VALUES (?, ?, ?)'
INSERT_TEXT_SQL = 'INSERT INTO Text ' \
'(name, siglum, checksum, token_count, label) ' \
'VALUES (?, ?, ?, ?, ?)'
INSERT_TEMPORARY_NGRAM_SQL = 'INSERT INTO temp.InputNGram (ngram) VALUES (?)'
INSERT_TEMPORARY_RESULTS_SQL = 'INSERT INTO temp.InputResults ' \
'(ngram, size, name, siglum, count, label) ' \
'VALUES (?, ?, ?, ?, ?, ?)'
PRAGMA_CACHE_SIZE_SQL = 'PRAGMA cache_size={}'
PRAGMA_COUNT_CHANGES_SQL = 'PRAGMA count_changes=OFF'
PRAGMA_FOREIGN_KEYS_SQL = 'PRAGMA foreign_keys=ON'
PRAGMA_LOCKING_MODE_SQL = 'PRAGMA locking_mode=EXCLUSIVE'
PRAGMA_SYNCHRONOUS_SQL = 'PRAGMA synchronous=OFF'
PRAGMA_TEMP_STORE_SQL = 'PRAGMA temp_store=MEMORY'
SELECT_COUNTS_SQL = 'SELECT Text.name AS "text name", Text.siglum, ' \
'TextHasNGram.size, TextHasNGram.count AS "unique ngrams", ' \
'Text.token_count + 1 - TextHasNGram.size AS "total ngrams", ' \
'Text.token_count AS "total tokens", Text.label ' \
'FROM Text, TextHasNGram ' \
'WHERE Text.id = TextHasNGram.text AND Text.label IN ({}) ' \
'ORDER BY Text.name, TextHasNGram.size'
SELECT_DIFF_ASYMMETRIC_SQL = 'SELECT TextNGram.ngram, TextNGram.size, ' \
'TextNGram.count, Text.name AS "text name", Text.siglum, Text.label ' \
'FROM Text, TextNGram ' \
'WHERE Text.label = ? AND Text.id = TextNGram.text ' \
'AND TextNGram.ngram IN (' \
'SELECT TextNGram.ngram FROM Text, TextNGram ' \
'WHERE Text.id = TextNGram.text AND Text.label = ? ' \
'EXCEPT ' \
'SELECT TextNGram.ngram FROM Text, TextNGram ' \
'WHERE Text.id = TextNGram.text AND Text.label IN ({}))'
SELECT_DIFF_SQL = 'SELECT TextNGram.ngram, TextNGram.size, TextNGram.count, ' \
'Text.name AS "text name", Text.siglum, Text.label ' \
'FROM Text, TextNGram ' \
'WHERE Text.label IN ({}) AND Text.id = TextNGram.text ' \
'AND TextNGram.ngram IN (' \
'SELECT TextNGram.ngram FROM Text, TextNGram ' \
'WHERE Text.id = TextNGram.text AND Text.label IN ({}) ' \
'GROUP BY TextNGram.ngram HAVING COUNT(DISTINCT Text.label) = 1)'
SELECT_DIFF_SUPPLIED_SQL = '''SELECT ngram, size, count, name AS "text name",
siglum, label
FROM temp.InputResults
WHERE ngram IN (
SELECT ngram FROM temp.InputResults
GROUP BY ngram HAVING COUNT(DISTINCT label) = 1)'''
SELECT_HAS_NGRAMS_SQL = 'SELECT text FROM TextHasNGram ' \
'WHERE text = ? AND size = ?'
SELECT_INTERSECT_SQL = 'SELECT TextNGram.ngram, TextNGram.size, ' \
'TextNGram.count, Text.name AS "text name", Text.siglum, Text.label ' \
'FROM Text, TextNGram ' \
'WHERE Text.label IN ({}) AND Text.id = TextNGram.text ' \
'AND TextNGram.ngram IN ({})'
SELECT_INTERSECT_SUB_EXTRA_SQL = ' AND TextNGram.ngram IN ({})'
SELECT_INTERSECT_SUB_SQL = 'SELECT TextNGram.ngram ' \
'FROM Text, TextNGram ' \
'WHERE Text.label = ? AND Text.id = TextNGram.text'
SELECT_INTERSECT_SUPPLIED_SQL = '''SELECT ngram, size, count,
name AS "text name", siglum, label
FROM temp.InputResults
WHERE ngram IN (
SELECT ngram FROM temp.InputResults
GROUP BY ngram HAVING COUNT(DISTINCT label) = ?)'''
SELECT_SEARCH_SQL = 'SELECT Text.name AS "text name", Text.siglum, ' \
'SUM(TextNGram.count) AS count, ' \
"Text.label, group_concat(TextNGram.ngram, ', ') AS ngrams, " \
'count(TextNGram.ngram) AS number ' \
'FROM Text, TextNGram ' \
'WHERE Text.id = TextNGram.text ' \
'AND TextNGram.ngram IN (SELECT ngram FROM temp.InputNGram) ' \
'GROUP BY TextNGram.text'
SELECT_TEXT_TOKEN_COUNT_SQL = 'SELECT Text.token_count ' \
'FROM Text WHERE Text.name = ?'
SELECT_TEXT_SQL = 'SELECT id, checksum FROM Text WHERE name = ? AND siglum = ?'
UPDATE_LABEL_SQL = 'UPDATE Text SET label = ? WHERE name = ?'
UPDATE_LABELS_SQL = 'UPDATE Text SET label = ?'
UPDATE_TEXT_SQL = 'UPDATE Text SET checksum = ?, token_count = ? WHERE id = ?'
VACUUM_SQL = 'VACUUM'
# tacl/data_store.py
"""Module containing the DataStore class."""
import csv
import logging
import os.path
import sqlite3
import sys
from . import constants
from .exceptions import MalformedQueryError
class DataStore:
"""Class representing the data store for text data.
It provides an interface to the underlying database, with methods
to add and query data.
"""
def __init__ (self, db_name, use_memory=True, ram=0):
self._logger = logging.getLogger(__name__)
if db_name == ':memory:':
self._db_name = db_name
else:
self._db_name = os.path.abspath(db_name)
self._conn = sqlite3.connect(self._db_name)
self._conn.row_factory = sqlite3.Row
if use_memory:
self._conn.execute(constants.PRAGMA_TEMP_STORE_SQL)
if ram:
cache_size = ram * -1000000
self._conn.execute(constants.PRAGMA_CACHE_SIZE_SQL.format(
cache_size))
self._conn.execute(constants.PRAGMA_COUNT_CHANGES_SQL)
self._conn.execute(constants.PRAGMA_FOREIGN_KEYS_SQL)
self._conn.execute(constants.PRAGMA_LOCKING_MODE_SQL)
self._conn.execute(constants.PRAGMA_SYNCHRONOUS_SQL)
def _add_indices (self):
"""Adds the database indices relating to n-grams."""
self._logger.info('Adding database indices')
self._conn.execute(constants.CREATE_INDEX_TEXTNGRAM_SQL)
self._logger.info('Indices added')
def add_ngrams (self, corpus, minimum, maximum):
"""Adds n-gram data from `corpus` to the data store.
:param corpus: corpus of texts
:type corpus: `Corpus`
:param minimum: minimum n-gram size
:type minimum: `int`
:param maximum: maximum n-gram size
:type maximum: `int`
"""
self._initialise_database()
for text in corpus.get_texts():
self._add_text_ngrams(text, minimum, maximum)
self._add_indices()
self._analyse()
def _add_temporary_ngrams (self, ngrams):
"""Adds `ngrams` to a temporary table."""
self._conn.execute(constants.DROP_TEMPORARY_NGRAMS_TABLE_SQL)
self._conn.execute(constants.CREATE_TEMPORARY_NGRAMS_TABLE_SQL)
self._conn.executemany(constants.INSERT_TEMPORARY_NGRAM_SQL,
[(ngram,) for ngram in ngrams])
def _add_temporary_results_sets (self, results_filenames, labels):
if len(labels) < 2:
raise MalformedQueryError(
constants.INSUFFICIENT_LABELS_QUERY_ERROR)
if len(results_filenames) != len(labels):
raise MalformedQueryError(
constants.SUPPLIED_ARGS_LENGTH_MISMATCH_ERROR)
self._create_temporary_results_table()
for results_filename, label in zip(results_filenames, labels):
with open(results_filename, encoding='utf-8', newline='') as fh:
self._add_temporary_results(fh, label)
self._add_temporary_results_index()
self._analyse('temp.InputResults')
def _add_temporary_results (self, results, label):
"""Adds `results` to a temporary table with `label`.
:param results: results file
:type results: `File`
:param label: label to be associated with results
:type label: `str`
"""
NGRAM, SIZE, NAME, SIGLUM, COUNT, LABEL = constants.QUERY_FIELDNAMES
reader = csv.DictReader(results)
data = [(row[NGRAM], row[SIZE], row[NAME], row[SIGLUM], row[COUNT],
label) for row in reader]
self._conn.executemany(constants.INSERT_TEMPORARY_RESULTS_SQL, data)
def _add_temporary_results_index (self):
self._logger.info('Adding index to temporary results table')
self._conn.execute(constants.CREATE_INDEX_INPUT_RESULTS_SQL)
self._logger.info('Index added')
def _add_text_ngrams (self, text, minimum, maximum):
"""Adds n-gram data from `text` to the data store.
:param text: text to get n-grams from
:type text: `Text`
:param minimum: minimum n-gram size
:type minimum: `int`
:param maximum: maximum n-gram size
:type maximum: `int`
"""
text_id = self._get_text_id(text)
self._logger.info('Adding n-grams ({} <= n <= {}) for {}'.format(
minimum, maximum, text.get_filename()))
skip_sizes = []
for size in range(minimum, maximum + 1):
if self._has_ngrams(text_id, size):
self._logger.info('{}-grams are already in the database'.format(
size))
skip_sizes.append(size)
for size, ngrams in text.get_ngrams(minimum, maximum, skip_sizes):
self._add_text_size_ngrams(text_id, size, ngrams)
def _add_text_record (self, text):
"""Adds a Text record for `text`.
:param text: text to add a record for
:type text: `Text`
"""
filename = text.get_filename()
name, siglum = text.get_names()
self._logger.info('Adding record for text {}'.format(filename))
checksum = text.get_checksum()
token_count = len(text.get_tokens())
cursor = self._conn.execute(constants.INSERT_TEXT_SQL,
[name, siglum, checksum, token_count, ''])
self._conn.commit()
return cursor.lastrowid
def _add_text_size_ngrams (self, text_id, size, ngrams):
"""Adds `ngrams`, that are of size `size`, to the data store.
The added `ngrams` are associated with `text_id`.
:param text_id: database ID of text associated with `ngrams`
:type text_id: `int`
:param size: size of n-grams
:type size: `int`
:param ngrams: n-grams to be added
:type ngrams: `collections.Counter`
"""
unique_ngrams = len(ngrams)
self._logger.info('Adding {} unique {}-grams'.format(
unique_ngrams, size))
parameters = [[text_id, ngram, size, count]
for ngram, count in ngrams.items()]
self._conn.execute(constants.INSERT_TEXT_HAS_NGRAM_SQL,
[text_id, size, unique_ngrams])
self._conn.executemany(constants.INSERT_NGRAM_SQL, parameters)
self._conn.commit()
def _analyse (self, table=''):
"""Analyses the database, or `table` if it is supplied.
        :param table: optional name of the table to analyse
:type table: `str`
"""
self._logger.info('Starting analysis of database')
self._conn.execute(constants.ANALYSE_SQL.format(table))
self._logger.info('Analysis of database complete')
def counts (self, catalogue, output_fh):
"""Returns `output_fh` populated with CSV results giving
n-gram counts of the texts in `catalogue`.
:param catalogue: catalogue matching filenames to labels
:type catalogue: `Catalogue`
:param output_fh: object to output results to
:type output_fh: file-like object
:rtype: file-like object
"""
labels = list(self._set_labels(catalogue))
label_placeholders = self._get_placeholders(labels)
query = constants.SELECT_COUNTS_SQL.format(label_placeholders)
self._logger.info('Running counts query')
self._logger.debug('Query: {}\nLabels: {}'.format(query, labels))
cursor = self._conn.execute(query, labels)
return self._csv(cursor, constants.COUNTS_FIELDNAMES, output_fh)
def _create_temporary_results_table (self):
self._conn.execute(constants.DROP_TEMPORARY_RESULTS_TABLE_SQL)
self._conn.execute(constants.CREATE_TEMPORARY_RESULTS_TABLE_SQL)
def _csv (self, cursor, fieldnames, output_fh):
"""Writes the rows of `cursor` in CSV format to `output_fh`
and returns it.
        :param cursor: database cursor containing data to be output
:type cursor: `sqlite3.Cursor`
:param fieldnames: row headings
:type fieldnames: `list`
:param output_fh: file to write data to
:type output_fh: file object
:rtype: file object
"""
self._logger.info('Finished query; outputting results in CSV format')
# Specify a lineterminator to avoid an extra \r being added on
# Windows; see
# https://stackoverflow.com/questions/3191528/csv-in-python-adding-extra-carriage-return
if sys.platform in ('win32', 'cygwin') and output_fh is sys.stdout:
writer = csv.writer(output_fh, lineterminator='\n')
else:
writer = csv.writer(output_fh)
writer.writerow(fieldnames)
for row in cursor:
writer.writerow([row[fieldname] for fieldname in fieldnames])
self._logger.info('Finished outputting results')
return output_fh
def _delete_text_ngrams (self, text_id):
"""Deletes all n-grams associated with `text_id` from the data
store.
:param text_id: database ID of text
:type text_id: `int`
"""
self._conn.execute(constants.DELETE_TEXT_NGRAMS_SQL, [text_id])
self._conn.execute(constants.DELETE_TEXT_HAS_NGRAMS_SQL, [text_id])
self._conn.commit()
def diff (self, catalogue, output_fh):
"""Returns `output_fh` populated with CSV results giving the n-grams
that are unique to each labelled set of texts in `catalogue`.
Note that this is not the same as the symmetric difference of
these sets, except in the case where there are only two
labels.
:param catalogue: catalogue matching filenames to labels
:type catalogue: `Catalogue`
:param output_fh: object to output results to
:type output_fh: file-like object
:rtype: file-like object
"""
labels = self._sort_labels(self._set_labels(catalogue))
if len(labels) < 2:
raise MalformedQueryError(constants.INSUFFICIENT_LABELS_QUERY_ERROR)
label_placeholders = self._get_placeholders(labels)
query = constants.SELECT_DIFF_SQL.format(label_placeholders,
label_placeholders)
parameters = labels + labels
self._logger.info('Running diff query')
self._logger.debug('Query: {}\nLabels: {}'.format(query, labels))
self._log_query_plan(query, parameters)
cursor = self._conn.execute(query, parameters)
return self._csv(cursor, constants.QUERY_FIELDNAMES, output_fh)
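    # Illustrative example: with labels A, B and C, an n-gram attested
    # only in texts labelled A appears in the diff results, while an
    # n-gram attested under both A and B does not, since the inner
    # query keeps only n-grams occurring under exactly one label.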
def diff_asymmetric (self, catalogue, prime_label, output_fh):
"""Returns `output_fh` populated with CSV results giving the
difference in n-grams between the labelled sets of texts in
`catalogue`, limited to those texts labelled with
`prime_label`.
:param catalogue: catalogue matching filenames to labels
:type catalogue: `Catalogue`
:param prime_label: label to limit results to
:type prime_label: `str`
:param output_fh: object to output results to
:type output_fh: file-like object
:rtype: file-like object
"""
labels = list(self._set_labels(catalogue))
if len(labels) < 2:
raise MalformedQueryError(constants.INSUFFICIENT_LABELS_QUERY_ERROR)
try:
labels.remove(prime_label)
except ValueError:
raise MalformedQueryError(constants.LABEL_NOT_IN_CATALOGUE_ERROR)
label_placeholders = self._get_placeholders(labels)
query = constants.SELECT_DIFF_ASYMMETRIC_SQL.format(label_placeholders)
parameters = [prime_label, prime_label] + labels
self._logger.info('Running asymmetric diff query')
self._logger.debug('Query: {}\nLabels: {}\nPrime label: {}'.format(
query, labels, prime_label))
self._log_query_plan(query, parameters)
cursor = self._conn.execute(query, parameters)
return self._csv(cursor, constants.QUERY_FIELDNAMES, output_fh)
def diff_supplied (self, results_filenames, labels, output_fh):
"""Returns `output_fh` populated with CSV results giving the n-grams
that are unique to each set of texts in `results_sets`, using
the labels in `labels`.
Note that this is not the same as the symmetric difference of
these sets, except in the case where there are only two
labels.
:param results_filenames: list of results filenames to be diffed
:type results_filenames: `list` of `str`
:param labels: labels to be applied to the results_sets
:type labels: `list`
:param output_fh: object to output results to
:type output_fh: file-like object
:rtype: file-like object
"""
self._add_temporary_results_sets(results_filenames, labels)
query = constants.SELECT_DIFF_SUPPLIED_SQL
self._logger.info('Running supplied diff query')
self._logger.debug('Query: {}'.format(query))
self._log_query_plan(query, [])
cursor = self._conn.execute(query)
return self._csv(cursor, constants.QUERY_FIELDNAMES, output_fh)
def _drop_indices (self):
"""Drops the database indices relating to n-grams."""
self._logger.info('Dropping database indices')
self._conn.execute(constants.DROP_TEXTNGRAM_INDEX_SQL)
self._logger.info('Finished dropping database indices')
@staticmethod
def _get_intersection_subquery (labels):
# Create nested subselects.
subquery = constants.SELECT_INTERSECT_SUB_SQL
# The subqueries are nested in reverse order of 'size', so
# that the inmost select is operating on the smallest corpus,
# thereby minimising the result sets of outer queries the most.
for label in labels[1:]:
subquery = constants.SELECT_INTERSECT_SUB_SQL + \
constants.SELECT_INTERSECT_SUB_EXTRA_SQL.format(
subquery)
return subquery
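    # Sketch of the subquery generated for three labels (whitespace
    # added for clarity); each nested SELECT consumes one label
    # parameter:
    #
    #   SELECT TextNGram.ngram FROM Text, TextNGram
    #   WHERE Text.label = ? AND Text.id = TextNGram.text
    #   AND TextNGram.ngram IN (
    #       SELECT TextNGram.ngram FROM Text, TextNGram
    #       WHERE Text.label = ? AND Text.id = TextNGram.text
    #       AND TextNGram.ngram IN (
    #           SELECT TextNGram.ngram FROM Text, TextNGram
    #           WHERE Text.label = ? AND Text.id = TextNGram.text))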
@staticmethod
def _get_placeholders (items):
"""Returns a string of placeholders, one for each item in
`items`.
:param items: items to create placeholders for
:type items: `list`
:rtype: `str`
"""
return ('?,' * len(items)).strip(',')
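    # For example, _get_placeholders(['a', 'b', 'c']) returns '?,?,?'.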
def _get_text_id (self, text):
"""Returns the database ID of the Text record for `text`.
This may require creating such a record.
If `text`\'s checksum does not match an existing record's
checksum, the record's checksum is updated and all associated
TextNGram and TextHasNGram records are deleted.
:param text: text to add a record for
:type text: `.Text`
:rtype: `int`
"""
name, siglum = text.get_names()
text_record = self._conn.execute(constants.SELECT_TEXT_SQL,
[name, siglum]).fetchone()
if text_record is None:
text_id = self._add_text_record(text)
else:
text_id = text_record['id']
if text_record['checksum'] != text.get_checksum():
filename = text.get_filename()
self._logger.info('Text {} has changed since it was added to '
'the database'.format(filename))
self._update_text_record(text, text_id)
self._logger.info('Deleting potentially out-of-date n-grams')
self._delete_text_ngrams(text_id)
return text_id
def _has_ngrams (self, text_id, size):
"""Returns True if a text has existing records for n-grams of
size `size`.
:param text_id: database ID of text to check
:type text_id: `int`
:param size: size of n-grams
:type size: `int`
:rtype: `bool`
"""
if self._conn.execute(constants.SELECT_HAS_NGRAMS_SQL,
[text_id, size]).fetchone() is None:
return False
return True
def _initialise_database (self):
"""Creates the database schema.
This will not create tables or indices that already exist and
is safe to be called on an existing database.
"""
self._logger.info('Creating database schema, if necessary')
self._conn.execute(constants.CREATE_TABLE_TEXT_SQL)
self._conn.execute(constants.CREATE_TABLE_TEXTNGRAM_SQL)
self._conn.execute(constants.CREATE_TABLE_TEXTHASNGRAM_SQL)
self._conn.execute(constants.CREATE_INDEX_TEXTHASNGRAM_SQL)
self._conn.execute(constants.CREATE_INDEX_TEXT_SQL)
def intersection (self, catalogue, output_fh):
"""Returns `output_fh` populated with CSV results giving the
intersection in n-grams of the labelled sets of texts in
`catalogue`.
:param catalogue: catalogue matching filenames to labels
:type catalogue: `Catalogue`
:param output_fh: object to output results to
:type output_fh: file-like object
:rtype: file-like object
"""
labels = self._sort_labels(self._set_labels(catalogue))
if len(labels) < 2:
raise MalformedQueryError(constants.INSUFFICIENT_LABELS_QUERY_ERROR)
label_placeholders = self._get_placeholders(labels)
subquery = self._get_intersection_subquery(labels)
query = constants.SELECT_INTERSECT_SQL.format(label_placeholders,
subquery)
parameters = labels + labels
self._logger.info('Running intersection query')
self._logger.debug('Query: {}\nLabels: {}'.format(query, labels))
self._log_query_plan(query, parameters)
cursor = self._conn.execute(query, parameters)
return self._csv(cursor, constants.QUERY_FIELDNAMES, output_fh)
def intersection_supplied (self, results_filenames, labels, output_fh):
"""Returns `output_fh` populated with CSV results giving the n-grams
that are common to every set of texts in `results_sets`, using
the labels in `labels`.
:param results_filenames: list of results to be diffed
:type results_filenames: `list` of `str`
:param labels: labels to be applied to the results_sets
:type labels: `list`
:param output_fh: object to output results to
:type output_fh: file-like object
:rtype: file-like object
"""
self._add_temporary_results_sets(results_filenames, labels)
query = constants.SELECT_INTERSECT_SUPPLIED_SQL
parameters = [len(labels)]
self._logger.info('Running supplied intersect query')
self._logger.debug('Query: {}\nNumber of labels: {}'.format(
query, parameters[0]))
self._log_query_plan(query, parameters)
cursor = self._conn.execute(query, parameters)
return self._csv(cursor, constants.QUERY_FIELDNAMES, output_fh)
def _log_query_plan (self, query, parameters):
cursor = self._conn.execute('EXPLAIN QUERY PLAN ' + query, parameters)
query_plan = 'Query plan:\n'
for row in cursor.fetchall():
query_plan += '|'.join([str(value) for value in row]) + '\n'
self._logger.debug(query_plan)
def search (self, catalogue, ngrams, output_fh):
self._set_labels(catalogue)
self._add_temporary_ngrams(ngrams)
query = constants.SELECT_SEARCH_SQL
self._logger.info('Running search query')
self._logger.debug('Query: {}\nN-grams: {}'.format(
query, ', '.join(ngrams)))
self._log_query_plan(query, [])
cursor = self._conn.execute(query)
return self._csv(cursor, constants.SEARCH_FIELDNAMES, output_fh)
def _set_labels (self, catalogue):
"""Returns a dictionary of the unique labels in `catalogue` and the
number of their associated texts, and sets the record of each
Text to the corresponding label.
Texts that do not have a label specified are set to the empty
string.
Token counts are included in the results to allow for
semi-accurate sorting based on corpora size.
:param catalogue: catalogue matching filenames to labels
:type catalogue: `Catalogue`
:rtype: `dict`
"""
self._conn.execute(constants.UPDATE_LABELS_SQL, [''])
labels = {}
for name, label in catalogue.items():
self._conn.execute(constants.UPDATE_LABEL_SQL, [label, name])
cursor = self._conn.execute(constants.SELECT_TEXT_TOKEN_COUNT_SQL,
[name])
token_count = cursor.fetchone()['token_count']
labels[label] = labels.get(label, 0) + token_count
self._conn.commit()
return labels
@staticmethod
def _sort_labels (label_data):
"""Returns the labels in `label_data` sorted in descending order
according to the 'size' (total token count) of their referent
corpora.
:param label_data: labels (with their token counts) to sort
:type: `dict`
:rtype: `list`
"""
labels = list(label_data)
labels.sort(key=label_data.get, reverse=True)
return labels
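    # For example, _sort_labels({'A': 1200, 'B': 45000}) returns
    # ['B', 'A'], since B's texts have the greater total token count.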
def _update_text_record (self, text, text_id):
"""Updates the record with `text_id` with `text`\'s checksum and
token count.
:param text: text to update from
:type text: `Text`
:param text_id: database ID of Text record
:type text_id: `int`
"""
checksum = text.get_checksum()
token_count = len(text.get_tokens())
self._conn.execute(constants.UPDATE_TEXT_SQL,
[checksum, token_count, text_id])
self._conn.commit()
def validate (self, corpus, catalogue):
"""Returns True if all of the files labelled in `catalogue`
are up-to-date in the database.
:param corpus: corpus of texts
:type corpus: `Corpus`
:param catalogue: catalogue matching filenames to labels
:type catalogue: `Catalogue`
:rtype: `bool`
"""
is_valid = True
for name in catalogue:
count = 0
# It is unfortunate that this creates Text objects for
# each text, since that involves reading the file.
for text in corpus.get_texts(name):
count += 1
name, siglum = text.get_names()
filename = text.get_filename()
row = self._conn.execute(constants.SELECT_TEXT_SQL,
[name, siglum]).fetchone()
if row is None:
is_valid = False
self._logger.warning(
'No record (or n-grams) exists for {} in '
'the database'.format(filename))
elif row['checksum'] != text.get_checksum():
is_valid = False
self._logger.warning(
'{} has changed since its n-grams were '
'added to the database'.format(filename))
if count == 0:
self._logger.error('Catalogue references text {} that does not '
'exist in the corpus'.format(name))
raise FileNotFoundError
return is_valid
# tacl/sequence.py
"""Module containing the Sequence and Sequencer classes."""
import logging
import os
import re
from Bio import pairwise2
from jinja2 import Environment, PackageLoader
import pandas as pd
from . import constants
class Sequence:
def __init__ (self, alignment, substitutes):
self._alignment = alignment
self._substitutes = substitutes
def _format_alignment (self, a1, a2):
html = []
for index, char in enumerate(a1):
output = self._substitutes.get(char, char)
if a2[index] == char:
html.append('{}'.format(output))
elif char != '-':
html.append(output)
return ''.join(html)
def render (self):
"""Returns a tuple of HTML fragments rendering each element of the
sequence."""
f1 = self._format_alignment(self._alignment[0], self._alignment[1])
f2 = self._format_alignment(self._alignment[1], self._alignment[0])
return f1, f2
class Sequencer:
def __init__ (self, corpus, tokenizer, results, output_dir):
self._logger = logging.getLogger(__name__)
self._corpus = corpus
self._tokenizer = tokenizer
self._matches = pd.read_csv(results, encoding='utf-8', na_filter=False)
self._output_dir = output_dir
def _generate_sequence (self, t1, t1_span, t2, t2_span, context_length,
covered_spans):
old_length = 0
self._logger.debug('Match found; generating new sequence')
while True:
s1, span1 = self._get_text_sequence(t1, t1_span, context_length)
s2, span2 = self._get_text_sequence(t2, t2_span, context_length)
length = len(s1)
            alignments = pairwise2.align.globalms(
                s1, s2, constants.IDENTICAL_CHARACTER_SCORE,
                constants.DIFFERENT_CHARACTER_SCORE,
                constants.OPEN_GAP_PENALTY, constants.EXTEND_GAP_PENALTY)
            if not alignments:
                return None
            alignment = alignments[0]
            context_length = length
            score = alignment[2] / length
            if score < constants.SCORE_THRESHOLD or length == old_length:
                break
            else:
                self._logger.debug('Score: {}'.format(score))
                old_length = length
covered_spans[0].append(span1)
covered_spans[1].append(span2)
return Sequence(alignment, self._r_substitutes)
def generate_sequences (self, minimum_size):
loader = PackageLoader('tacl', 'assets/templates')
env = Environment(loader=loader)
template = env.get_template('sequence.html')
# Get a list of the files in the matches, grouped by label
# (ordered by number of texts).
labels = list(self._matches.groupby(
[constants.LABEL_FIELDNAME])[constants.NAME_FIELDNAME].nunique().index)
ngrams = self._matches[
self._matches[constants.SIZE_FIELDNAME] >= minimum_size].sort(
constants.SIZE_FIELDNAME, ascending=False)[
constants.NGRAM_FIELDNAME].unique()
for index, primary_label in enumerate(labels):
for secondary_label in labels[index+1:]:
self._generate_sequences(primary_label, secondary_label, ngrams,
template)
def _generate_sequences (self, primary_label, secondary_label, ngrams,
template):
self._substitutes = {}
self._char_code = 61440
cols = [constants.NAME_FIELDNAME, constants.SIGLUM_FIELDNAME]
primary_texts = self._matches[self._matches[constants.LABEL_FIELDNAME] == primary_label][cols].drop_duplicates()
secondary_texts = self._matches[self._matches[constants.LABEL_FIELDNAME] == secondary_label][cols].drop_duplicates()
for index, (name1, siglum1) in primary_texts.iterrows():
text1 = self._get_text(name1, siglum1)
label1 = '{}_{}'.format(name1, siglum1)
for index, (name2, siglum2) in secondary_texts.iterrows():
text2 = self._get_text(name2, siglum2)
label2 = '{}_{}'.format(name2, siglum2)
self._generate_sequences_for_texts(label1, text1, label2, text2,
ngrams, template)
def _generate_sequences_for_texts (self, l1, t1, l2, t2, ngrams, template):
self._r_substitutes = dict((v, k) for k, v in self._substitutes.items())
sequences = []
covered_spans = [[], []]
for ngram in ngrams:
# Keep track of the spans within each text that have been
# covered by a sequence, to ensure that they aren't
# reported more than once.
sequences.extend(self._generate_sequences_for_ngram(
t1, t2, ngram, covered_spans))
if sequences:
html = template.render(l1=l1, l2=l2, sequences=sequences)
os.makedirs(self._output_dir, exist_ok=True)
output_name = os.path.join(self._output_dir,
'{}-{}.html'.format(l1, l2))
with open(output_name, 'w', encoding='utf-8') as fh:
fh.write(html)
def _generate_sequences_for_ngram (self, t1, t2, ngram, covered_spans):
self._logger.debug('Generating sequences for n-gram "{}"'.format(ngram))
pattern = re.compile(re.escape(ngram))
context_length = len(ngram)
t1_spans = [match.span() for match in pattern.finditer(t1)]
t2_spans = [match.span() for match in pattern.finditer(t2)]
sequences = []
for t1_span in t1_spans:
for t2_span in t2_spans:
if self._is_inside(t1_span, t2_span, covered_spans):
self._logger.debug('Skipping match due to existing coverage')
continue
sequence = self._generate_sequence(
t1, t1_span, t2, t2_span, context_length, covered_spans)
if sequence:
sequences.append(sequence.render())
return sequences
def _get_text (self, name, siglum):
"""Returns the text identified by `name` and `siglum`, with all []
tokens replaced with a single character. Substitutions are
recorded in self._substitutes.
"""
tokens = self._corpus.get_text(name, siglum).get_tokens()
for i, token in enumerate(tokens):
if len(token) > 1:
char = chr(self._char_code)
substitute = self._substitutes.setdefault(token, char)
if substitute == char:
self._char_code += 1
tokens[i] = substitute
return self._tokenizer.joiner.join(tokens)
def _get_text_sequence (self, text, span, context_length):
start = max(0, span[0] - context_length)
end = min(len(text), span[1] + context_length)
return text[start:end], (start, end)
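# For example, given a text of length 50, the span (10, 14) and a
# context_length of 5, this returns (text[5:19], (5, 19)): the span is
# widened by the context on each side and clamped to the text bounds.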
def _is_inside (self, span1, span2, covered_spans):
"""Returns True if both `span1` and `span2` fall within
`covered_spans`."""
if self._is_span_inside(span1, covered_spans[0]) and \
self._is_span_inside(span2, covered_spans[1]):
return True
return False
def _is_span_inside (self, span, covered_spans):
start = span[0]
end = span[1]
for c_start, c_end in covered_spans:
if start >= c_start and end <= c_end:
return True
return False
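# For example, the span (3, 8) is inside a covered span (2, 10), but
# not inside (4, 10), since its start falls before that span begins.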
tacl/corpus.py
"""Module containing the Corpus class."""
import glob
import logging
import os.path
from .text import Text
class Corpus:
"""A Corpus represents a collection of `Text`\s.
A Corpus is built from a directory that contains the text files
that become `Text` objects.
"""
def __init__ (self, path, tokenizer):
self._logger = logging.getLogger(__name__)
self._path = os.path.abspath(path)
self._tokenizer = tokenizer
def get_sigla (self, name):
"""Returns a list of all of the sigla for the named text.
:param name: name of text
:type name: `str`
:rtype: `list` of `str`
"""
return [os.path.splitext(os.path.basename(path))[0]
for path in glob.glob(os.path.join(self._path, name, '*.txt'))]
def get_text (self, name, siglum):
"""Returns a `Text` representing the file associated with `name` and
`siglum`.
Combined, `name` and `siglum` form the basis of a filename for
retrieving the text.
:param name: name of text
:type name: `str`
:param siglum: siglum (variant name) of text
:type siglum: `str`
:rtype: `Text`
"""
filename = os.path.join(name, siglum + '.txt')
self._logger.debug('Creating Text object from {}'.format(filename))
with open(os.path.join(self._path, filename), encoding='utf-8') as text:
content = text.read()
return Text(name, siglum, content, self._tokenizer)
def get_texts (self, name='*'):
"""Returns a generator supplying `Text` objects for each file
in the corpus.
:rtype: `generator`
"""
for filepath in glob.glob(os.path.join(self._path, name, '*.txt')):
if os.path.isfile(filepath):
name = os.path.split(os.path.split(filepath)[0])[1]
siglum = os.path.splitext(os.path.basename(filepath))[0]
yield self.get_text(name, siglum)
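# A minimal usage sketch (the corpus path and the tokenizer pattern
# are assumptions for illustration only):
#
#   tokenizer = Tokenizer(r'[\w]', '')
#   corpus = Corpus('stripped/', tokenizer)
#   for text in corpus.get_texts():
#       print(text.get_filename())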
tacl/exceptions.py
class TACLError (Exception):
def __init__ (self, msg):
self._msg = msg
def __str__ (self):
return self._msg
class MalformedCatalogueError (TACLError):
pass
class MalformedQueryError (TACLError):
pass
tacl/highlighter.py
"""Module containing the Highlighter class."""
import logging
import re
from jinja2 import Environment, PackageLoader
from lxml import etree
import pandas as pd
from . import constants
from .text import Text
class Highlighter:
def __init__ (self, corpus, tokenizer):
self._logger = logging.getLogger(__name__)
self._corpus = corpus
self._tokenizer = tokenizer
def _annotate_tokens (self, match_obj):
match = match_obj.group(0)
root = etree.fromstring('<div>{}</div>'.format(match))
for span in root.xpath('//span'):
# The results are not guaranteed to have non-base matches
# in it, so do not rely on being able to derive base
# matches from them.
if self._match_source == self._base_filename:
if span.get('data-base-match') is None:
span.set('data-base-match', '')
else:
texts = span.get('data-texts')
if ' {} '.format(self._match_source) not in texts:
new_value = '{}{} '.format(texts, self._match_source)
span.set('data-texts', new_value)
return etree.tostring(root, encoding='unicode')[5:-6]
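# For example (assuming the span markup produced by _prepare_text
# below, and a hypothetical match source 'T0002_base.txt'): the
# fragment '<span data-texts=" ">字</span>' is rewritten as
# '<span data-texts=" T0002_base.txt ">字</span>', while spans whose
# match source is the base witness gain a data-base-match attribute.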
def _format_text (self, text):
"""Returns `text` with consecutive spaces converted to non-break
spaces, and linebreak converted into HTML br elements.
:param text: text to format
:type text: `str`
:rtype: `str`
"""
text = re.sub(r'\n', ' \n', text)
text = re.sub(r' ', ' ', text)
text = re.sub(r' ', ' ', text)
return text
def generate_base (self, matches, text_name, siglum, all=True):
"""Returns an XML document containing the text of `filename`
marked up with its n-grams in `matches`.
If `all` is True, generate results for all matches, not just
those on `filename`.
:param matches: matches data
:type matches: `pandas.DataFrame`
:param text_name: name of text to generate an XML document from
:type text_name: `str`
:param siglum: siglum of text variant to generate an XML document from
:type siglum: `str`
:rtype: `lxml.etree._Element`
"""
text = self._corpus.get_text(text_name, siglum)
filename = text.get_filename()
self._logger.debug('Generating the base XML file for {}'.format(
filename))
self._base_filename = filename
content = text.get_content().strip()
content = self._prepare_text(content)
if not all:
matches = matches[matches[constants.NAME_FIELDNAME] == filename]
content = self._highlight(content, matches)
content = self._format_text(content)
root = etree.fromstring('<div>{}</div>'.format(content))
return root
def _generate_html (self, matches, text_name, siglum, text):
loader = PackageLoader('tacl', 'assets/templates')
env = Environment(loader=loader)
text_list = self._generate_text_list(matches, text_name, siglum)
text_data = {'base_name': text_name, 'base_siglum': siglum,
'text': text, 'text_list': text_list}
template = env.get_template('highlight.html')
return template.render(text_data)
@staticmethod
def _generate_text_list (matches, base_name, base_siglum):
texts = matches[[constants.NAME_FIELDNAME,
constants.SIGLUM_FIELDNAME]].drop_duplicates()
text_list = []
for index, (name, siglum) in texts.iterrows():
if not(name == base_name and siglum == base_siglum):
text_list.append(Text.assemble_filename(name, siglum))
text_list.sort()
return text_list
def _get_regexp_pattern (self, ngram):
inter_token_pattern = r'</span>\W*<span[^>]*>'
pattern = inter_token_pattern.join(
[re.escape(token) for token in self._tokenizer.tokenize(ngram)])
return r'(<span[^>]*>{}</span>)'.format(pattern)
def highlight (self, matches_filename, text_name, siglum):
"""Returns the text of `filename` as an HTML document with its matches
in `matches` highlighted.
:param results: file containing matches to highlight
:type results: `TextIOWrapper`
:param corpus: corpus of documents containing `text_filename`
:type corpus: `tacl.Corpus`
:param text_name: name of text to highlight
:type text_name: `str`
:param siglum: siglum of text to highlight
:type siglum: `str`
:rtype: `str`
"""
matches = pd.read_csv(matches_filename)
base = self.generate_base(matches, text_name, siglum, all=True)
text = etree.tostring(base, encoding='unicode', xml_declaration=False)
return self._generate_html(matches, text_name, siglum, text)
def _highlight (self, text, matches):
for row_index, row in matches.iterrows():
ngram = row[constants.NGRAM_FIELDNAME]
self._match_source = Text.assemble_filename(
row[constants.NAME_FIELDNAME], row[constants.SIGLUM_FIELDNAME])
pattern = self._get_regexp_pattern(ngram)
text = re.sub(pattern, self._annotate_tokens, text)
return text
def _prepare_text (self, text):
"""Returns `text` with each consituent token wrapped in HTML markup
for later match annotation.
:param text: text to be marked up
:type text: `str`
:rtype: `str`
"""
# Remove characters that should be escaped for XML input (but
# which cause problems when escaped, since they become
# tokens).
text = re.sub(r'[<>&]', '', text)
pattern = r'({})'.format(self._tokenizer.pattern)
replacement = r'<span data-texts=" ">\1</span>'
return re.sub(pattern, replacement, text)
tacl/jitc-old.py
import csv
import io
import logging
import os
from bokeh.embed import components
from bokeh.charts import Bar
import pandas as pd
from . import constants
from .report import Report
from .statistics_report import StatisticsReport
class JITCProcessor:
"""Generate statistics to list texts from one corpus (referred to
below as "Maybe" and defined in a catalogue file) in order of
similarity to each text in that corpus. Takes into account a
second corpus of texts (referred to below as "No" and defined in a
catalogue file) that are similar to those in the first, but not in
the way(s) that are the subject of the investigation.
Given the two corpora, Maybe and No, the script performs the
following actions:
1. For each text Y in Maybe:
1. Run an intersection between Y and No.
2. For each text M in Maybe (excluding Y):
1. Run an intersect between Y and M.
2. Drop Y results.
3. Run a supplied diff between results from [1.2.2] and
results from [1.1].
4. Get number of tokens in M.
3. Rank and list texts in Maybe in descending order of the
ratio, from [1.2.3], of matching tokens (n-gram size x count)
to total tokens [1.2.4].
4. Concatenate all results from [1.2.3] files.
"""
def __init__ (self, store, corpus, catalogue, maybe_label, tokenizer,
output_dir):
self._logger = logging.getLogger(__name__)
self._corpus = corpus
self._maybe_label = maybe_label
self._maybe_texts = [text for text, label in catalogue.items()
if label == maybe_label]
self._no_texts = [text for text, label in catalogue.items()
if label != maybe_label]
self._no_label = catalogue[self._no_texts[0]]
self._output_dir = output_dir
self._store = store
self._tokenizer = tokenizer
self._ratios = {}
def _drop_no_label_results (self, results, fh, reduce=False):
# Drop results associated with the 'no' label.
results.seek(0)
report = Report(results, self._tokenizer)
report.remove_label(self._no_label)
if reduce:
report.reduce()
results = report.csv(fh)
def process_maybe_text (self, yes_text, maybe_text, work_dir,
yn_results_path):
if maybe_text == yes_text:
return
self._logger.debug(
'Processing "maybe" text {} against "yes" text {}.'.format(
maybe_text, yes_text))
ym_results_path = os.path.join(
work_dir, 'intersect_with_' + maybe_text + '.csv')
catalogue = {yes_text: self._no_label,
maybe_text: self._maybe_label}
self._run_query(ym_results_path, self._store.intersection, [catalogue])
distinct_results_path = os.path.join(
work_dir, 'distinct_' + maybe_text + '.csv')
results = [yn_results_path, ym_results_path]
labels = [self._no_label, self._maybe_label]
self._run_query(distinct_results_path, self._store.diff_supplied,
[results, labels])
stats_path = os.path.join(work_dir, 'stats_' + maybe_text + '.csv')
if not os.path.exists(stats_path):
stats_report = StatisticsReport(self._corpus, self._tokenizer,
distinct_results_path)
stats_report.generate_statistics()
with open(stats_path, mode='w', encoding='utf-8', newline='') as fh:
stats_report.csv(fh)
with open(stats_path, encoding='utf-8', newline='') as fh:
reader = csv.DictReader(fh)
for row in reader:
siglum = row[constants.SIGLUM_FIELDNAME]
ratio = float(row[constants.PERCENTAGE_FIELDNAME])
self._ratios[yes_text][(maybe_text, siglum)] = ratio
def process_yes_text (self, yes_text, no_catalogue):
self._logger.debug('Processing "maybe" text {} as "yes".'.format(
yes_text))
self._ratios[yes_text] = {}
yes_work_dir = os.path.join(self._output_dir, yes_text)
os.makedirs(yes_work_dir, exist_ok=True)
results_path = os.path.join(yes_work_dir, 'intersect_with_no.csv')
self._run_query(results_path, self._store.intersection, [no_catalogue])
for maybe_text in self._maybe_texts:
self.process_maybe_text(yes_text, maybe_text, yes_work_dir,
results_path)
def process_yes_texts (self):
no_catalogue = {text: self._no_label for text in self._no_texts}
data = {}
graphs = {}
for yes_text in self._maybe_texts:
no_catalogue[yes_text] = self._maybe_label
self.process_yes_text(yes_text, no_catalogue)
no_catalogue.pop(yes_text)
values = [ratio for ratio in self._ratios[yes_text].values()]
index = pd.MultiIndex.from_tuples(list(self._ratios[yes_text].keys()),
names=['text', 'siglum'])
series = pd.Series(values, index=index)
data[yes_text] = series
df = pd.DataFrame(data)
# Create a chart that has two bars per text on x-axis: one for
# the percentage of that text that overlaps with the base
# text, and one for the percentage of the base text that
# overlaps with that text. A tooltip showing the values per
# witness would be good.
#
# Create a stacked bar chart that shows the percentage the
# content consisting of shared markers that aren't in the no
# corpus, shared markers that are in the no corpus, and
# unshared markers.
#texts = list(set(index.get_level_values('text')))
#ratios = []
#for text in texts:
# ratio = series[text].max()
# ratios.append(ratio)
#title = 'Shared markers with {}'.format(yes_text)
#bar = Bar(ratios, texts, stacked=False, title=title,
# xlabel='Text', ylabel='% of text sharing markers')
#graphs[yes_text + '-related'] = bar
script, divs = components(graphs)
def _run_query (self, path, query, query_args):
if os.path.exists(path):
return
output_results = io.StringIO(newline='')
query(*query_args, output_fh=output_results)
with open(path, mode='w', encoding='utf-8', newline='') as fh:
self._drop_no_label_results(output_results, fh)
tacl/stripper.py
"""Module containing the Stripper class."""
import logging
import os
import re
from lxml import etree
BASE_WITNESS = 'base'
witnesses_splitter = re.compile(r'【|】')
STRIP_XSLT = '''
'''.format(base=BASE_WITNESS)
class Stripper:
"""Class used for preprocessing a corpus of texts by stripping out
all material that is not the textual material proper.
The intention is to keep the stripped text as close in formatting
to the original as possible, including whitespace."""
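# A minimal usage sketch (directory names are hypothetical):
#
#   stripper = Stripper('xml/', 'stripped/')
#   stripper.strip_files()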
def __init__ (self, input_dir, output_dir):
self._logger = logging.getLogger(__name__)
self._input_dir = os.path.abspath(input_dir)
self._output_dir = os.path.abspath(output_dir)
self._transform = etree.XSLT(etree.XML(STRIP_XSLT))
self._texts = {}
def get_witnesses (self, source_tree):
"""Returns a list of all witnesses of variant readings in
`source_tree`.
:param source_tree: XML tree of source document
:type source_tree: `etree._ElementTree`
:rtype: `set`
"""
witnesses = set([BASE_WITNESS])
witness_values = source_tree.xpath('//app/rdg[@wit]/@wit')
for witness_value in witness_values:
for witness in witnesses_splitter.split(witness_value):
if witness:
witnesses.add(witness)
return witnesses
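# For example, an app/rdg element with wit="【CB】【麗】" (a
# hypothetical value) contributes the witnesses 'CB' and '麗',
# alongside the always-present BASE_WITNESS.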
def _output_file (self, text_name, witnesses):
text_dir = os.path.join(self._output_dir, text_name)
try:
os.makedirs(text_dir)
except OSError as err:
self._logger.error('Could not create output directory: {}'.format(
err))
raise
for witness in witnesses.keys():
witness_file_path = os.path.join(
text_dir, '{}.txt'.format(witness))
with open(witness_file_path, 'wb') as output_file:
output_file.write(witnesses[witness].encode('utf-8'))
def strip_files (self):
if not os.path.exists(self._output_dir):
try:
os.makedirs(self._output_dir)
except OSError as err:
self._logger.error(
'Could not create output directory: {}'.format(err))
raise
for dirpath, dirnames, filenames in os.walk(self._input_dir):
for filename in filenames:
if os.path.splitext(filename)[1] == '.xml':
text_name, witnesses = self.strip_file(
os.path.join(dirpath, filename))
if text_name is not None:
self._output_file(text_name, witnesses)
def strip_file (self, filename):
file_path = os.path.join(self._input_dir, filename)
text_name = os.path.splitext(os.path.basename(filename))[0]
stripped_file_path = os.path.join(self._output_dir, text_name)
self._logger.info('Stripping file {} into {}'.format(
file_path, stripped_file_path))
try:
tei_doc = etree.parse(file_path)
except etree.XMLSyntaxError:
self._logger.warning('XML file "{}" is invalid'.format(filename))
return None, None
text_witnesses = self._texts.setdefault(stripped_file_path, {})
for witness in self.get_witnesses(tei_doc):
witness_param = "'{}'".format(witness)
text = str(self._transform(tei_doc, witness=witness_param))
text_witnesses[witness] = text
return text_name, text_witnesses
tacl/catalogue.py
import csv
import os
from .constants import CATALOGUE_TEXT_RELABELLED_ERROR
from .exceptions import MalformedCatalogueError
class Catalogue (dict):
def generate (self, path, label):
"""Creates default data from the corpus at `path`, marking all
texts with `label`.
:param path: path to a corpus directory
:type path: `str`
:param label: label to categorise each text as
:type label: `str`
"""
for filename in os.listdir(path):
self[filename] = label
def load (self, path):
"""Loads the data from `path` into the catalogue.
:param path: path to catalogue file
:type path: `str`
"""
fieldnames = ['text', 'label']
with open(path, 'r', encoding='utf-8', newline='') as fh:
reader = csv.DictReader(fh, delimiter=' ', fieldnames=fieldnames,
skipinitialspace=True)
for row in reader:
text, label = row['text'], row['label']
if label:
if text in self:
raise MalformedCatalogueError(
CATALOGUE_TEXT_RELABELLED_ERROR.format(text))
self[text] = label
def save (self, path):
"""Saves this catalogue's data to `path`.
:param path: file path to save catalogue data to
:type path: `str`
"""
rows = list(self.items())
rows.sort(key=lambda x: x[0])
with open(path, 'w', newline='') as fh:
writer = csv.writer(fh, delimiter=' ')
writer.writerows(rows)
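# A catalogue file is space-delimited plain text with one
# "<text> <label>" pair per line; rows without a label are skipped.
# A minimal sketch (the text names and labels are hypothetical):
#
#   T0001 maybe
#   T0002 no
#
#   catalogue = Catalogue()
#   catalogue.load('catalogue.txt')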
tacl/jitc.py
import csv
import io
import json
import logging
import math
import os
import shutil
import pandas as pd
from pkg_resources import resource_filename, resource_listdir
from . import constants
from .report import Report
from .statistics_report import StatisticsReport
class JITCProcessor:
"""Generate statistics to list texts from one corpus (referred to
below as "Maybe" and defined in a catalogue file) in order of
similarity to each text in that corpus. Takes into account a
second corpus of texts (referred to below as "No" and defined in a
catalogue file) that are similar to those in the first, but not in
the way(s) that are the subject of the investigation.
Given the two corpora, Maybe and No, the script performs the
following actions:
1. For each text Y in Maybe:
1. Run an intersection between Y and No.
2. For each text M in Maybe (excluding Y):
1. Run an intersect between Y and M.
2. Drop Y results.
3. Run a supplied diff between results from [1.2.2] and
results from [1.1].
4. Get number of tokens in M.
3. Rank and list texts in Maybe in descending order of the
ratio, from [1.2.3], of matching tokens (n-gram size x count)
to total tokens [1.2.4].
4. Concatenate all results from [1.2.3] files.
Note that in the above, when a text is treated as Y, its different
witnesses are not treated separately. The statistics derived from
queries including it are those that treat all of its witnesses
together; eg, if two n-grams in a witness of M are found only in
two different witnesses of Y, they will both be counted as shared.
"""
def __init__ (self, store, corpus, catalogue, maybe_label, tokenizer,
output_dir):
self._logger = logging.getLogger(__name__)
self._corpus = corpus
self._maybe_label = maybe_label
self._maybe_texts = [text for text, label in catalogue.items()
if label == maybe_label]
self._no_texts = [text for text, label in catalogue.items()
if label != maybe_label]
self._no_label = catalogue[self._no_texts[0]]
self._output_dir = output_dir
self._output_data_dir = os.path.join(self._output_dir, 'data')
self._store = store
self._tokenizer = tokenizer
self._stats = {}
self._ym_intersects_dir = os.path.join(self._output_data_dir,
'ym_intersects')
def _copy_static_assets (self, output_dir):
for asset in resource_listdir(__name__, 'assets'):
filename = resource_filename(__name__, 'assets/{}'.format(
asset))
shutil.copy2(filename, output_dir)
def _create_breakdown_chart (self, data, text, output_dir):
# Create a stacked bar chart that shows the percentage of the
# content consisting of shared tokens that aren't in the no
# corpus, shared tokens that are also in the no corpus, and
# unshared tokens.
chart_data = data.loc[text].sort('shared', ascending=False)[
['shared', 'unique', 'common']]
csv_path = os.path.join(output_dir, 'breakdown_{}.csv'.format(
text))
chart_data.to_csv(csv_path)
def _create_chord_chart (self, data, output_dir):
matrix = []
chord_data = data.unstack('main_text')['shared']
for index, row_data in chord_data.fillna(value=0).iterrows():
matrix.append([value / 100 for value in row_data])
colours = generate_colours(len(self._maybe_texts))
colour_texts = [{'name': text, 'colour': colour} for text, colour
in zip(chord_data, colours)]
json_data = json.dumps({'texts': colour_texts, 'matrix': matrix})
with open(os.path.join(output_dir, 'chord_data.js'), 'w') as fh:
fh.write('var chordData = {}'.format(json_data))
def _create_matrix_chart (self, data, output_dir):
nodes = [{'name': name, 'group': 1} for name in self._maybe_texts]
weights = data.stack().unstack('related_text').max()
seen = []
links = []
for (source, target), weight in weights.iteritems():
if target not in seen and target != source:
seen.append(source)
links.append({'source': self._maybe_texts.index(source),
'target': self._maybe_texts.index(target),
'value': weight})
json_data = json.dumps({'nodes': nodes, 'links': links})
with open(os.path.join(output_dir, 'matrix_data.js'), 'w') as fh:
fh.write('var matrixData = {}'.format(json_data))
def _create_related_chart (self, data, text, output_dir):
# Create a chart that has two bars per text on x-axis: one for
# the percentage of that text that overlaps with the base
# text, and one for the percentage of the base text that
# overlaps with that text. A tooltip showing the values per
# witness would be good.
chart_data = data[text].dropna().sort('shared_related_text',
ascending=False)
csv_path = os.path.join(output_dir, 'related_{}.csv'.format(text))
chart_data.to_csv(csv_path)
def _drop_no_label_results (self, results, fh):
"""Writes `results` to `fh` minus those results associated with the
'no' label.
:param results: results to be manipulated
:type results: file-like object
:param fh: output destination
:type fh: file-like object
"""
results.seek(0)
report = Report(results, self._tokenizer)
report.remove_label(self._no_label)
report.csv(fh)
def _generate_statistics (self, out_path, results_path):
"""Write a statistics report for `results_path` to `out_path`."""
if not os.path.exists(out_path):
report = StatisticsReport(self._corpus, self._tokenizer,
results_path)
report.generate_statistics()
with open(out_path, mode='w', encoding='utf-8', newline='') as fh:
report.csv(fh)
def _get_reversed_data (self, data):
reverse_data = data.unstack('main_text')['shared']
tuples = list(zip(['shared_related_text'] * len(reverse_data.columns),
reverse_data.columns))
reverse_data.columns = pd.MultiIndex.from_tuples(
tuples, names=['text', 'main_text'])
for text in reverse_data['shared_related_text'].columns:
reverse_data['shared_base_text', text] = reverse_data[
'shared_related_text'].loc[text].tolist()
return reverse_data.swaplevel('text', 'main_text', axis=1)
def _process_maybe_text (self, yes_text, maybe_text, work_dir,
yn_results_path):
if maybe_text == yes_text:
return
self._logger.info(
'Processing "maybe" text {} against "yes" text {}.'.format(
maybe_text, yes_text))
for siglum in self._corpus.get_sigla(maybe_text):
witness = (maybe_text, siglum)
self._stats[yes_text]['common'][witness] = 0
self._stats[yes_text]['shared'][witness] = 0
self._stats[yes_text]['unique'][witness] = 100
texts = [yes_text, maybe_text]
texts.sort()
ym_results_path = os.path.join(
self._ym_intersects_dir, '{}_intersect_{}.csv'.format(*texts))
catalogue = {yes_text: self._no_label,
maybe_text: self._maybe_label}
self._run_query(ym_results_path, self._store.intersection, [catalogue],
False)
intersect_stats_path = os.path.join(
work_dir, 'stats_intersect_{}.csv'.format(maybe_text))
self._generate_statistics(intersect_stats_path, ym_results_path)
with open(intersect_stats_path, encoding='utf-8', newline='') as fh:
reader = csv.DictReader(fh)
for row in reader:
if row[constants.NAME_FIELDNAME] == maybe_text:
witness = (maybe_text, row[constants.SIGLUM_FIELDNAME])
ratio = float(row[constants.PERCENTAGE_FIELDNAME])
self._stats[yes_text]['common'][witness] = ratio
self._stats[yes_text]['unique'][witness] -= ratio
distinct_results_path = os.path.join(
work_dir, 'distinct_{}.csv'.format(maybe_text))
results = [yn_results_path, ym_results_path]
labels = [self._no_label, self._maybe_label]
self._run_query(distinct_results_path, self._store.diff_supplied,
[results, labels])
diff_stats_path = os.path.join(work_dir,
'stats_diff_{}.csv'.format(maybe_text))
self._generate_statistics(diff_stats_path, distinct_results_path)
with open(diff_stats_path, encoding='utf-8', newline='') as fh:
reader = csv.DictReader(fh)
for row in reader:
if row[constants.NAME_FIELDNAME] == maybe_text:
witness = (maybe_text, row[constants.SIGLUM_FIELDNAME])
ratio = float(row[constants.PERCENTAGE_FIELDNAME])
self._stats[yes_text]['shared'][witness] = ratio
self._stats[yes_text]['common'][witness] -= ratio
def _process_yes_text (self, yes_text, no_catalogue, output_dir):
self._logger.info('Processing "maybe" text {} as "yes".'.format(
yes_text))
self._stats[yes_text] = {'common': {}, 'shared': {}, 'unique': {}}
yes_work_dir = os.path.join(output_dir, yes_text)
os.makedirs(yes_work_dir, exist_ok=True)
results_path = os.path.join(yes_work_dir, 'intersect_with_no.csv')
self._run_query(results_path, self._store.intersection, [no_catalogue])
for maybe_text in self._maybe_texts:
self._process_maybe_text(yes_text, maybe_text, yes_work_dir,
results_path)
def process (self):
no_catalogue = {text: self._no_label for text in self._no_texts}
data = {}
os.makedirs(self._ym_intersects_dir, exist_ok=True)
for yes_text in self._maybe_texts:
no_catalogue[yes_text] = self._maybe_label
self._process_yes_text(yes_text, no_catalogue,
self._output_data_dir)
no_catalogue.pop(yes_text)
for scope in ('shared', 'common', 'unique'):
text_data = self._stats[yes_text][scope]
# QAZ: Check that keys() and values() will always
# return items in the same order when called
# consecutively like this.
index = pd.MultiIndex.from_tuples(
list(text_data.keys()), names=['related_text', 'siglum'])
data[(yes_text, scope)] = pd.Series(list(text_data.values()),
index=index)
full_data = pd.DataFrame(data)
full_data.columns.names = ['main_text', 'scope']
full_data = full_data.stack('main_text').swaplevel(
'main_text', 'siglum').swaplevel('related_text', 'main_text')
grouped = full_data.groupby(level=['main_text', 'related_text'],
axis=0, group_keys=False)
max_data = grouped.apply(lambda x: x.loc[x['shared'].idxmax()])
reverse_data = self._get_reversed_data(max_data)
report_data_dir = os.path.join(self._output_dir, 'report_data')
os.makedirs(report_data_dir, exist_ok=True)
report_assets_dir = os.path.join(self._output_dir, 'report_assets')
os.makedirs(report_assets_dir, exist_ok=True)
# Matrix chart.
self._create_matrix_chart(reverse_data, report_data_dir)
# Chord chart.
self._create_chord_chart(max_data, report_data_dir)
# Individual text bar charts.
texts = []
scripts = []
toc = []
export_data = full_data.unstack('main_text').swaplevel(
'main_text', 'scope', axis=1)
export_data.index.names = ['related text', 'siglum']
for index, text in enumerate(self._maybe_texts):
self._create_breakdown_chart(max_data, text, report_data_dir)
self._create_related_chart(reverse_data, text, report_data_dir)
table = export_data[text].dropna().to_html()
toc.append(TOC_HTML.format(index=index, text=text))
texts.append(TEXT_HTML.format(index=index, table=table, text=text))
scripts.append(SCRIPT_HTML.format(index=index, sep=os.sep,
text=text))
with open(os.path.join(self._output_dir, 'report.html'), 'w') as fh:
fh.write(CHART_HTML.format(scripts='\n'.join(scripts),
sep=os.sep,
texts='\n'.join(texts),
toc='\n'.join(toc)))
self._copy_static_assets(report_assets_dir)
def _run_query (self, path, query, query_args, drop_no=True):
if os.path.exists(path):
return
output_results = io.StringIO(newline='')
query(*query_args, output_fh=output_results)
with open(path, mode='w', encoding='utf-8', newline='') as fh:
if drop_no:
self._drop_no_label_results(output_results, fh)
else:
fh.write(output_results.getvalue())
def rgb_colour (h, f):
"""Convert a colour specified by h-value and f-value to an RGB string."""
v = 1
p = 0
if h == 0:
colour = v, f, p
elif h == 1:
colour = 1 - f, v, p
elif h == 2:
colour = p, v, f
elif h == 3:
colour = p, 1 - f, v
elif h == 4:
colour = f, p, v
elif h == 5:
colour = v, p, 1 - f
return 'rgb({}, {}, {})'.format(*[round(value * 255) for value in colour])
def generate_colours (n):
"""Return a list of distinct colours, each of which is represented as
an RGB string suitable for use in CSS."""
hues = [360 / n * i for i in range(n)]
hs = (math.floor(hue / 60) % 6 for hue in hues)
fs = (hue / 60 - math.floor(hue / 60) for hue in hues)
return [rgb_colour(h, f) for h, f in zip(hs, fs)]
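# A worked example of the colour helpers above; three texts get hues
# at 0, 120 and 240 degrees:
#
#   >>> generate_colours(3)
#   ['rgb(255, 0, 0)', 'rgb(0, 255, 0)', 'rgb(0, 0, 255)']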
CHART_HTML = '''
JitC Report
'''
SCRIPT_HTML = '''
'''
tacl/__init__.py
from . import constants
from .catalogue import Catalogue
from .corpus import Corpus
from .data_store import DataStore
from .highlighter import Highlighter
from .jitc import JITCProcessor
from .report import Report
from .sequence import Sequencer
from .statistics_report import StatisticsReport
from .stripper import Stripper
from .tei_corpus import TEICorpus
from .text import BaseText, Text
from .tokenizer import Tokenizer
tacl/statistics_report.py
"""Module containing the StatisticsReport class."""
import re
import pandas as pd
from . import constants
from .text import BaseText
class StatisticsReport:
def __init__ (self, corpus, tokenizer, matches):
self._corpus = corpus
self._tokenizer = tokenizer
self._matches = pd.read_csv(matches, encoding='utf-8', na_filter=False)
self._stats = pd.DataFrame()
def csv (self, fh):
self._stats.to_csv(fh, columns=constants.STATISTICS_FIELDNAMES,
encoding='utf-8', index=False)
return fh
def generate_statistics (self):
"""Replaces result rows with summary statistics about the results.
These statistics give the filename, total matching tokens,
percentage of matching tokens and label for each witness in
the results.
"""
matches = self._matches
witness_fields = [constants.NAME_FIELDNAME, constants.SIGLUM_FIELDNAME,
constants.LABEL_FIELDNAME]
witnesses = matches[witness_fields].drop_duplicates()
rows = []
for index, (text_name, siglum, label) in witnesses.iterrows():
text = self._corpus.get_text(text_name, siglum)
witness_matches = matches[
(matches[constants.NAME_FIELDNAME] == text_name) &
(matches[constants.SIGLUM_FIELDNAME] == siglum)]
total_count, matching_count = self._process_witness(
text, witness_matches)
percentage = matching_count / total_count * 100
rows.append({constants.NAME_FIELDNAME: text_name,
constants.SIGLUM_FIELDNAME: siglum,
constants.COUNT_TOKENS_FIELDNAME: matching_count,
constants.TOTAL_TOKENS_FIELDNAME: total_count,
constants.PERCENTAGE_FIELDNAME: percentage,
constants.LABEL_FIELDNAME: label})
self._stats = pd.DataFrame(rows)
def _generate_text_from_slices (self, full_text, slices):
"""Return a single string consisting of the parts specified in
`slices` joined together by the tokenizer's joining string.
:param full_text: the text to be sliced
:type full_text: `str`
:param slices: list of slice indices to apply to `full_text`
:type slices: `list` of `list`s
:rtype: `str`
"""
sliced_text = []
for start, end in slices:
sliced_text.append(full_text[start:end])
return self._tokenizer.joiner.join(sliced_text)
@staticmethod
def _merge_slices (match_slices):
"""Return a list of slice indices lists derived from `match_slices`
with no overlaps."""
# Sort by earliest range, then by largest range.
match_slices.sort(key=lambda x: (x[0], -x[1]))
merged_slices = [match_slices.pop(0)]
for slice_indices in match_slices:
last_end = merged_slices[-1][1]
if slice_indices[0] <= last_end:
if slice_indices[1] > last_end:
merged_slices[-1][1] = slice_indices[1]
else:
merged_slices.append(slice_indices)
return merged_slices
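# A worked example of the merge (slice values are hypothetical);
# _merge_slices is a staticmethod, so it can be called on the class:
#
#   >>> StatisticsReport._merge_slices([[0, 3], [2, 5], [7, 9]])
#   [[0, 5], [7, 9]]
#   >>> StatisticsReport._merge_slices([[0, 3], [0, 10]])
#   [[0, 10]]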
def _process_witness (self, text, matches):
"""Return the counts of total tokens and matching tokens in `text`.
:param text: witness text
:type text: `tacl.Text`
:param matches: n-gram matches
:type matches: `pandas.DataFrame`
:rtype: `tuple` of `int`
"""
# In order to provide a correct count of matched tokens,
# avoiding the twin dangers of counting the same token
# multiple times due to being part of multiple n-grams (which
# can happen even in reduced results) and not counting tokens
# due to an n-gram overlapping with itself or another n-gram,
# a bit of work is required.
#
# Using regular expressions, get the slice indices for all
# matches (including overlapping ones) for all matching
# n-grams. Merge these slices together (without overlap) and
# create a Text using that text, which can then be tokenised
# and the tokens counted.
tokens = text.get_tokens()
full_text = self._tokenizer.joiner.join(tokens)
fields = [constants.NGRAM_FIELDNAME, constants.SIZE_FIELDNAME]
match_slices = []
for index, (ngram, size) in matches[fields].iterrows():
pattern = re.compile(re.escape(ngram))
# Because the same n-gram may overlap itself ("heh" in the
# string "heheh"), re.findall cannot be used.
start = 0
while True:
match = pattern.search(full_text, start)
if match is None:
break
match_slices.append([match.start(), match.end()])
start = match.start() + 1
merged_slices = self._merge_slices(match_slices)
match_content = self._generate_text_from_slices(
full_text, merged_slices)
match_text = BaseText(match_content, self._tokenizer)
return len(tokens), len(match_text.get_tokens())
tacl/tei_corpus.py
"""Module containing the TEICorpus class."""
from copy import deepcopy
import logging
import os
import re
from lxml import etree
text_name_pattern = re.compile(
r'^(?P<prefix>[A-Z]{1,2})\d+n(?P<text>[^_\.]+)_(?P<part>\d+)$')
# XSLT to transform a P4 TEI document with a DTD, external entity
# references, and insanely complex gaiji elements into a P4 TEI
# document with no DTD or external references and all gaiji elements
# replaced with the best representation available, encoded in UTF-8.
SIMPLIFY_XSLT = '''
GAIJI WITHOUT REPRESENTATION
'''
TEI_CORPUS_XML = ''''''
class TEICorpus:
"""A TEICorpus represents a collection of TEI XML documents.
The CBETA texts are TEI XML that have certain quirks that make
them difficult to use directly in TACL's stripping process. This
class provides a tidy method to deal with these quirks; in
particular it consolidates multiple XML files for a single text
into one XML file. This is most useful for variant handling, which
requires that all of the variants used in a given text be known
before processing the file(s) associated with that text.
"""
def __init__ (self, input_dir, output_dir):
self._logger = logging.getLogger(__name__)
self._input_dir = os.path.abspath(input_dir)
self._output_dir = os.path.abspath(output_dir)
self._transform = etree.XSLT(etree.XML(SIMPLIFY_XSLT))
self._texts = {}
def _correct_entity_file (self, file_path):
"""Adds an unused entity declaration to the entity file for
`file_path`, in the hopes that this will make it not cause a
validation failure."""
path, basename = os.path.split(file_path)
entity_file = '{}.ent'.format(os.path.join(
path, basename.split('_')[0]))
with open(entity_file, 'rb') as input_file:
text = input_file.read()
with open(entity_file, 'wb') as output_file:
output_file.write(text)
output_file.write(b'')
def extract_text_name (self, filename):
"""Returns the name of the text in `filename`.
Many texts are divided into multiple parts that need to be
joined together.
"""
basename = os.path.splitext(os.path.basename(filename))[0]
match = text_name_pattern.search(basename)
if match is None:
self._logger.warning('Found an anomalous filename "{}"'.format(
filename))
return None, None
text_name = '{}{}'.format(match.group('prefix'), match.group('text'))
return text_name, int(match.group('part'))
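# For example, a CBETA-style filename such as 'T01n0001_001.xml'
# (hypothetical) yields ('T0001', 1): prefix 'T' plus text '0001',
# with part number 001.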
def _output_text (self, text_name, parts):
"""Saves a TEI XML document `text_name` that consists of all of the
indidivual TEI XML source documents joined."""
# Add each part in turn to the skeleton TEICorpus document.
corpus_root = etree.XML(TEI_CORPUS_XML)
for index, part in enumerate(parts):
# Add the teiHeader for the first part as the
# teiHeader of the teiCorpus.
if index == 0:
corpus_root.append(deepcopy(part[0]))
corpus_root.append(part)
tree = etree.ElementTree(corpus_root)
output_filename = os.path.join(self._output_dir, text_name)
tree.write(output_filename, encoding='utf-8', pretty_print=True)
def tidy (self):
if not os.path.exists(self._output_dir):
try:
os.makedirs(self._output_dir)
except OSError as err:
self._logger.error(
'Could not create output directory: {}'.format(err))
raise
# The CBETA texts are organised into directories, and each
# text may be in multiple numbered parts. Crucially, these
# parts may be split over multiple directories. Since it is
# too memory intensive to store all of the lxml
# representations of the XML files at once, before joining the
# parts together, assemble the filenames into groups and then
# process them one by one.
for dirpath, dirnames, filenames in os.walk(self._input_dir):
for filename in filenames:
if os.path.splitext(filename)[1] == '.xml':
text_name, part_number = self.extract_text_name(filename)
if text_name is None:
self._logger.warning('Skipping file "{}"'.format(
filename))
else:
text_name = '{}.xml'.format(text_name)
text_parts = self._texts.setdefault(text_name, {})
text_parts[part_number] = os.path.join(
dirpath, filename)
for text_name, paths in self._texts.items():
parts = list(paths.keys())
parts.sort()
xml_parts = []
for part in parts:
xml_parts.append(self._tidy(text_name, paths[part]))
self._output_text(text_name, xml_parts)
def _tidy (self, text_name, file_path, tried=False):
"""Transforms the file at `file_path` into simpler XML and returns
it."""
output_file = os.path.join(self._output_dir, text_name)
self._logger.info('Tidying file {} into {}'.format(
file_path, output_file))
try:
tei_doc = etree.parse(file_path)
except etree.XMLSyntaxError as err:
self._logger.warning('XML file "{}" is invalid'.format(file_path))
if tried:
self._logger.error(
'XML file "{}" is irretrievably invalid: {}'.format(
file_path, err))
raise
self._logger.warning('Retrying after modifying entity file')
self._correct_entity_file(file_path)
xml = self._tidy(text_name, file_path, True)
else:
xml = self._transform(tei_doc).getroot()
return xml
tacl/assets/grouped_bar.js
function groupedBarChart() {
var margin = {top: 20, right: 20, bottom: 100, left: 40},
width = 960 - margin.left - margin.right,
height = 600 - margin.top - margin.bottom;
var x0 = d3.scale.ordinal().rangeRoundBands([0, width], .1);
var x1 = d3.scale.ordinal();
var y = d3.scale.linear().range([height, 0]);
var color = d3.scale.ordinal().range(["#D8B365", "#5AB4AC"]);
var xAxis = d3.svg.axis().scale(x0).orient("bottom");
var yAxis = d3.svg.axis().scale(y).orient("left")
.tickFormat(d3.format(".2s"));
function chart(selection) {
selection.each(function(data) {
var groupNames = d3.keys(data[0]).filter(function(key) {
return key !== "related_text";
});
data.forEach(function(d) {
d.groups = groupNames.map(function(name) {
return {name: name, value: +d[name]};
});
});
x0.domain(data.map(function(d) { return d.related_text; }));
x1.domain(groupNames).rangeRoundBands([0, x0.rangeBand()]);
y.domain([0, 100]);
var svg = d3.select(this).append("svg")
.attr("width", width + margin.left + margin.right)
.attr("height", height + margin.top + margin.bottom)
.append("g")
.attr("transform", "translate(" + margin.left + "," + margin.top + ")");
svg.append("g")
.attr("class", "x axis")
.attr("transform", "translate(0," + height + ")")
.call(xAxis)
.selectAll("text")
.attr("y", 0)
.attr("x", 9)
.attr("dy", ".35em")
.attr("transform", "rotate(90)")
.style("text-anchor", "start");
svg.append("g")
.attr("class", "y axis")
.call(yAxis)
.append("text")
.attr("transform", "rotate(-90)")
.attr("y", 6)
.attr("dy", ".71em")
.style("text-anchor", "end")
.text("Percentage of text");
var related_text = svg.selectAll(".related_text")
.data(data)
.enter().append("g")
.attr("class", "g")
.attr("transform", function(d) {
return "translate(" + x0(d.related_text) + ",0)";
});
related_text.selectAll("rect")
.data(function(d) { return d.groups; })
.enter().append("rect")
.attr("width", x1.rangeBand())
.attr("x", function(d) { return x1(d.name); })
.attr("y", function(d) { return y(d.value); })
.attr("height", function(d) { return height - y(d.value); })
.style("fill", function(d) { return color(d.name); });
var legend = svg.selectAll(".legend")
.data(groupNames.slice().reverse())
.enter().append("g")
.attr("class", "legend")
.attr("transform", function(d, i) {
return "translate(0," + i * 20 + ")";
});
legend.append("rect")
.attr("x", width - 18)
.attr("width", 18)
.attr("height", 18)
.style("fill", color);
legend.append("text")
.attr("x", width - 24)
.attr("y", 9)
.attr("dy", ".35em")
.style("text-anchor", "end")
.text(function(d) { return d; });
});
}
return chart;
}
tacl/assets/stacked_bar.js
/*
* Encapsulating chart 'class'.
*/
function stackedBarChart() {
var margin = {top: 20, right: 20, bottom: 100, left: 40},
width = 960 - margin.left - margin.right,
height = 600 - margin.top - margin.bottom;
var x = d3.scale.ordinal()
.rangeRoundBands([0, width], .1);
var y = d3.scale.linear()
.rangeRound([height, 0]);
var color = d3.scale.ordinal()
.range(["#D8B365", "#5AB4AC", "#F5F5F5"]);
var xAxis = d3.svg.axis()
.scale(x)
.orient("bottom");
var yAxis = d3.svg.axis()
.scale(y)
.orient("left")
.tickFormat(d3.format(".2s"));
var tip = d3.tip()
.attr('class', 'd3-tip')
.offset([10, 0])
.html(function(d) {
return "" + d.name + ": " + (d.y1 - d.y0).toFixed(3);
});
function chart(selection) {
selection.each(function(data) {
color.domain(d3.keys(data[0]).filter(function(key) {
return key !== "related_text";
}));
data.forEach(function(d) {
var y0 = 0;
d.groups = color.domain().map(function(name) {
return {name: name, y0: y0, y1: y0 += +d[name]};
});
});
x.domain(data.map(function(d) { return d.related_text; }));
y.domain([0, 100]);
var svg = d3.select(this).append("svg")
.attr("width", width + margin.left + margin.right)
.attr("height", height + margin.top + margin.bottom)
.append("g")
.attr("transform", "translate(" + margin.left + "," + margin.top + ")");
svg.call(tip);
svg.append("g")
.attr("class", "x axis")
.attr("transform", "translate(0," + height + ")")
.call(xAxis)
.selectAll("text")
.attr("y", 0)
.attr("x", 9)
.attr("dy", ".35em")
.attr("transform", "rotate(90)")
.style("text-anchor", "start");
svg.append("g")
.attr("class", "y axis")
.call(yAxis)
.append("text")
.attr("transform", "rotate(-90)")
.attr("y", 6)
.attr("dy", ".71em")
.style("text-anchor", "end")
.text("Percentage of text");
var related_text = svg.selectAll(".related_text")
.data(data)
.enter().append("g")
.attr("class", "g")
.attr("transform", function(d) {
return "translate(" + x(d.related_text) + ",0)";
});
related_text.selectAll("rect")
.data(function(d) { return d.groups; })
.enter().append("rect")
.attr("width", x.rangeBand())
.attr("y", function(d) { return y(d.y1); })
.attr("height", function(d) { return y(d.y0) - y(d.y1); })
.style("fill", function(d) { return color(d.name); })
.on('mouseover', tip.show)
.on('mouseout', tip.hide);
var legend = svg.selectAll(".legend")
.data(color.domain().slice().reverse())
.enter().append("g")
.attr("class", "legend")
.attr("transform", function(d, i) { return "translate(0," + i * 20 + ")"; });
legend.append("rect")
.attr("x", width - 18)
.attr("width", 18)
.attr("height", 18)
.style("fill", color);
legend.append("text")
.attr("x", width - 24)
.attr("y", 9)
.attr("dy", ".35em")
.style("text-anchor", "end")
.text(function(d) { return d; });
});
}
return chart;
}
tacl/assets/matrix.js
// Code adapted from http://bost.ocks.org/mike/miserables/
var margin = {top: 120, right: 0, bottom: 10, left: 180},
width = 720,
height = 720;
var x = d3.scale.ordinal().rangeBands([0, width]),
z = d3.scale.linear().domain([0, 20]).range([0, 1]).clamp(true),
c = d3.scale.category10().domain(d3.range(10));
var table_svg = d3.select("#matrix").append("svg")
.attr("width", width + margin.left + margin.right)
.attr("height", height + margin.top + margin.bottom)
.style("margin-left", -margin.left + "px")
.append("g")
.attr("transform", "translate(" + margin.left + "," + margin.top + ")");
function handleTableData (texts) {
var matrix = [],
nodes = texts.nodes,
n = nodes.length;
// Compute index per node.
nodes.forEach(function(node, i) {
node.index = i;
node.count = 0;
matrix[i] = d3.range(n).map(function(j) { return {x: j, y: i, z: 0}; });
});
// Convert links to matrix; sum percentages for each text (to get a
// measure of how much it overlaps with all the other texts).
texts.links.forEach(function(link) {
matrix[link.source][link.target].z += link.value;
matrix[link.target][link.source].z += link.value;
matrix[link.source][link.source].z = 0;
matrix[link.target][link.target].z = 0;
nodes[link.source].count += link.value;
nodes[link.target].count += link.value;
});
// Precompute the orders.
var orders = {
name: d3.range(n).sort(function(a, b) {
return d3.ascending(nodes[a].name, nodes[b].name); }),
count: d3.range(n).sort(function(a, b) {
return nodes[b].count - nodes[a].count; })
};
// The default sort order.
x.domain(orders.name);
table_svg.append("rect")
.attr("class", "background")
.attr("width", width)
.attr("height", height);
var row = table_svg.selectAll(".row")
.data(matrix)
.enter().append("g")
.attr("class", "row")
.attr("transform", function(d, i) {
return "translate(0," + x(i) + ")"; })
.each(row);
row.append("line")
.attr("x2", width);
row.append("text")
.attr("x", -6)
.attr("y", x.rangeBand() / 2)
.attr("dy", ".32em")
.attr("text-anchor", "end")
.text(function(d, i) { return nodes[i].name; });
var column = table_svg.selectAll(".column")
.data(matrix)
.enter().append("g")
.attr("class", "column")
.attr("transform", function(d, i) {
return "translate(" + x(i) + ")rotate(-90)"; });
column.append("line")
.attr("x1", -width);
column.append("text")
.attr("x", 6)
.attr("y", x.rangeBand() / 2)
.attr("dy", ".32em")
.attr("text-anchor", "start")
.text(function(d, i) { return nodes[i].name; });
function row(row) {
var cell = d3.select(this).selectAll(".cell")
.data(row.filter(function(d) { return d.z; }))
.enter().append("rect")
.attr("class", "cell")
.attr("x", function(d) { return x(d.x); })
.attr("width", x.rangeBand())
.attr("height", x.rangeBand())
.style("fill-opacity", function(d) { return z(d.z); })
.style("fill", function(d) { return nodes[d.x].group == nodes[d.y].group ? c(nodes[d.x].group) : null; })
.on("mouseover", mouseover)
.on("mouseout", mouseout)
.append("svg:title").text(function(d) { return d.z; });
}
function mouseover(p) {
d3.selectAll(".row text").classed("active", function(d, i) { return i == p.y; });
d3.selectAll(".column text").classed("active", function(d, i) { return i == p.x; });
}
function mouseout() {
d3.selectAll("text").classed("active", false);
}
d3.select("#order").on("change", function() {
clearTimeout(timeout);
order(this.value);
});
function order(value) {
x.domain(orders[value]);
var t = table_svg.transition().duration(2500);
t.selectAll(".row")
.delay(function(d, i) { return x(i) * 4; })
.attr("transform", function(d, i) { return "translate(0," + x(i) + ")"; })
.selectAll(".cell")
.delay(function(d) { return x(d.x) * 4; })
.attr("x", function(d) { return x(d.x); });
t.selectAll(".column")
.delay(function(d, i) { return x(i) * 4; })
.attr("transform", function(d, i) { return "translate(" + x(i) + ")rotate(-90)"; });
}
var timeout = setTimeout(function() {
order("name");
d3.select("#order").property("selectedIndex", 0).node().focus();
}, 5000);
}
handleTableData(matrixData);
tacl/assets/report.css
.axis path,
.axis line {
fill: none;
stroke: #000;
shape-rendering: crispEdges;
}
.x.axis path {
display: none;
}
#circle circle {
fill: none;
pointer-events: all;
}
.group path {
fill-opacity: .5;
}
path.chord {
stroke: #000; stroke-width: .25px;
}
#circle:hover path.fade {
display: none;
}
#matrix {
margin-left: 8em;
}
.d3-tip {
background: white;
color: black;
padding: 0.5em;
}
tacl/assets/chord.js
// From http://bost.ocks.org/mike/uberdata/
var width = 720,
height = 720,
outerRadius = Math.min(width, height) / 2 - 10,
innerRadius = outerRadius - 24;
var formatPercent = d3.format(".1%");
var arc = d3.svg.arc()
.innerRadius(innerRadius)
.outerRadius(outerRadius);
var layout = d3.layout.chord()
.padding(.04)
.sortSubgroups(d3.descending)
.sortChords(d3.ascending);
var path = d3.svg.chord()
.radius(innerRadius);
var chord_svg = d3.select("#chord").append("svg")
.attr("width", width)
.attr("height", height)
.attr("viewBox", "0 0 " + width + " " + height)
.append("g")
.attr("id", "circle")
.attr("transform", "translate(" + width / 2 + "," + height / 2 + ")");
chord_svg.append("circle")
.attr("r", outerRadius);
function handleChordData (chordData) {
var texts = chordData.texts;
// Compute the chord layout.
layout.matrix(chordData.matrix);
// Add a group per neighborhood.
var group = chord_svg.selectAll(".group")
.data(layout.groups)
.enter().append("g")
.attr("class", "group")
.on("mouseover", mouseover);
// Add a mouseover title.
group.append("title").text(function(d, i) {
return texts[i].name + ": sum of shared text percentages: " + formatPercent(d.value);
});
// Add the group arc.
var groupPath = group.append("path")
.attr("id", function(d, i) { return "group" + i; })
.attr("d", arc)
.style("fill", function(d, i) { return texts[i].colour; });
// Add a text label.
var groupText = group.append("text")
.attr("x", 6)
.attr("dy", 15);
groupText.append("textPath")
.attr("xlink:href", function(d, i) { return "#group" + i; })
.text(function(d, i) { return texts[i].name; });
// Remove the labels that don't fit. :(
groupText.filter(function(d, i) {
return groupPath[0][i].getTotalLength() / 2 - 16 < this.getComputedTextLength(); })
.remove();
// Add the chords.
var chord = chord_svg.selectAll(".chord")
.data(layout.chords)
.enter().append("path")
.attr("class", "chord")
.style("fill", function(d) { return texts[d.source.index].colour; })
.attr("d", path);
// Add an elaborate mouseover title for each chord.
chord.append("title").text(function(d) {
return "Percentage of " + texts[d.source.index].name
+ " shared with " + texts[d.target.index].name
+ ": " + formatPercent(d.source.value)
+ "\nPercentage of " + texts[d.target.index].name
+ " shared with " + texts[d.source.index].name
+ ": " + formatPercent(d.target.value);
});
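// Fade every chord that is not connected to the hovered group.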
function mouseover(d, i) {
chord.classed("fade", function(p) {
return p.source.index != i
&& p.target.index != i;
});
}
};
handleChordData(chordData);
tacl/assets/templates/highlight.html
{{ base_name }} {{ base_siglum }} with matches from each other text highlighted
{{ base_name }} {{ base_siglum }} with matches from each other text highlighted
{{ text }}
tacl/assets/templates/sequence.html
Alignment between {{ l1 }} and {{ l2 }}
Alignment between {{ l1 }} and {{ l2 }}
{{ l1 }}
{{ l2 }}
{% for sequence in sequences %}
{{ sequence.0 }}
{{ sequence.1 }}
{% endfor %}
tacl/command/tacl_script.py
"""Command-line script to perform n-gram analysis of a corpus of
texts."""
import argparse
import io
import logging
import sys
import tacl
from tacl import constants
from tacl.command.formatters import ParagraphFormatter
logger = logging.getLogger('tacl')
def main ():
parser = generate_parser()
args = parser.parse_args()
if hasattr(args, 'verbose'):
configure_logging(args.verbose)
if hasattr(args, 'func'):
args.func(args, parser)
else:
parser.print_help()
def add_common_arguments (parser):
"""Adds common arguments for all parsers."""
parser.add_argument('-v', '--verbose', action='count',
help=constants.VERBOSE_HELP)
def add_corpus_arguments (parser):
"""Adds common arguments for commands making use of a corpus to
`parser`."""
add_tokenizer_argument(parser)
parser.add_argument('corpus', help=constants.DB_CORPUS_HELP,
metavar='CORPUS')
def add_db_arguments (parser, db_option=False):
"""Adds common arguments for the database sub-commands to
`parser`.
`db_option` provides a means to work around
https://bugs.python.org/issue9338, whereby a positional argument
that follows an optional argument with nargs='+' is not
recognised. When `db_option` is True, the database argument is
created as a required optional argument rather than as a
positional argument.
"""
parser.add_argument('-m', '--memory', action='store_true',
help=constants.DB_MEMORY_HELP)
parser.add_argument('-r', '--ram', default=3, help=constants.DB_RAM_HELP,
type=int)
if db_option:
parser.add_argument('-d', '--db', help=constants.DB_DATABASE_HELP,
metavar='DATABASE', required=True)
else:
parser.add_argument('db', help=constants.DB_DATABASE_HELP,
metavar='DATABASE')
def add_query_arguments (parser):
"""Adds common arguments for query sub-commonads to `parser`."""
parser.add_argument('catalogue', help=constants.CATALOGUE_CATALOGUE_HELP,
metavar='CATALOGUE')
def add_supplied_query_arguments (parser):
"""Adds common arguments for supplied query sub-commands to
`parser`."""
parser.add_argument('-l', '--labels', help=constants.SUPPLIED_LABELS_HELP,
nargs='+', required=True)
parser.add_argument('-s', '--supplied', help=constants.SUPPLIED_RESULTS_HELP,
metavar='RESULTS', nargs='+', required=True)
def add_tokenizer_argument (parser):
parser.add_argument('-t', '--tokenizer', choices=constants.TOKENIZER_CHOICES,
default=constants.TOKENIZER_CHOICE_CBETA,
help=constants.DB_TOKENIZER_HELP)
def align_results (args, parser):
if args.results == '-':
results = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8',
newline='')
else:
results = open(args.results, 'r', encoding='utf-8', newline='')
tokenizer = get_tokenizer(args)
corpus = tacl.Corpus(args.corpus, tokenizer)
s = tacl.Sequencer(corpus, tokenizer, results, args.output)
s.generate_sequences(args.minimum)
def configure_logging (verbose):
"""Configures the logging used."""
if not verbose:
log_level = logging.WARNING
elif verbose == 1:
log_level = logging.INFO
else:
log_level = logging.DEBUG
logger.setLevel(log_level)
ch = logging.StreamHandler()
ch.setLevel(log_level)
formatter = logging.Formatter(
'%(asctime)s %(name)s %(levelname)s: %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
def generate_parser ():
"""Returns a parser configured with sub-commands and arguments."""
parser = argparse.ArgumentParser(
description=constants.TACL_DESCRIPTION,
formatter_class=ParagraphFormatter)
subparsers = parser.add_subparsers(title='subcommands')
generate_align_subparser(subparsers)
generate_catalogue_subparser(subparsers)
generate_counts_subparser(subparsers)
generate_diff_subparser(subparsers)
generate_highlight_subparser(subparsers)
generate_intersect_subparser(subparsers)
generate_ngrams_subparser(subparsers)
generate_prepare_subparser(subparsers)
generate_report_subparser(subparsers)
generate_supplied_diff_subparser(subparsers)
generate_search_subparser(subparsers)
generate_supplied_intersect_subparser(subparsers)
generate_statistics_subparser(subparsers)
generate_strip_subparser(subparsers)
return parser
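# A sketch of a typical invocation sequence, assuming this parser is exposed
# as the "tacl" console script; the database, corpus and catalogue paths are
# hypothetical:
#   tacl ngrams cbeta.db stripped/ 2 10
#   tacl intersect cbeta.db stripped/ catalogue.txt > intersect.csv
#   tacl report --reduce --sort intersect.csv > report.csv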
def generate_align_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to generate aligned
sequences from a set of results."""
parser = subparsers.add_parser(
'align', description=constants.ALIGN_DESCRIPTION,
epilog=constants.ALIGN_EPILOG,
formatter_class=ParagraphFormatter, help=constants.ALIGN_HELP)
parser.set_defaults(func=align_results)
add_common_arguments(parser)
parser.add_argument('-m', '--minimum', default=20,
help=constants.ALIGN_MINIMUM_SIZE_HELP, type=int)
add_corpus_arguments(parser)
parser.add_argument('output', help=constants.ALIGN_OUTPUT_HELP,
metavar='OUTPUT')
parser.add_argument('results', help=constants.REPORT_RESULTS_HELP,
metavar='RESULTS')
def generate_catalogue (args, parser):
"""Generates and saves a catalogue file."""
catalogue = tacl.Catalogue()
catalogue.generate(args.corpus, args.label)
catalogue.save(args.catalogue)
def generate_catalogue_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to generate and save
a catalogue file."""
parser = subparsers.add_parser(
'catalogue', description=constants.CATALOGUE_DESCRIPTION,
epilog=constants.CATALOGUE_EPILOG,
formatter_class=ParagraphFormatter, help=constants.CATALOGUE_HELP)
add_common_arguments(parser)
parser.set_defaults(func=generate_catalogue)
parser.add_argument('corpus', help=constants.DB_CORPUS_HELP,
metavar='CORPUS')
add_query_arguments(parser)
parser.add_argument('-l', '--label', default='',
help=constants.CATALOGUE_LABEL_HELP)
def generate_counts_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to make a counts
query."""
parser = subparsers.add_parser(
'counts', description=constants.COUNTS_DESCRIPTION,
epilog=constants.COUNTS_EPILOG, formatter_class=ParagraphFormatter,
help=constants.COUNTS_HELP)
parser.set_defaults(func=ngram_counts)
add_common_arguments(parser)
add_db_arguments(parser)
add_corpus_arguments(parser)
add_query_arguments(parser)
def generate_diff_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to make a diff
query."""
parser = subparsers.add_parser(
'diff', description=constants.DIFF_DESCRIPTION,
epilog=constants.DIFF_EPILOG, formatter_class=ParagraphFormatter,
help=constants.DIFF_HELP)
parser.set_defaults(func=ngram_diff)
group = parser.add_mutually_exclusive_group()
group.add_argument('-a', '--asymmetric', help=constants.ASYMMETRIC_HELP,
metavar='LABEL')
add_common_arguments(parser)
add_db_arguments(parser)
add_corpus_arguments(parser)
add_query_arguments(parser)
def generate_highlight_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to highlight a text with
its matches in a result."""
parser = subparsers.add_parser(
'highlight', description=constants.HIGHLIGHT_DESCRIPTION,
epilog=constants.HIGHLIGHT_EPILOG, formatter_class=ParagraphFormatter,
help=constants.HIGHLIGHT_HELP)
parser.set_defaults(func=highlight_text)
add_common_arguments(parser)
add_corpus_arguments(parser)
parser.add_argument('results', help=constants.STATISTICS_RESULTS_HELP,
metavar='RESULTS')
parser.add_argument('base_name', help=constants.HIGHLIGHT_BASE_NAME_HELP,
metavar='BASE_NAME')
parser.add_argument('base_siglum', metavar='BASE_SIGLUM',
help=constants.HIGHLIGHT_BASE_SIGLUM_HELP)
def generate_intersect_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to make an
intersection query."""
parser = subparsers.add_parser(
'intersect', description=constants.INTERSECT_DESCRIPTION,
epilog=constants.INTERSECT_EPILOG, formatter_class=ParagraphFormatter,
help=constants.INTERSECT_HELP)
parser.set_defaults(func=ngram_intersection)
add_common_arguments(parser)
add_db_arguments(parser)
add_corpus_arguments(parser)
add_query_arguments(parser)
def generate_ngrams (args, parser):
"""Adds n-grams data to the data store."""
store = get_data_store(args)
corpus = get_corpus(args)
store.add_ngrams(corpus, args.min_size, args.max_size)
def generate_ngrams_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to add n-grams data
to the data store."""
parser = subparsers.add_parser(
'ngrams', description=constants.NGRAMS_DESCRIPTION,
formatter_class=ParagraphFormatter, help=constants.NGRAMS_HELP)
parser.set_defaults(func=generate_ngrams)
add_common_arguments(parser)
add_db_arguments(parser)
add_corpus_arguments(parser)
parser.add_argument('min_size', help=constants.NGRAMS_MINIMUM_HELP,
metavar='MINIMUM', type=int)
parser.add_argument('max_size', help=constants.NGRAMS_MAXIMUM_HELP,
metavar='MAXIMUM', type=int)
def generate_prepare_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to prepare source XML
files for stripping."""
parser = subparsers.add_parser(
'prepare', description=constants.PREPARE_DESCRIPTION,
formatter_class=ParagraphFormatter, help=constants.PREPARE_HELP)
parser.set_defaults(func=prepare_xml)
add_common_arguments(parser)
parser.add_argument('input', help=constants.PREPARE_INPUT_HELP,
metavar='INPUT')
parser.add_argument('output', help=constants.PREPARE_OUTPUT_HELP,
metavar='OUTPUT')
def generate_report_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to manipulate CSV
results data."""
parser = subparsers.add_parser(
'report', description=constants.REPORT_DESCRIPTION,
epilog=constants.REPORT_EPILOG, formatter_class=ParagraphFormatter,
help=constants.REPORT_HELP)
add_common_arguments(parser)
parser.set_defaults(func=report)
parser.add_argument('-c', '--catalogue', dest='catalogue',
help=constants.REPORT_CATALOGUE_HELP,
metavar='CATALOGUE')
parser.add_argument('-e', '--extend', dest='extend',
help=constants.REPORT_EXTEND_HELP, metavar='CORPUS')
parser.add_argument('--min-count', dest='min_count',
help=constants.REPORT_MINIMUM_COUNT_HELP,
metavar='COUNT', type=int)
parser.add_argument('--max-count', dest='max_count',
help=constants.REPORT_MAXIMUM_COUNT_HELP,
metavar='COUNT', type=int)
parser.add_argument('--min-size', dest='min_size',
help=constants.REPORT_MINIMUM_SIZE_HELP, metavar='SIZE',
type=int)
parser.add_argument('--max-size', dest='max_size',
help=constants.REPORT_MAXIMUM_SIZE_HELP, metavar='SIZE',
type=int)
parser.add_argument('--min-texts', dest='min_texts',
help=constants.REPORT_MINIMUM_TEXT_HELP,
metavar='COUNT', type=int)
parser.add_argument('--max-texts', dest='max_texts',
help=constants.REPORT_MAXIMUM_TEXT_HELP,
metavar='COUNT', type=int)
parser.add_argument('--reciprocal', action='store_true',
help=constants.REPORT_RECIPROCAL_HELP)
parser.add_argument('--reduce', action='store_true',
help=constants.REPORT_REDUCE_HELP)
parser.add_argument('--remove', help=constants.REPORT_REMOVE_HELP,
metavar='LABEL', type=str)
parser.add_argument('--sort', action='store_true',
help=constants.REPORT_SORT_HELP)
add_tokenizer_argument(parser)
parser.add_argument('-z', '--zero-fill', dest='zero_fill',
help=constants.REPORT_ZERO_FILL_HELP, metavar='CORPUS')
parser.add_argument('results', help=constants.REPORT_RESULTS_HELP,
metavar='RESULTS')
def generate_search_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to generate search
results for a set of n-grams."""
parser = subparsers.add_parser(
'search', description=constants.SEARCH_DESCRIPTION,
formatter_class=ParagraphFormatter, help=constants.SEARCH_HELP)
parser.set_defaults(func=search_texts)
add_common_arguments(parser)
add_db_arguments(parser)
add_corpus_arguments(parser)
parser.add_argument('-c', '--catalogue', metavar='CATALOGUE',
help=constants.CATALOGUE_CATALOGUE_HELP)
parser.add_argument('ngrams', help=constants.SEARCH_NGRAMS_HELP,
metavar='NGRAMS')
def generate_statistics (args, parser):
corpus = get_corpus(args)
tokenizer = get_tokenizer(args)
report = tacl.StatisticsReport(corpus, tokenizer, args.results)
report.generate_statistics()
report.csv(sys.stdout)
def generate_statistics_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to generate statistics
from a set of results."""
parser = subparsers.add_parser(
'stats', description=constants.STATISTICS_DESCRIPTION,
formatter_class=ParagraphFormatter, help=constants.STATISTICS_HELP)
parser.set_defaults(func=generate_statistics)
add_common_arguments(parser)
add_corpus_arguments(parser)
parser.add_argument('results', help=constants.STATISTICS_RESULTS_HELP,
metavar='RESULTS')
def generate_strip_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to process original
texts for use with the tacl ngrams command."""
parser = subparsers.add_parser(
'strip', description=constants.STRIP_DESCRIPTION,
epilog=constants.STRIP_EPILOG, formatter_class=ParagraphFormatter,
help=constants.STRIP_HELP)
parser.set_defaults(func=strip_texts)
add_common_arguments(parser)
parser.add_argument('input', help=constants.STRIP_INPUT_HELP,
metavar='INPUT')
parser.add_argument('output', help=constants.STRIP_OUTPUT_HELP,
metavar='OUTPUT')
def generate_supplied_diff_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to run a diff query using
the supplied results sets."""
parser = subparsers.add_parser(
'sdiff', description=constants.SUPPLIED_DIFF_DESCRIPTION,
epilog=constants.SUPPLIED_DIFF_EPILOG,
formatter_class=ParagraphFormatter, help=constants.SUPPLIED_DIFF_HELP)
parser.set_defaults(func=supplied_diff)
add_common_arguments(parser)
add_db_arguments(parser, True)
add_supplied_query_arguments(parser)
def generate_supplied_intersect_subparser (subparsers):
"""Adds a sub-command parser to `subparsers` to run an intersect query
using the supplied results sets."""
parser = subparsers.add_parser(
'sintersect', description=constants.SUPPLIED_INTERSECT_DESCRIPTION,
epilog=constants.SUPPLIED_INTERSECT_EPILOG,
formatter_class=ParagraphFormatter,
help=constants.SUPPLIED_INTERSECT_HELP)
parser.set_defaults(func=supplied_intersect)
add_common_arguments(parser)
add_db_arguments(parser, True)
add_supplied_query_arguments(parser)
def get_corpus (args):
"""Returns a `tacl.Corpus`."""
tokenizer = get_tokenizer(args)
return tacl.Corpus(args.corpus, tokenizer)
def get_catalogue (path):
"""Returns a `tacl.Catalogue`."""
catalogue = tacl.Catalogue()
catalogue.load(path)
return catalogue
def get_data_store (args):
"""Returns a `tacl.DataStore`."""
return tacl.DataStore(args.db, args.memory, args.ram)
def get_input_fh (arg):
"""Returns an open file of CSV data, or None if `arg` is None."""
input_fh = None
if arg:
input_fh = open(arg, 'r', encoding='utf-8', newline='')
return input_fh
def get_tokenizer (args):
return tacl.Tokenizer(*constants.TOKENIZERS[args.tokenizer])
def highlight_text (args, parser):
"""Outputs the result of highlighting a text."""
tokenizer = get_tokenizer(args)
corpus = get_corpus(args)
highlighter = tacl.Highlighter(corpus, tokenizer)
text = highlighter.highlight(args.results, args.base_name, args.base_siglum)
print(text)
def ngram_counts (args, parser):
"""Outputs the results of performing a counts query."""
store = get_data_store(args)
corpus = get_corpus(args)
catalogue = get_catalogue(args.catalogue)
store.validate(corpus, catalogue)
store.counts(catalogue, sys.stdout)
def ngram_diff (args, parser):
"""Outputs the results of performing a diff query."""
store = get_data_store(args)
corpus = get_corpus(args)
catalogue = get_catalogue(args.catalogue)
store.validate(corpus, catalogue)
if args.asymmetric:
store.diff_asymmetric(catalogue, args.asymmetric, sys.stdout)
else:
store.diff(catalogue, sys.stdout)
def ngram_intersection (args, parser):
"""Outputs the results of performing an intersection query."""
store = get_data_store(args)
corpus = get_corpus(args)
catalogue = get_catalogue(args.catalogue)
store.validate(corpus, catalogue)
store.intersection(catalogue, sys.stdout)
def prepare_xml (args, parser):
"""Prepares XML texts for stripping.
This process creates a single, normalised TEI XML file for each
text.
"""
corpus = tacl.TEICorpus(args.input, args.output)
corpus.tidy()
def report (args, parser):
if args.results == '-':
results = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8',
newline='')
else:
results = open(args.results, 'r', encoding='utf-8', newline='')
tokenizer = get_tokenizer(args)
report = tacl.Report(results, tokenizer)
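# The manipulations below are applied in a fixed order (extend, reduce,
# reciprocal remove, zero fill, prune, remove label, sort), regardless of
# the order in which the options were given on the command line.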
if args.extend:
corpus = tacl.Corpus(args.extend, tokenizer)
report.extend(corpus)
if args.reduce:
report.reduce()
if args.reciprocal:
report.reciprocal_remove()
if args.zero_fill:
if not args.catalogue:
parser.error('The zero-fill option requires that the -c option also be supplied.')
corpus = tacl.Corpus(args.zero_fill, tokenizer)
catalogue = get_catalogue(args.catalogue)
report.zero_fill(corpus, catalogue)
if args.min_texts or args.max_texts:
report.prune_by_text_count(args.min_texts, args.max_texts)
if args.min_size or args.max_size:
report.prune_by_ngram_size(args.min_size, args.max_size)
if args.min_count or args.max_count:
report.prune_by_ngram_count(args.min_count, args.max_count)
if args.remove:
report.remove_label(args.remove)
if args.sort:
report.sort()
report.csv(sys.stdout)
def search_texts (args, parser):
"""Searches texts for presence of n-grams."""
store = get_data_store(args)
corpus = get_corpus(args)
catalogue = tacl.Catalogue()
if args.catalogue:
catalogue.load(args.catalogue)
store.validate(corpus, catalogue)
with open(args.ngrams, 'r', encoding='utf-8') as fh:
ngrams = [ngram.strip() for ngram in fh.readlines()]
store.search(catalogue, ngrams, sys.stdout)
def strip_texts (args, parser):
"""Processes prepared XML texts for use with the tacl ngrams
command."""
stripper = tacl.Stripper(args.input, args.output)
stripper.strip_files()
def supplied_diff (args, parser):
labels = args.labels
results = args.supplied
store = get_data_store(args)
store.diff_supplied(results, labels, sys.stdout)
def supplied_intersect (args, parser):
labels = args.labels
results = args.supplied
store = get_data_store(args)
store.intersection_supplied(results, labels, sys.stdout)
tacl/command/formatters.py
import argparse
import re
import textwrap
class ParagraphFormatter (argparse.ArgumentDefaultsHelpFormatter):
"""argparse formatter to maintain paragraph breaks in text, while
wrapping those blocks.
Code minimally adapted from the patch at
http://bugs.python.org/file28091, authored by rurpy2.
"""
def _split_lines (self, text, width):
return self._para_reformat(text, width, multiline=True)
def _fill_text (self, text, width, indent):
lines = self._para_reformat(text, width, indent, True)
return '\n'.join(lines)
def _para_reformat (self, text, width, indent='', multiline=False):
new_lines = list()
main_indent = len(re.match(r'( *)', text).group(1))
def blocker (text):
"""On each call yields 2-tuple consisting of a boolean and
the next block of text from 'text'. A block is either a
single line, or a group of contiguous lines. The former
is returned when not in multiline mode, the text in the
line was indented beyond the indentation of the first
line, or it was a blank line (the latter two jointly
referred to as "no-wrap" lines). A block of concatenated
text lines up to the next no-wrap line is returned when
in multiline mode. The boolean value indicates whether
text wrapping should be done on the returned text."""
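# Illustrative example (hypothetical input): with multiline=True, the text
#   "para line one \npara line two\n\n    option help"
# yields (True, "para line one para line two"), then (False, ""), and then
# (False, "    option help").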
block = list()
for line in text.splitlines():
line_indent = len(re.match(r'( *)', line).group(1))
isindented = line_indent - main_indent > 0
isblank = re.match(r'\s*$', line)
if isblank or isindented:
# A no-wrap line.
if block:
# Yield previously accumulated block of text
# if any, for wrapping.
yield True, ''.join(block)
block = list()
# And now yield our no-wrap line.
yield False, line
else:
# We have a regular text line.
if multiline:
# In multiline mode accumulate it.
block.append(line)
else:
# Not in multiline mode, yield it for
# wrapping.
yield True, line
if block:
# Yield any text block left over.
yield (True, ''.join(block))
for wrap, line in blocker(text):
if wrap:
# We have either a single line or a group of
# concatenated lines. Either way, we treat them as a
# block of text and wrap them (after reducing multiple
# whitespace to just single space characters).
line = self._whitespace_matcher.sub(' ', line).strip()
# Textwrap will do all the hard work for us.
new_lines.extend(textwrap.wrap(text=line, width=width,
initial_indent=indent,
subsequent_indent=indent))
else:
# The line was a no-wrap one so leave the formatting
# alone.
new_lines.append(line[main_indent:])
return new_lines
tacl/command/old_jitc_script.py
"""Command-line script to list texts from one corpus (referred to
below as "Maybe" and defined in a catalogue file) in order of
similarity to each text in that corpus. Takes into account a second
corpus of texts (referred to below as "No" and defined in a catalogue
file) that are similar to those in the first, but not in the way(s)
that are the subject of the investigation.
Given the two corpora, Maybe and No, the script performs the following
actions:
1. For each text Y in Maybe:
1. Run an intersection between Y and No.
2. For each text M in Maybe (excluding Y):
1. Run an intersect between Y and M.
2. Drop Y results.
3. Run a supplied diff between results from [1.2.2] and results from [1.1].
4. Drop results with fewer than 5 matches.
5. Get number of tokens in M.
3. Rank and list texts in Maybe in descending order of the ratio, from
[1.2.4], of matching tokens (n-gram size x count) to total tokens
[1.2.5].
4. Concatenate all results from [1.2.4] files.
"""
import argparse
import csv
import io
import logging
import os
import tacl
from tacl import constants
logger = logging.getLogger('jitc')
class Processor:
def __init__ (self, store, corpus, catalogue, maybe_label, tokenizer,
output_dir):
self._corpus = corpus
self._maybe_label = maybe_label
self._maybe_texts = [text for text, label in catalogue.items()
if label == maybe_label]
self._no_texts = [text for text, label in catalogue.items()
if label != maybe_label]
self._no_label = catalogue[self._no_texts[0]]
self._output_dir = output_dir
self._store = store
self._tokenizer = tokenizer
self._ratios = {}
def _drop_no_label_results (self, results, fh, reduce=False):
# Drop results associated with the 'no' label.
results.seek(0)
report = tacl.Report(results, self._tokenizer)
report.remove_label(self._no_label)
if reduce:
report.reduce()
results = report.csv(fh)
def process_maybe_text (self, yes_text, maybe_text, work_dir,
yn_results_path):
if maybe_text == yes_text:
return
logger.debug('Processing "maybe" text {} against "yes" text {}.'.format(
maybe_text, yes_text))
ym_results_path = os.path.join(
work_dir, 'intersect_with_' + maybe_text + '.csv')
catalogue = {yes_text: self._no_label,
maybe_text: self._maybe_label}
self._run_query(ym_results_path, self._store.intersection, [catalogue])
distinct_results_path = os.path.join(
work_dir, 'distinct_' + maybe_text + '.csv')
results = [yn_results_path, ym_results_path]
labels = [self._no_label, self._maybe_label]
self._run_query(distinct_results_path, self._store.diff_supplied,
[results, labels])
stats_path = os.path.join(work_dir, 'stats_' + maybe_text + '.csv')
if not os.path.exists(stats_path):
stats_report = tacl.StatisticsReport(self._corpus, self._tokenizer,
distinct_results_path)
stats_report.generate_statistics()
with open(stats_path, mode='w', encoding='utf-8', newline='') as fh:
stats_report.csv(fh)
with open(stats_path, encoding='utf-8', newline='') as fh:
reader = csv.DictReader(fh)
ratio_data = {}
for row in reader:
ratio_data[row[constants.SIGLUM_FIELDNAME]] = row[constants.PERCENTAGE_FIELDNAME]
self._ratios[yes_text].append((maybe_text, ratio_data))
def process_yes_text (self, yes_text, no_catalogue):
logger.debug('Processing "maybe" text {} as "yes".'.format(yes_text))
self._ratios[yes_text] = []
yes_work_dir = os.path.join(self._output_dir, yes_text)
os.makedirs(yes_work_dir, exist_ok=True)
results_path = os.path.join(yes_work_dir, 'intersect_with_no.csv')
self._run_query(results_path, self._store.intersection, [no_catalogue])
for maybe_text in self._maybe_texts:
self.process_maybe_text(yes_text, maybe_text, yes_work_dir,
results_path)
def process_yes_texts (self):
no_catalogue = {text: self._no_label for text in self._no_texts}
for yes_text in self._maybe_texts:
no_catalogue[yes_text] = self._maybe_label
self.process_yes_text(yes_text, no_catalogue)
no_catalogue.pop(yes_text)
for texts in self._ratios.values():
logger.debug(texts)
texts.sort(key=lambda x: max([0] + [float(ratio) for ratio
in x[1].values()]), reverse=True)
with open(os.path.join(self._output_dir, 'groupings.txt'), mode='w') \
as fh:
for main_text, group_data in self._ratios.items():
fh.write('{}:\n'.format(main_text))
for related_text, related_text_data in group_data:
fh.write(' {} ('.format(related_text))
for witness, ratio in related_text_data.items():
fh.write('{}: {}; '.format(witness, ratio))
fh.write(')\n')
fh.write('\n\n')
def _run_query (self, path, query, query_args):
if os.path.exists(path):
return
output_results = io.StringIO(newline='')
query(*query_args, output_fh=output_results)
with open(path, mode='w', encoding='utf-8', newline='') as fh:
self._drop_no_label_results(output_results, fh)
def main ():
parser = generate_parser()
args = parser.parse_args()
if hasattr(args, 'verbose'):
configure_logging(args.verbose)
store = get_data_store(args)
corpus = get_corpus(args)
catalogue = get_catalogue(args)
tokenizer = get_tokenizer(args)
check_catalogue(catalogue, args.label)
store.validate(corpus, catalogue)
output_dir = os.path.abspath(args.output)
if os.path.exists(output_dir):
logger.warning('Output directory already exists; any results therein '
'will be reused rather than regenerated.')
os.makedirs(output_dir, exist_ok=True)
processor = Processor(store, corpus, catalogue, args.label, tokenizer,
output_dir)
processor.process_yes_texts()
def check_catalogue (catalogue, label):
"""Raise an exception if `catalogue` contains more than two labels, or
if `label` is not used in the `catalogue`."""
labels = set(catalogue.values())
if label not in labels:
raise Exception(
'The specified label "{}" must be present in the catalogue.'.format(label))
elif len(labels) != 2:
raise Exception('The catalogue must specify only two labels.')
def configure_logging (verbose):
"""Configures the logging used."""
if not verbose:
log_level = logging.WARNING
elif verbose == 1:
log_level = logging.INFO
else:
log_level = logging.DEBUG
logger.setLevel(log_level)
ch = logging.StreamHandler()
ch.setLevel(log_level)
formatter = logging.Formatter(
'%(asctime)s %(name)s %(levelname)s: %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
def generate_parser ():
parser = argparse.ArgumentParser()
parser.add_argument('-l', '--label', required=True)
parser.add_argument('-m', '--memory', action='store_true',
help=constants.DB_MEMORY_HELP)
parser.add_argument('-r', '--ram', default=3, help=constants.DB_RAM_HELP,
type=int)
parser.add_argument('-t', '--tokenizer', choices=constants.TOKENIZER_CHOICES,
default=constants.TOKENIZER_CHOICE_CBETA,
help=constants.DB_TOKENIZER_HELP)
parser.add_argument('-v', '--verbose', action='count',
help=constants.VERBOSE_HELP)
parser.add_argument('db', help=constants.DB_DATABASE_HELP,
metavar='DATABASE')
parser.add_argument('corpus', help=constants.DB_CORPUS_HELP,
metavar='CORPUS')
parser.add_argument('catalogue', help=constants.CATALOGUE_CATALOGUE_HELP,
metavar='CATALOGUE')
parser.add_argument('output', help='Directory to output results into')
return parser
def get_corpus (args):
"""Returns a `tacl.Corpus`."""
tokenizer = get_tokenizer(args)
return tacl.Corpus(args.corpus, tokenizer)
def get_catalogue (args):
"""Returns a `tacl.Catalogue`."""
catalogue = tacl.Catalogue()
catalogue.load(args.catalogue)
return catalogue
def get_data_store (args):
"""Returns a `tacl.DataStore`."""
return tacl.DataStore(args.db, args.memory, args.ram)
def get_tokenizer (args):
return tacl.Tokenizer(*constants.TOKENIZERS[args.tokenizer])
tacl/command/jitc_script.py
"""Command-line script to list texts from one corpus (referred to
below as "Maybe" and defined in a catalogue file) in order of
similarity to each text in that corpus. Takes into account a second
corpus of texts (referred to below as "No" and defined in a catalogue
file) that are similar to those in the first, but not in the way(s)
that are the subject of the investigation.
Given the two corpora, Maybe and No, the script performs the following
actions:
1. For each text Y in Maybe:
1. Run an intersection between Y and No.
2. For each text M in Maybe (excluding Y):
1. Run an intersect between Y and M.
2. Drop Y results.
3. Run a supplied diff between results from [1.2.2] and results from [1.1].
4. Drop results with fewer than 5 matches.
5. Get number of tokens in M.
3. Rank and list texts in Maybe in descending order of the ratio, from
[1.2.4], of matching tokens (n-gram size x count) to total tokens
[1.2.5].
4. Concatenate all results from [1.2.4] files.
"""
import argparse
import logging
import os
import tacl
from tacl import constants
logger = logging.getLogger('tacl')
def main ():
parser = generate_parser()
args = parser.parse_args()
if hasattr(args, 'verbose'):
configure_logging(args.verbose)
store = get_data_store(args)
corpus = get_corpus(args)
catalogue = get_catalogue(args)
tokenizer = get_tokenizer(args)
check_catalogue(catalogue, args.label)
store.validate(corpus, catalogue)
output_dir = os.path.abspath(args.output)
if os.path.exists(output_dir):
logger.warning('Output directory already exists; any results therein '
'will be reused rather than regenerated.')
os.makedirs(output_dir, exist_ok=True)
processor = tacl.JITCProcessor(store, corpus, catalogue, args.label,
tokenizer, output_dir)
processor.process()
def check_catalogue (catalogue, label):
"""Raise an exception if `catalogue` contains more than two labels, or
if `label` is not used in the `catalogue`."""
labels = set(catalogue.values())
if label not in labels:
raise Exception(
'The specified label "{}" must be present in the catalogue.'.format(label))
elif len(labels) != 2:
raise Exception('The catalogue must specify only two labels.')
def configure_logging (verbose):
"""Configures the logging used."""
if not verbose:
log_level = logging.WARNING
elif verbose == 1:
log_level = logging.INFO
else:
log_level = logging.DEBUG
logger.setLevel(log_level)
ch = logging.StreamHandler()
ch.setLevel(log_level)
formatter = logging.Formatter(
'%(asctime)s %(name)s %(levelname)s: %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
def generate_parser ():
parser = argparse.ArgumentParser()
parser.add_argument('-l', '--label', required=True)
parser.add_argument('-m', '--memory', action='store_true',
help=constants.DB_MEMORY_HELP)
parser.add_argument('-r', '--ram', default=3, help=constants.DB_RAM_HELP,
type=int)
parser.add_argument('-t', '--tokenizer', choices=constants.TOKENIZER_CHOICES,
default=constants.TOKENIZER_CHOICE_CBETA,
help=constants.DB_TOKENIZER_HELP)
parser.add_argument('-v', '--verbose', action='count',
help=constants.VERBOSE_HELP)
parser.add_argument('db', help=constants.DB_DATABASE_HELP,
metavar='DATABASE')
parser.add_argument('corpus', help=constants.DB_CORPUS_HELP,
metavar='CORPUS')
parser.add_argument('catalogue', help=constants.CATALOGUE_CATALOGUE_HELP,
metavar='CATALOGUE')
parser.add_argument('output', help='Directory to output results into')
return parser
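# Illustrative invocation, assuming this script is installed as a console
# entry point (named here, hypothetically, "jitc"); the paths and label are
# examples only:
#   jitc -l Maybe cbeta.db corpus/ catalogue.txt jitc-output/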
def get_corpus (args):
"""Returns a `tacl.Corpus`."""
tokenizer = get_tokenizer(args)
return tacl.Corpus(args.corpus, tokenizer)
def get_catalogue (args):
"""Returns a `tacl.Catalogue`."""
catalogue = tacl.Catalogue()
catalogue.load(args.catalogue)
return catalogue
def get_data_store (args):
"""Returns a `tacl.DataStore`."""
return tacl.DataStore(args.db, args.memory, args.ram)
def get_tokenizer (args):
return tacl.Tokenizer(*constants.TOKENIZERS[args.tokenizer])