PK!xJJprosemirror/__init__.pyfrom .model import Fragment, Mark, Node, ResolvedPos, Schema, Slice from .schema.basic import schema as basic_schema from .transform import Mapping, Step, Transform __all__ = [ "Fragment", "Mark", "Node", "ResolvedPos", "Schema", "Slice", "basic_schema", "Mapping", "Step", "Transform", ] PK!׵prosemirror/model/__init__.pyfrom .content import ContentMatch from .fragment import Fragment from .mark import Mark from .node import Node from .replace import ReplaceError, Slice from .resolvedpos import NodeRange, ResolvedPos from .schema import MarkType, NodeType, Schema __all__ = [ "Node", "ResolvedPos", "NodeRange", "Fragment", "Slice", "ReplaceError", "Mark", "Schema", "NodeType", "MarkType", "ContentMatch", ] PK!c** prosemirror/model/comparedeep.pydef compare_deep(a, b): return a == b PK!lLU1U1prosemirror/model/content.pyimport re from functools import reduce from .fragment import Fragment class ContentMatch: def __init__(self, valid_end): self.valid_end = valid_end self.next = [] self.wrap_cache = [] @classmethod def parse(cls, string, node_types): stream = TokenStream(string, node_types) if stream.next is None: return ContentMatch.empty expr = parse_expr(stream) if stream.next: stream.err("Unexpected traling text") match = dfa(nfa(expr)) check_for_dead_ends(match, stream) return match def match_type(self, type, *args): for i in range(0, len(self.next), 2): if self.next[i].name == type.name: return self.next[i + 1] return None def match_fragment(self, frag, start=0, end=None): if end is None: end = frag.child_count cur = self i = start while cur and i < end: cur = cur.match_type(frag.child(i).type) i += 1 return cur @property def inline_content(self): if not self.next: return None first = self.next[0] return first.is_inline if first else False @property def default_type(self): for i in range(0, len(self.next), 2): type = self.next[i] if not (type.is_text or type.has_required_attrs()): return type def compatible(self, other): for i in range(0, len(self.next), 2): for j in range(0, len(other.next), 2): if self.next[i].name == other.next[j].name: return True return False def fill_before(self, after, to_end=False, start_index=0): seen = [self] def search(match, types): nonlocal seen finished = match.match_fragment(after, start_index) if finished and (not to_end or finished.valid_end): return Fragment.from_([tp.create_and_fill() for tp in types]) for i in range(0, len(match.next), 2): type = match.next[i] next = match.next[i + 1] if not (type.is_text or type.has_required_attrs()) and next not in seen: seen.append(next) found = search(next, types + [type]) if found: return found return search(self, []) def find_wrapping(self, target): for i in range(0, len(self.wrap_cache), 2): if self.wrap_cache[i].name == target.name: return self.wrap_cache[i + 1] computed = self.compute_wrapping(target) self.wrap_cache.extend([target, computed]) return computed def compute_wrapping(self, target): seen = {} active = [{"match": self, "type": None, "via": None}] while len(active): current = active.pop(0) match = current["match"] if match.match_type(target): result = [] obj = current while obj["type"]: result.append(obj["type"]) obj = obj["via"] return list(reversed(result)) for i in range(0, len(match.next), 2): type = match.next[i] if ( not type.is_leaf and not type.has_required_attrs() and type.name not in seen and (not current["type"] or match.next[i + 1].valid_end) ): active.append( {"match": type.content_match, "via": current, "type": type} ) seen[type.name] = True @property def edge_count(self): return len(self.next) >> 1 def edge(self, n): i = n << 1 if i > len(self.next): raise ValueError(f"There's no {n}th edge in this content match") return {"type": self.next[i], "next": self.next[i + 1]} def __str__(self): seen = [] def scan(m): nonlocal seen for i in range(1, len(m.next), 2): if m.next[i] in seen: scan(m.next[i]) scan(self) def iteratee(m, i): out = i + ("*" if m.valid_end else " ") + " " for i in range(0, len(m.next), 2): out += ( (", " if i else "") + m.next(i) + "->" + seen.index(m.next[i + 1]) ) return "\n".join((iteratee(m, i)) for m, i in enumerate(seen)) def __repr__(self): return f"ContentMatch<{dict(valid_end=self.valid_end, next=self.next, cache=self.wrap_cache)}>" ContentMatch.empty = ContentMatch(True) TOKEN_REGEX = re.compile(r"\w+|\W") class TokenStream: def __init__(self, string, node_types): self.string = string self.node_types = node_types self.inline = None self.pos = 0 self.tokens = [i for i in TOKEN_REGEX.findall(string) if i.strip()] @property def next(self): try: return self.tokens[self.pos] except IndexError: return None def eat(self, tok): if self.next == tok: pos = self.pos self.pos += 1 return pos or True else: return False def err(self, str): raise SyntaxError(f'{str} (in content expression) "{self.string}"') def parse_expr(stream): exprs = [] while True: exprs.append(parse_expr_seq(stream)) if not stream.eat("|"): break if len(exprs) == 1: return exprs[0] return {"type": "choice", "exprs": exprs} def parse_expr_seq(stream): exprs = [] while True: exprs.append(parse_expr_subscript(stream)) if not (stream.next and stream.next != ")" and stream.next != "|"): break if len(exprs) == 1: return exprs[0] return {"type": "seq", "exprs": exprs} def parse_expr_subscript(stream): expr = parse_expr_atom(stream) while True: if stream.eat("+"): expr = {"type": "plus", "expr": expr} elif stream.eat("*"): expr = {"type": "star", "expr": expr} elif stream.eat("?"): expr = {"type": "opt", "expr": expr} elif stream.eat("{"): expr = parse_expr_range(stream, expr) else: break return expr NUMBER_REGEX = re.compile(r"\D") def parse_num(stream: TokenStream): if NUMBER_REGEX.match(stream.next): stream.err(f'Expected number, got "{stream.next}"') result = int(stream.next) stream.pos += 1 return result def parse_expr_range(stream: TokenStream, expr): min_ = parse_num(stream) max_ = min_ if stream.eat(","): if stream.next != "}": max_ = parse_num(stream) else: max_ = -1 if not stream.eat("}"): stream.err("Unclosed braced range") return {"type": "range", "min": min_, "max": max_, "expr": expr} def resolve_name(stream: TokenStream, name): types = stream.node_types type = types.get(name) if type: return [type] result = [] for _, type in types.items(): if name in type.groups: result.append(type) if not result: stream.err(f'No node type or group "{name}" found') return result def parse_expr_atom(stream: TokenStream): if stream.eat("("): expr = parse_expr(stream) if not stream.eat(")"): stream.err("missing closing patren") return expr elif not re.match(r"\W", stream.next): def iteratee(type): nonlocal stream if stream.inline is None: stream.inline = type.is_inline elif stream.inline != type.is_inline: stream.err("Mixing inline and block content") return {"type": "name", "value": type} exprs = [iteratee(type) for type in resolve_name(stream, stream.next)] stream.pos += 1 if len(exprs) == 1: return exprs[0] return {"type": "choice", "exprs": exprs} else: stream.err(f'Unexpected token "{stream.next}"') def nfa(expr): nfa_ = [[]] def node(): nonlocal nfa_ nfa_.append([]) return len(nfa_) - 1 def edge(from_, to=None, term=None): nonlocal nfa_ edge = {"term": term, "to": to} nfa_[from_].append(edge) return edge def connect(edges, to): for edge in edges: edge["to"] = to def compile(expr, from_): if expr["type"] == "choice": return list( reduce(lambda out, expr: out + compile(expr, from_), expr["exprs"], []) ) elif expr["type"] == "seq": i = 0 while True: next = compile(expr["exprs"][i], from_) if i == len(expr["exprs"]) - 1: return next from_ = node() connect(next, from_) i += 1 elif expr["type"] == "star": loop = node() edge(from_, loop) connect(compile(expr["expr"], loop), loop) return [edge(loop)] elif expr["type"] == "plus": loop = node() connect(compile(expr["expr"], from_), loop) connect(compile(expr["expr"], loop), loop) return [edge(loop)] elif expr["type"] == "opt": return [edge(from_)] + compile(expr["expr"], from_) elif expr["type"] == "range": cur = from_ for i in range(expr["min"]): next = node() connect(compile(expr["expr"], cur), next) cur = next if expr["max"] == -1: connect(compile(expr["expr"], cur), cur) else: for i in range(expr["min"], expr["max"]): next = node() edge(cur, next) connect(compile(expr["expr"], cur), next) cur = next return [edge(cur)] elif expr["type"] == "name": return [edge(from_, None, expr["value"])] connect(compile(expr, 0), node()) return nfa_ def cmp(a, b): return a - b def null_from(nfa, node): result = [] def scan(n): nonlocal result edges = nfa[n] if len(edges) == 1 and not edges[0].get("term"): return scan(edges[0].get("to")) result.append(n) for edge in edges: term, to = edge.get("term"), edge.get("to") if not term and to not in result: scan(to) scan(node) return sorted(result) def dfa(nfa): labeled = {} def explore(states): nonlocal labeled out = [] for node in states: for item in nfa[node]: term, to = item.get("term"), item.get("to") if not term: continue known = term in out if known: set = out[out.index(term) + 1] else: set = False for n in null_from(nfa, to): if not set: set = [] out.extend([term, set]) if n not in set: set.append(n) state = ContentMatch((len(nfa) - 1) in states) labeled[",".join([str(s) for s in states])] = state for i in range(0, len(out), 2): states = sorted(out[i + 1]) find_by_key = ",".join(str(s) for s in states) items_to_extend = [out[i], labeled.get(find_by_key) or explore(states)] state.next.extend(items_to_extend) return state return explore(null_from(nfa, 0)) def check_for_dead_ends(match, stream): work = [match] i = 0 while i < len(work): state = work[i] dead = not state.valid_end nodes = [] for j in range(0, len(state.next), 2): node = state.next[j] next = state.next[j + 1] nodes.append(node.name) if dead and not (node.is_text or node.has_required_attrs()): dead = False if next not in work: work.append(next) if dead: stream.err( f'Only non-generatable nodes ({", ".join(nodes)}) in a required position' ) i += 1 PK!7j֫ prosemirror/model/diff.pyfrom prosemirror.utils import text_length def find_diff_start(a, b, pos): i = 0 while True: if a.child_count == i or b.child_count == i: return None if a.child_count == b.child_count else pos child_a, child_b = a.child(i), b.child(i) if child_a == child_b: pos += child_a.node_size continue if not child_a.same_markup(child_b): return pos if child_a.is_text and child_a.text != child_b.text: if child_b.text.startswith(child_a.text): return pos + text_length(child_a.text) if child_a.text.startswith(child_b.text): return pos + text_length(child_b.text) next_index = next( ( index_a for ((index_a, char_a), (_, char_b)) in zip( enumerate(child_a.text), enumerate(child_b.text) ) if char_a != char_b ), None, ) if next_index is not None: return pos + next_index if child_a.content.size or child_b.content.size: inner = find_diff_start(child_a.content, child_b.content, pos + 1) if inner: return inner pos += child_a.node_size i += 1 def find_diff_end(a, b, pos_a, pos_b): i_a, i_b = a.child_count, b.child_count while True: if i_a == 0 or i_b == 0: if i_a == i_b: return None else: return {"a": pos_a, "b": pos_b} i_a -= 1 i_b -= 1 child_a, child_b = a.child(i_a), b.child(i_b) size = child_a.node_size if child_a == child_b: pos_a -= size pos_b -= size continue if not child_a.same_markup(child_b): return {"a": pos_a, "b": pos_b} if child_a.is_text and child_a.text != child_b.text: same, min_size = ( 0, min(text_length(child_a.text), text_length(child_b.text)), ) while ( same < min_size and child_a.text[text_length(child_a.text) - same - 1] == child_b.text[text_length(child_b.text) - same - 1] ): same += 1 pos_a -= 1 pos_b -= 1 return {"a": pos_a, "b": pos_b} if child_a.content.size or child_b.content.size: inner = find_diff_end( child_a.content, child_b.content, pos_a - 1, pos_b - 1 ) if inner: return inner pos_a -= size pos_b -= size PK!y>prosemirror/model/fragment.pyfrom typing import Iterable from prosemirror.utils import text_length from .diff import find_diff_end, find_diff_start def retIndex(index, offset): return {"index": index, "offset": offset} class Fragment: def __init__(self, content, size=None): self.content = content self.size = size if size is None: self.size = sum(c.node_size for c in content) def nodes_between(self, from_, to, f, node_start=0, parent=None): i = 0 pos = 0 while pos < to: child = self.content[i] end = pos + child.node_size if ( end > from_ and f(child, node_start + pos, parent, i) is not False and getattr(child.content, "size", None) ): start = pos + 1 child.nodes_between( max(0, from_ - start), min(child.content.size, to - start), f, node_start + start, ) pos = end i += 1 def descendants(self, f): self.nodes_between(0, self.size, f) def text_between(self, from_, to, block_separator="", leaf_text=""): text, separated = "", True def iteratee(node, pos, *args): nonlocal text nonlocal separated if node.is_text: text += node.text[max(from_, pos) - pos : to - pos] separated = not block_separator elif node.is_leaf and leaf_text: text += leaf_text separated = not block_separator elif not separated and node.is_block: text += block_separator separated = True self.nodes_between(from_, to, iteratee, 0) return text def append(self, other): if not other.size: return self if not self.size: return other last, first, content, i = ( self.last_child, other.first_child, self.content.copy(), 0, ) if last.is_text and last.same_markup(first): content[len(content) - 1] = last.with_text(last.text + first.text) i = 1 while i < len(other.content): content.append(other.content[i]) i += 1 return Fragment(content, self.size + other.size) def cut(self, from_, to=None): if to is None: to = self.size if from_ == 0 and to == self.size: return self result, size = [], 0 if to <= from_: return Fragment(result, size) i, pos = 0, 0 while pos < to: child = self.content[i] end = pos + child.node_size if end > from_: if pos < from_ or end > to: if child.is_text: child = child.cut( max(0, from_ - pos), min(text_length(child.text), to - pos) ) else: child = child.cut( max(0, from_ - pos - 1), min(child.content.size, to - pos - 1), ) result.append(child) size += child.node_size pos = end i += 1 return Fragment(result, size) def cut_by_index(self, from_, to=None): if from_ == to: return Fragment.empty if from_ == 0 and to == len(self.content): return self return Fragment(self.content[from_:to]) def replace_child(self, index, node): current = self.content[index] if current == node: return self copy = self.content.copy() size = self.size + node.node_size - current.node_size copy[index] = node return Fragment(copy, size) def add_to_start(self, node): return Fragment([node] + self.content, self.size + node.node_size) def add_to_end(self, node): return Fragment(self.content + [node], self.size + node.node_size) def eq(self, other): if len(self.content) != len(other.content): return False return all(a.eq(b) for (a, b) in zip(self.content, other.content)) @property def first_child(self): return self.content[0] if self.content else None @property def last_child(self): return self.content[-1] if self.content else None @property def child_count(self): return len(self.content) def child(self, index): return self.content[index] def maybe_child(self, index): try: return self.content[index] except IndexError: return None def for_each(self, f): i = 0 p = 0 while i < len(self.content): child = self.content[i] f(child, p, i) p += child.node_size i += 1 def find_diff_start(self, other, pos=0): return find_diff_start(self, other, pos) def find_diff_end(self, other, pos=None, other_pos=None): if pos is None: pos = self.size if other_pos is None: other_pos = other.size return find_diff_end(self, other, pos, other_pos) def find_index(self, pos, round=-1): if pos == 0: return retIndex(0, pos) if pos == self.size: return retIndex(len(self.content), pos) if pos > self.size or pos < 0: raise ValueError(f"Position {pos} outside of fragment ({self})") i = 0 cur_pos = 0 while True: cur = self.child(i) end = cur_pos + cur.node_size if end >= pos: if end == pos or round > 0: return retIndex(i + 1, end) return retIndex(i, cur_pos) i += 1 cur_pos = end def to_json(self): if self.content: return [item.to_json() for item in self.content] @classmethod def from_json(cls, schema, value): if not value: return cls.empty if isinstance(value, str): import json value = json.loads(value) if not isinstance(value, list): raise ValueError("Invalid input for Fragment.from_json") return cls([schema.node_from_json(item) for item in value]) @classmethod def from_array(cls, array): if not array: return cls.empty joined, size = None, 0 for i in range(len(array)): node = array[i] size += node.node_size if i and node.is_text and array[i - 1].same_markup(node): if not joined: joined = array[0:i] joined[-1] = node.with_text(joined[-1].text + node.text) elif joined: joined.append(node) return cls(joined or array, size) @classmethod def from_(cls, nodes): if not nodes: return cls.empty if isinstance(nodes, cls): return nodes if isinstance(nodes, Iterable): return cls.from_array(list(nodes)) if hasattr(nodes, "attrs"): return cls([nodes], nodes.node_size) raise ValueError(f"cannot convert {nodes} to a fragment") def to_string_inner(self): return ", ".join([str(i) for i in self.content]) def __str__(self): return f"<{self.to_string_inner()}>" def __repr__(self): return f"<{self.__class__.__name__} {self.__str__()}>" Fragment.empty = Fragment([], 0) PK!+sprosemirror/model/mark.pyclass Mark: def __init__(self, type, attrs): self.type = type self.attrs = attrs def add_to_set(self, set): copy = None placed = False for i in range(len(set)): other = set[i] if self.eq(other): return set if self.type.excludes(other.type): if copy is None: copy = set[0:i] elif other.type.excludes(self.type): return set else: if not placed and other.type.rank > self.type.rank: if copy is None: copy = set[0:i] copy.append(self) placed = True if copy: copy.append(other) if copy is None: copy = set[:] if not placed: copy.append(self) return copy def remove_from_set(self, set): return [item for item in set if not item.eq(self)] def is_in_set(self, set): return any(item.eq(self) for item in set) def eq(self, other): if self == other: return True return self.type.name == other.type.name and self.attrs == other.attrs def to_json(self): return {"type": self.type.name, "attrs": self.attrs} @classmethod def from_json(cls, schema, json_data): if isinstance(json_data, str): import json json_data = json.loads(json_data) if not json_data: raise ValueError("Invalid input for Mark.fromJSON") type = schema.marks.get(json_data["type"]) if not type: raise ValueError(f"There is not mark type {type} in this schema") return type.create(json_data.get("attrs")) @classmethod def same_set(cls, a, b): if a == b: return True if len(a) != len(b): return False return all(item_a.eq(item_b) for (item_a, item_b) in zip(a, b)) @classmethod def set_from(cls, marks): if not marks: return cls.none if isinstance(marks, cls): return [marks] copy = marks[:] return sorted(copy, key=lambda item: item.type.rank) Mark.none = [] PK!XZ((prosemirror/model/node.pyfrom icu import UnicodeString from prosemirror.utils import text_length from .comparedeep import compare_deep from .fragment import Fragment from .mark import Mark from .replace import Slice, replace from .resolvedpos import ResolvedPos empty_attrs = {} class Node: def __init__(self, type, attrs, content: Fragment, marks): self.type = type self.attrs = attrs self.content = content or Fragment.empty self.marks = marks or Mark.none @property def node_size(self): return 1 if self.is_leaf else 2 + self.content.size @property def child_count(self): return self.content.child_count def child(self, index): return self.content.child(index) def maybe_child(self, index): return self.content.maybe_child(index) def for_each(self, f): self.content.for_each(f) def nodes_between(self, from_, to, f, start_pos=0): self.content.nodes_between(from_, to, f, start_pos, self) def descendants(self, f): self.nodes_between(0, self.content.size, f) @property def text_content(self): return self.text_between(0, self.content.size, "") def text_between(self, from_, to, block_separator="", leaf_text=""): return self.content.text_between(from_, to, block_separator, leaf_text) @property def first_child(self): return self.content.first_child @property def last_child(self): return self.content.last_child def eq(self, other: "Node"): return self == other or ( self.same_markup(other) and self.content.eq(other.content) ) def same_markup(self, other: "Node"): return self.has_markup(other.type, other.attrs, other.marks) def has_markup(self, type, attrs, marks=None): return ( self.type.name == type.name and (compare_deep(self.attrs, attrs or type.default_attrs or empty_attrs)) and (Mark.same_set(self.marks, marks or Mark.none)) ) def copy(self, content=None): if content == self.content: return self return self.__class__(self.type, self.attrs, content, self.marks) def mark(self, marks): if marks == self.marks: return self return self.__class__(self.type, self.attrs, self.content, marks) def cut(self, from_, to): if from_ == 0 and to == self.content.size: return self return self.copy(self.content.cut(from_, to)) def slice(self, from_, to=None, include_parents=False): if to is None: to = self.content.size if from_ == to: return Slice.empty from__ = self.resolve(from_) to_ = self.resolve(to) depth = 0 if include_parents else from__.shared_depth(to) start = from__.start(depth) node = from__.node(depth) content = node.content.cut(from__.pos - start, to_.pos - start) return Slice(content, from__.depth - depth, to_.depth - depth) def replace(self, from_, to, slice): return replace(self.resolve(from_), self.resolve(to), slice) def node_at(self, pos): node = self while True: index_info = node.content.find_index(pos) index, offset = index_info["index"], index_info["offset"] node = node.maybe_child(index) if not node: return None if offset == pos or node.is_text: return node pos -= offset + 1 def child_after(self, pos): index_info = self.content.find_index(pos) index, offset = index_info["index"], index_info["offset"] return { "node": self.content.maybe_child(index), "index": index, "offset": offset, } def child_before(self, pos): if pos == 0: return {"node": None, "index": 0, "offset": 0} index_info = self.content.find_index(pos) index, offset = index_info["index"], index_info["offset"] if offset < pos: return {"node": self.content.child(index), "index": index, "offset": offset} node = self.content.child(index - 1) return {"node": node, "index": index - 1, "offset": offset - node.node_size} def resolve(self, pos): return ResolvedPos.resolve_cached(self, pos) def resolve_no_cache(self, pos): return ResolvedPos.resolve(self, pos) def range_has_mark(self, from_, to, type): found = False if to > from_: def iteratee(node): nonlocal found if type.is_in_set(node.marks): found = True return not found self.nodes_between(from_, to, iteratee) return found @property def is_block(self): return self.type.is_block @property def is_text_block(self): return self.type.is_text_block @property def inline_content(self): return self.type.inline_content @property def is_inline(self): return self.type.is_inline @property def is_text(self): return self.type.is_text @property def is_leaf(self): return self.type.is_leaf @property def is_atom(self): return self.type.is_atom def __str__(self): to_debug_string = ( self.type.spec["toDebugString"] if "toDebugString" in self.type.spec else None ) if to_debug_string: return to_debug_string(self) name = self.type.name if self.content.size: name += f"({self.content.to_string_inner()})" return wrap_marks(self.marks, name) def __repr__(self): return f"<{self.__class__.__name__} {self.__str__()}>" def content_match_at(self, index): match = self.type.content_match.match_fragment(self.content, 0, index) if not match: raise ValueError("Called contentMatchAt on a node with invalid content") return match def can_replace(self, from_, to, replacement=Fragment.empty, start=0, end=None): if end is None: end = replacement.child_count one = self.content_match_at(from_).match_fragment(replacement, start, end) two = False if one: two = one.match_fragment(self.content, to) if not two or not two.valid_end: return False for i in range(start, end): if not self.type.allows_marks(replacement.child(i).marks): return False return True def can_replace_with(self, from_, to, type, marks=None): if marks and not self.type.allows_marks(marks): return False start = self.content_match_at(from_).match_type(type) end = False if start: end = start.match_fragment(self.content, to) return end.valid_end if end else False def can_append(self, other): if other.content.size: return self.can_replace(self.child_count, self.child_before, other.content) else: return self.type.compatible_content(other.type) def default_content_type(self, at): return self.content_match_at(at).default_type def check(self): if not self.type.valid_content(self.content): raise ValueError( f"Invalid content for node {self.type.name}: {str(self.content)[:50]}" ) return self.content.for_each(lambda node: node.check()) def to_json(self): obj = {"type": self.type.name} for _ in self.attrs: obj["attrs"] = self.attrs break if getattr(self.content, "size", None): obj["content"] = self.content.to_json() if len(self.marks): obj["marks"] = [n.to_json() for n in self.marks] return obj @classmethod def from_json(cls, schema, json_data): if isinstance(json_data, str): import json json_data = json.loads(json_data) if not json_data: raise ValueError("Invalid input for Node.from_json") marks = None if json_data.get("marks"): if not isinstance(json_data["marks"], list): raise ValueError("Invalid mark data for Node.fromJSON") marks = [schema.mark_from_json(item) for item in json_data["marks"]] if json_data["type"] == "text": return schema.text(json_data["text"], marks) content = Fragment.from_json(schema, json_data.get("content")) return schema.node_type(json_data["type"]).create( json_data.get("attrs"), content, marks ) class TextNode(Node): def __init__(self, type, attrs, content, marks): super().__init__(type, attrs, None, marks) if not content: raise ValueError("Empty text nodes are not allowed") self.text = content def __str__(self): import json to_debug_string = ( self.type.spec["toDebugString"] if "toDebugString" in self.type.spec else None ) if to_debug_string: return to_debug_string(self) return wrap_marks(self.marks, json.dumps(self.text)) @property def text_content(self): return self.text def text_between(self, from_, to): return self.text[from_:to] @property def node_size(self): return text_length(self.text) def mark(self, marks): return ( self if marks == self.marks else TextNode(self.type, self.attrs, self.text, marks) ) def with_text(self, text): if text == self.text: return self return TextNode(self.type, self.attrs, text, self.marks) def cut(self, from_=0, to=None): if to is None: to = text_length(self.text) if from_ == 0 and to == text_length(self.text): return self return self.with_text(str(UnicodeString(self.text)[from_:to])) def eq(self, other): return self.same_markup(other) and self.text == getattr(other, "text", None) def to_json(self): base = super().to_json() base["text"] = self.text return base def wrap_marks(marks, str): i = len(marks) - 1 while i >= 0: str = marks[i].type.name + "(" + str + ")" i -= 1 return str PK!+fprosemirror/model/replace.pyfrom .fragment import Fragment class ReplaceError(ValueError): pass def remove_range(content, from_, to): from_index_info = content.find_index(from_) index, offset = from_index_info["index"], from_index_info["offset"] child = content.maybe_child(index) to_index_info = content.find_index(to) index_to, offset_to = to_index_info["index"], to_index_info["offset"] if offset == from_ or child.is_text: if offset_to != to and not content.child(index_to).is_text: raise ValueError("removing non-flat range") return content.cut(0, from_).append(content.cut(to)) if index != index_to: raise ValueError("removing non-flat range") return content.replace_child( index, child.copy(remove_range(child.content, from_ - offset - 1, to - offset - 1)), ) def insert_into(content, dist, insert, parent): a = content.find_index(dist) index, offset = a["index"], a["offset"] child = content.maybe_child(index) if offset == dist or child.is_text: if parent and not parent.can_replace(index, index, insert): return None return content.cut(0, dist).append(insert).append(content.cut(dist)) inner = insert_into(child.content, dist - offset - 1, insert, None) if inner: return content.replace_child(index, child.copy(inner)) return None class Slice: def __init__(self, content, open_start, open_end): self.content = content self.open_start = open_start self.open_end = open_end @property def size(self): return self.content.size - self.open_start - self.open_end def insert_at(self, pos, fragment): content = insert_into(self.content, pos + self.open_start, fragment, None) if content: return Slice(content, self.open_start, self.open_end) def remove_between(self, from_, to): return Slice( remove_range(self.content, from_ + self.open_start, to + self.open_start), self.open_start, self.open_end, ) def eq(self, other): return ( self.content.eq(other.content) and self.open_start == other.open_start and self.open_end == other.open_end ) def __str__(self): return f"{self.content}({self.open_start},{self.open_end})" def to_json(self): if not self.content.size: return None json = {"content": self.content.to_json()} if self.open_start > 0: json["openStart"] = self.open_start if self.open_end > 0: json["openEnd"] = self.open_end return json @classmethod def from_json(cls, schema, json_data): if isinstance(json_data, str): import json json_data = json.loads(json) if not json_data: return cls.empty open_start = json_data.get("openStart", 0) or 0 open_end = json_data.get("openEnd", 0) or 0 if not isinstance(open_start, int) or not isinstance(open_end, int): raise ValueError("invalid input for Slice.from_json") return cls( Fragment.from_json(schema, json_data.get("content")), json_data.get("openStart", 0), json_data.get("openEnd", 0), ) @classmethod def max_open(cls, fragment: Fragment, open_isolating=True): open_start = 0 open_end = 0 n = fragment.first_child while n and not n.is_leaf and (open_isolating or n.type.spec.get("isolating")): open_start += 1 n = n.first_child n = fragment.last_child while n and not n.is_leaf and (open_isolating or n.type.spec.get("isolating")): open_end += 1 n = n.last_child return cls(fragment, open_start, open_end) Slice.empty = Slice(Fragment.empty, 0, 0) def replace(from_, to, slice): if slice.open_start > from_.depth: raise ReplaceError("Inserted content deeper than insertion position") if from_.depth - slice.open_start != to.depth - slice.open_end: raise ReplaceError("Inconsistent open depths") return replace_outer(from_, to, slice, 0) def replace_outer(from_, to, slice: Slice, depth): index = from_.index(depth) node = from_.node(depth) if index == to.index(depth) and depth < from_.depth - slice.open_start: inner = replace_outer(from_, to, slice, depth + 1) return node.copy(node.content.replace_child(index, inner)) elif not slice.content.size: return close(node, replace_two_way(from_, to, depth)) elif ( not slice.open_start and not slice.open_end and from_.depth == depth and to.depth == depth ): parent = from_.parent content = parent.content return close( parent, content.cut(0, from_.parent_offset) .append(slice.content) .append(content.cut(to.parent_offset)), ) else: prepare = prepare_slice_for_replace(slice, from_) start, end = prepare["start"], prepare["end"] return close(node, replace_three_way(from_, start, end, to, depth)) def check_join(main, sub): if not sub.type.compatible_content(main.type): raise ReplaceError(f"Cannot join {sub.type.name} onto {main.type.name}") def joinable(before, after, depth): node = before.node(depth) check_join(node, after.node(depth)) return node def add_node(child, target): last = len(target) - 1 if last >= 0 and child.is_text and child.same_markup(target[last]): target[last] = child.with_text(target[last].text + child.text) else: target.append(child) def add_range(start, end, depth, target): node = (end or start).node(depth) start_index = 0 end_index = end.index(depth) if end else node.child_count if start: start_index = start.index(depth) if start.depth > depth: start_index += 1 elif start.text_offset: add_node(start.node_after, target) start_index += 1 i = start_index while i < end_index: add_node(node.child(i), target) i += 1 if end and end.depth == depth and end.text_offset: add_node(end.node_before, target) def close(node, content): if not node.type.valid_content(content): raise ReplaceError(f"Invalid content for node {node.type.name}") return node.copy(content) def replace_three_way(from_, start, end, to, depth): open_start = joinable(from_, start, depth + 1) if from_.depth > depth else False open_end = joinable(end, to, depth + 1) if to.depth > depth else False content = [] add_range(None, from_, depth, content) if open_start and open_end and start.index(depth) == end.index(depth): check_join(open_start, open_end) add_node( close(open_start, replace_three_way(from_, start, end, to, depth + 1)), content, ) else: if open_start: add_node( close(open_start, replace_two_way(from_, start, depth + 1)), content ) add_range(start, end, depth, content) if open_end: add_node(close(open_end, replace_two_way(end, to, depth + 1)), content) add_range(to, None, depth, content) return Fragment(content) def replace_two_way(from_, to, depth): content = [] add_range(None, from_, depth, content) if from_.depth > depth: type = joinable(from_, to, depth + 1) add_node(close(type, replace_two_way(from_, to, depth + 1)), content) add_range(to, None, depth, content) return Fragment(content) def prepare_slice_for_replace(slice: Slice, along): extra = along.depth - slice.open_start parent = along.node(extra) node = parent.copy(slice.content) for i in range(extra - 1, -1, -1): node = along.node(i).copy(Fragment.from_(node)) return { "start": node.resolve_no_cache(slice.open_start + extra), "end": node.resolve_no_cache(node.content.size - slice.open_end - extra), } PK!Lqq prosemirror/model/resolvedpos.pyfrom .mark import Mark class ResolvedPos: def __init__(self, pos, path, parent_offset): self.pos = pos self.path = path self.depth = int(len(path) / 3 - 1) self.parent_offset = parent_offset def resolve_depth(self, val=None): if val is None: return self.depth return self.depth + val if val < 0 else val @property def parent(self): return self.node(self.depth) @property def doc(self): return self.node(0) def node(self, depth): return self.path[self.resolve_depth(depth) * 3] def index(self, depth=None): return self.path[self.resolve_depth(depth) * 3 + 1] def index_after(self, depth): depth = self.resolve_depth(depth) return self.index(depth) + ( 0 if depth == self.depth and not self.text_offset else 1 ) def start(self, depth=None): depth = self.resolve_depth(depth) return 0 if depth == 0 else self.path[depth * 3 - 1] + 1 def end(self, depth=None): depth = self.resolve_depth(depth) return self.start(depth) + self.node(depth).content.size def before(self, depth=None): depth = self.resolve_depth(depth) if not depth: raise ValueError("There is no position before the top level node") return self.pos if depth == self.depth + 1 else self.path[depth * 3 - 1] def after(self, depth=None): depth = self.resolve_depth(depth) if not depth: raise ValueError("There is no position after the top level node") return ( self.pos if depth == self.depth + 1 else self.path[depth * 3 - 1] + self.path[depth * 3].node_size ) @property def text_offset(self): return self.pos - self.path[-1] @property def node_after(self): parent = self.parent index = self.index(self.depth) if index == parent.child_count: return None d_off = self.pos - self.path[-1] child = parent.child(index) return parent.child(index).cut(d_off) if d_off else child @property def node_before(self): index = self.index(self.depth) d_off = self.pos - self.path[-1] if d_off: return self.parent.child(index).cut(0, d_off) return None if index == 0 else self.parent.child(index - 1) def marks(self): parent = self.parent index = self.index() if parent.content.size == 0: return Mark.none if self.text_offset: return parent.child(index).marks main = parent.maybe_child(index - 1) other = parent.maybe_child(index) if not main: main, other = other, main marks = main.marks i = 0 while i < len(marks): if marks[i].type.spec.get("inclusive") is False and ( not other or not marks[i].is_in_set(other.marks) ): marks = marks[i].remove_from_set(marks) i -= 1 i += 1 return marks def marks_across(self, end): after = self.parent.maybe_child(self.index()) if not after or not after.is_inline: return None marks = after.marks next = end.parent.maybe_child(end.index()) i = 0 while i < len(marks): if marks[i].type.spec.get("inclusive") is False and ( not next or not marks[i].is_in_set(next.marks) ): marks = marks[i].remove_from_set(marks) i -= 1 i += 1 return marks def shared_depth(self, pos): depth = self.depth while depth > 0: if self.start(depth) <= pos and self.end(depth) >= pos: return depth depth -= 1 return 0 def block_range(self, other=None, pred=None): if other is None: other = self if other.pos < self.pos: return other.block_range(self) d = self.depth - ( self.parent.inline_content or (1 if self.pos == other.pos else 0) ) while d >= 0: if other.pos <= self.end(d) and (not pred or pred(self.node(d))): return NodeRange(self, other, d) d -= 1 def same_parent(self, other): return self.pos - self.parent_offset == other.pos - other.parent_offset def max(self, other): return other if other.pos > self.pos else self def min(self, other): return other if other.pos < self.pos else self def __str__(self): str = "" for i in range(self.depth): str += (str or "/") + self.node(i).type.name + "_" + self.idnex(i - 1) return str + ":" + self.parent_offset @classmethod def resolve(cls, doc, pos): if not (pos >= 0 and pos <= doc.content.size): raise ValueError(f"Position {pos} out of range") path = [] start = 0 parent_offset = pos node = doc while True: index_info = node.content.find_index(parent_offset) index, offset = index_info["index"], index_info["offset"] rem = parent_offset - offset path.extend([node, index, start + offset]) if not rem: break node = node.child(index) if node.is_text: break parent_offset = rem - 1 start += offset + 1 return cls(pos, path, parent_offset) @classmethod def resolve_cached(cls, doc, pos): # no cache for now return cls.resolve(doc, pos) class NodeRange: def __init__(self, from_, to, depth): self.from_ = from_ self.to = to self.depth = depth @property def start(self): return self.from_.before(self.depth + 1) @property def end(self): return self.to.after(self.depth + 1) @property def parent(self): return self.from_.node(self.depth) @property def start_index(self): return self.from_.index(self.depth) @property def end_index(self): return self.to.index_after(self.depth) PK!tT(T(prosemirror/model/schema.pyfrom collections import OrderedDict from .content import ContentMatch from .fragment import Fragment from .mark import Mark from .node import Node, TextNode def default_attrs(attrs): defaults = {} for attr_name, attr in attrs.items(): if attr.has_default: return None defaults[attr_name] = attr.default return defaults def compute_attrs(attrs, value): built = {} for name in attrs: given = None if value: given = value.get(name) if given is None: attr = attrs.get(name) if attr.has_default: given = attr.default else: raise ValueError("No value supplied for attribute" + name) built[name] = given return built def init_attrs(attrs): result = {} if attrs: for name in attrs: result[name] = Attribute(attrs[name]) return result class NodeType: def __init__(self, name, schema, spec): self.name = name self.schema = schema self.spec = spec self.groups = spec.get("group").split(" ") if "group" in spec else [] self.attrs = init_attrs(spec.get("attrs")) self.default_attrs = default_attrs(self.attrs) self.content_match = None self.mark_set = None self.inline_content = None self.is_block = not (spec.get("inline") or name == "text") self.is_text = name == "text" @property def is_inline(self): return not self.is_block @property def is_text_block(self): return self.is_block and self.inline_content @property def is_leaf(self): return self.content_match == ContentMatch.empty @property def is_atom(self): return self.is_leaf or self.spec.get("atom") def has_required_attrs(self, ignore=None): for n in self.attrs: if self.attrs[n].is_required and (not ignore or not (n in ignore)): return True return False def compatible_content(self, other): return self == other or (self.content_match.compatible(other.content_match)) def compute_attrs(self, attrs): if not attrs and self.default_attrs: return self.default_attrs return compute_attrs(self.attrs, attrs) def create(self, attrs=None, content=None, marks=None): if self.is_text: raise ValueError("NodeType.create cannot construct text nodes") return Node( self, self.compute_attrs(attrs), Fragment.from_(content), Mark.set_from(marks), ) def create_checked(self, attrs, content, marks): content = Fragment.from_(content) if not self.valid_content(content): raise ValueError("Invalid content for node " + self.name) return Node(self, self.compute_attrs(attrs), content, Mark.set_from(marks)) def create_and_fill(self, attrs=None, content=None, marks=None): attrs = self.compute_attrs(attrs) content = Fragment.from_(content) if content.size: before = self.content_match.fill_before(content) if not before: return None content = before.append(content) after = self.content_match.match_fragment(content).fill_before( Fragment.empty, True ) if not after: return None return Node(self, attrs, content.append(after), Mark.set_from(marks)) def valid_content(self, content): result = self.content_match.match_fragment(content) if not result or not result.valid_end: return False for i in range(content.child_count): if not self.allows_marks(content.child(i).marks): return False return True def allows_mark_type(self, mark_type): return self.mark_set is None or mark_type in self.mark_set def allows_marks(self, marks): if self.mark_set is None: return True return all(self.allows_mark_type(mark.type) for mark in marks) def allowed_marks(self, marks): if self.mark_set is None: return marks copy = None for i, mark in enumerate(marks): if not self.allows_mark_type(mark.type): if not copy: copy = marks[0:i] elif copy: copy.append(mark) if copy is None: return marks elif len(copy): return copy else: return Mark.empty @classmethod def compile(cls, nodes: OrderedDict, schema): result = {} for name, spec in nodes.items(): result[name] = NodeType(name, schema, spec) top_node = schema.spec.get("topNode") or "doc" if not result.get(top_node): raise ValueError(f"Schema is missing its top node type {top_node}") if not result.get("text"): raise ValueError("every schema needs a 'text' type") if getattr(result.get("text"), "attrs", {}): raise ValueError("the text node type should not have attributes") return result def __str__(self): return f"" def __repr__(self): return self.__str__() class Attribute: def __init__(self, options): self.has_default = "default" in options self.default = options["default"] if self.has_default else None @property def is_required(self): return not self.has_default class MarkType: def __init__(self, name, rank, schema, spec): self.name = name self.schema = schema self.spec = spec self.attrs = init_attrs(spec.get("attrs")) self.rank = rank self.excluded = None defaults = default_attrs(self.attrs) self.instance = False if defaults: self.instance = Mark(self, defaults) def create(self, attrs=None): if not attrs and self.instance: return self.instance return Mark(self, compute_attrs(self.attrs, attrs)) @classmethod def compile(cls, marks: OrderedDict, schema): result = {} rank = 0 for name, spec in marks.items(): result[name] = MarkType(name, rank, schema, spec) rank += 1 return result def remove_from_set(self, set): return [item for item in set if item.type != self] def is_in_set(self, set): return next((item for item in set if item.type == self), None) def excludes(self, other): return any(other.name == e.name for e in self.excluded) class Schema: def __init__(self, spec): self.spec = {**spec} self.spec["nodes"] = OrderedDict(self.spec["nodes"]) self.spec["marks"] = OrderedDict(self.spec.get("marks", {})) self.nodes = NodeType.compile(self.spec["nodes"], self) self.marks = MarkType.compile(self.spec["marks"], self) content_expr_cache = {} for prop in self.nodes: if prop in self.marks: raise ValueError(f"{prop} can not be both a node and a mark") type = self.nodes[prop] content_expr = type.spec.get("content", "") mark_expr = type.spec.get("marks") if content_expr not in content_expr_cache: content_expr_cache[content_expr] = ContentMatch.parse( content_expr, self.nodes ) type.content_match = content_expr_cache[content_expr] type.inline_content = type.content_match.inline_content if mark_expr == "_": type.mark_set = None elif mark_expr: type.mark_set = gather_marks(self, mark_expr.split(" ")) elif mark_expr == "" or not type.inline_content: type.mark_set = [] else: type.mark_set = None # type.mark_set = None if mark_expr == "_" else { # gather_marks(self, mark_expr.split(" ")) if mark_expr else ( # [] if (mark_expr == "" or not type.inline_content) else None # ) # } for prop in self.marks: type = self.marks.get(prop) excl = type.spec.get("excludes") type.excluded = ( [type] if excl is None else ([] if excl == "" else (gather_marks(self, excl.split(" ")))) ) self.top_node_type = self.nodes.get((self.spec.get("topNode") or "doc")) self.cached = {} self.cached["wrappings"] = {} def node(self, type, attrs=None, content=None, marks=None): if isinstance(type, str): type = self.node_type(type) elif not isinstance(type, NodeType): raise ValueError(f"Invalid node type: {type}") elif type.schema != self: raise ValueError(f"Node type from different schema used ({type.name})") return type.create_checked(attrs, content, marks) def text(self, text, marks=None): type = self.nodes.get("text") return TextNode(type, type.default_attrs, text, Mark.set_from(marks)) def mark(self, type, attrs=None): if isinstance(type, str): type = self.marks[type] return type.create(attrs) def node_from_json(self, json_data): return Node.from_json(self, json_data) def mark_from_json(self, json_data): return Mark.from_json(self, json_data) def node_type(self, name): found = self.nodes.get(name) if not found: raise ValueError(f"Unknown node type: {name}") return found def gather_marks(schema, marks): found = [] for name in marks: mark = schema.marks.get(name) ok = mark if mark: found.append(mark) else: for prop in schema.marks: mark = schema.marks.get(prop) if name == "_" or ( mark.spec.get("group") and name in mark.spec["group"].split(" ") ): ok = mark found.append(mark) if not ok: raise SyntaxError(f"unknow mark type: '{mark}'") return found PK!E$$$prosemirror/schema/basic/__init__.pyfrom .schema_basic import * # noqa PK!l9 (prosemirror/schema/basic/schema_basic.pyfrom prosemirror.model import Schema spec = { "nodes": { "doc": {"content": "block+"}, "paragraph": { "content": "inline*", "group": "block", "parseDOM": [{"tag": "p"}], }, "blockquote": { "content": "block+", "group": "block", "defining": True, "parseDOM": [{"tag": "blockquote"}], }, "horizontal_rule": {"group": "block", "parseDOM": [{"tag": "hr"}]}, "heading": { "attrs": {"level": {"default": 1}}, "content": "inline*", "group": "block", "defining": True, "parseDOM": [ {"tag": "h1", "attrs": {"level": 1}}, {"tag": "h2", "attrs": {"level": 2}}, {"tag": "h3", "attrs": {"level": 3}}, {"tag": "h4", "attrs": {"level": 4}}, {"tag": "h5", "attrs": {"level": 5}}, {"tag": "h6", "attrs": {"level": 6}}, ], }, "code_block": { "content": "text*", "marks": "", "group": "block", "code": True, "defining": True, "parseDOM": [{"tag": "pre", "preserveWhitespace": "full"}], }, "text": {"group": "inline"}, "image": { "inline": True, "attrs": {"src": {}, "alt": {"default": None}, "title": {"default": None}}, "group": "inline", "draggable": True, "parseDOM": [{"tag": "img[src]"}], }, "hard_break": { "inline": True, "group": "inline", "selectable": False, "parseDOM": [{"tag": "br"}], }, "ordered_list": { "attrs": {"order": {"default": 1}}, "parseDOM": [{"tag": "ol"}], "content": "list_item+", "group": "block", }, "bullet_list": { "parseDOM": [{"tag": "ul"}], "content": "list_item+", "group": "block", }, "list_item": { "parseDOM": [{"tag": "li"}], "defining": True, "content": "paragraph block*", }, }, "marks": { "link": { "attrs": {"href": {}, "title": {"default": None}}, "inclusive": False, "parseDOM": [{"tag": "a[href]"}], }, "em": { "parseDOM": [{"tag": "i"}, {"tag": "em"}, {"style": "font-style=italic"}] }, "strong": { "parseDOM": [{"tag": "strong"}, {"tag": "b"}, {"style": "font-weight"}] }, "code": {"parseDOM": [{"tag": "code"}]}, }, } schema = Schema(spec) PK!,###prosemirror/schema/list/__init__.pyfrom .schema_list import * # noqa PK!T&prosemirror/schema/list/schema_list.py# from prosemirror.transform import find_wrapping, lift_target, can_split, ReplaceAroundStep # from prosemirror.model import Slice, Fragment, NodeRange OL_DOM = ["ol", 0] UL_DOM = ["ul", 0] LI_DOM = ["li", 0] orderd_list = {"attrs": {"order": {"default": 1}}, "parseDOM": [{"tag": "ol"}]} bullet_list = {"parseDOM": [{"tag": "ul"}]} list_item = {"parseDOM": [{"tag": "li"}], "defining": True} def add(obj, props): return {**obj, **props} def add_list_nodes(nodes, item_content, list_group): copy = nodes.copy() copy.update( { "ordered_list": add( orderd_list, {"content": "list_item+", "group": list_group} ), "bullet_list": add( bullet_list, {"content": "list_item+", "group": list_group} ), "list_item": add(list_item, {"content": item_content}), } ) return copy PK!s }i$prosemirror/test_builder/__init__.pyfrom prosemirror.model import Schema from prosemirror.schema.basic import schema from prosemirror.schema.list import add_list_nodes from .build import builders test_schema = Schema( { "nodes": add_list_nodes(schema.spec["nodes"], "paragraph block*", "block"), "marks": schema.spec["marks"], } ) out = builders( test_schema, { "p": {"nodeType": "paragraph"}, "pre": {"nodeType": "code_block"}, "h1": {"nodeType": "heading", "level": 1}, "h2": {"nodeType": "heading", "level": 2}, "h3": {"nodeType": "heading", "level": 3}, "li": {"nodeType": "list_item"}, "ul": {"nodeType": "bullet_list"}, "ol": {"nodeType": "ordered_list"}, "br": {"nodeType": "hard_break"}, "img": {"nodeType": "image", "src": "img.png"}, "hr": {"nodeType": "horizontal_rule"}, "a": {"markType": "link", "href": "foo"}, }, ) def eq(a, b): return a.eq(b) PK!;+ !prosemirror/test_builder/build.pyimport re from prosemirror.model import Node NO_TAG = Node.tag = {} def flatten(schema, children, f): result, pos, tag = [], 0, NO_TAG for child in children: if hasattr(child, "tag") and child.tag != NO_TAG: if tag == NO_TAG: tag = {} for id in child.tag: tag[id] = child.tag[id] + (0 if child.is_text else 1) + pos if isinstance(child, dict) and "tag" in child and child["tag"] != Node.tag: if tag == NO_TAG: tag = {} for id in child["tag"]: tag[id] = child["tag"][id] + (0 if "flat" in child else 1) + pos if isinstance(child, str): at = 0 out = "" for m in re.finditer(r"<(\w+)>", child): out += child[at : m.start()] pos += m.start() - at at = m.start() + len(m[0]) if tag == NO_TAG: tag = {} tag[m[1]] = pos out += child[at:] pos += len(child) - at if out: result.append(f(schema.text(out))) elif isinstance(child, dict) and "flat" in child: for item in child["flat"]: node = f(item) pos += node.node_size result.append(node) elif getattr(child, "flat", 0): for item in child.flat: node = f(item) pos += node.node_size result.append(node) else: node = f(child) pos += node.node_size result.append(node) return {"nodes": result, "tag": tag} def id(x): return x def block(type, attrs): def result(*args): my_attrs = attrs if ( args and args[0] and not isinstance(args[0], (str, Node)) and not getattr(args[0], "flat", None) and "flat" not in args[0] ): my_attrs.update(args[0]) args = args[1:] flatten_res = flatten(type.schema, args, id) nodes = flatten_res["nodes"] tag = flatten_res["tag"] node = type.create(my_attrs, nodes) if tag != NO_TAG: node.tag = tag return node if type.is_leaf: try: result.flat = [type.create(attrs)] except ValueError: pass return result def mark(type, attrs): def result(*args): my_attrs = attrs.copy() if ( args and args[0] and not isinstance(args[0], (str, Node)) and not getattr(args[0], "flat", None) and "flat" not in args[0] ): my_attrs.update(args[0]) args = args[1:] mark = type.create(my_attrs) def f(n): return ( n if mark.type.is_in_set(n.marks) else n.mark(mark.add_to_set(n.marks)) ) flatten_res = flatten(type.schema, args, f) return {"flat": flatten_res["nodes"], "tag": flatten_res["tag"]} return result def builders(schema, names): result = {"schema": schema} for name in schema.nodes: result[name] = block(schema.nodes[name], {}) for name in schema.marks: result[name] = mark(schema.marks[name], {}) if names: for name in names: value = names[name] type_name = value.get("nodeType") or value.get("markType") or name type = schema.nodes.get(type_name) if type: result[name] = block(type, value) else: type = schema.marks.get(type_name) if type: result[name] = mark(type, value) return result PK!C  !prosemirror/transform/__init__.pyfrom .map import Mapping, MapResult, StepMap from .mark_step import AddMarkStep, RemoveMarkStep from .replace import replace_step from .replace_step import ReplaceAroundStep, ReplaceStep from .step import Step, StepResult from .structure import ( can_join, can_split, drop_point, find_wrapping, insert_point, join_point, lift_target, ) from .transform import Transform, TransformError __all__ = [ "Transform", "TransformError", "Step", "StepResult", "join_point", "can_join", "can_split", "insert_point", "drop_point", "lift_target", "find_wrapping", "StepMap", "MapResult", "Mapping", "AddMarkStep", "RemoveMarkStep", "ReplaceAroundStep", "ReplaceStep", "replace_step", ] PK!3sprosemirror/transform/map.pylower16 = 0xFFFF factor16 = 2 ** 16 def make_recover(index, offset): return int(index + offset * factor16) def recover_index(value): return int(value & lower16) def recover_offset(value): return int((value - (value & lower16)) / factor16) class MapResult: def __init__(self, pos, deleted=False, recover=None): self.pos = pos self.deleted = deleted self.recover = recover class StepMap: def __init__(self, ranges, inverted=False): self.ranges = ranges self.inverted = inverted def recover(self, value): diff = 0 index = recover_index(value) if not self.inverted: for i in range(index): diff += self.ranges[i * 3 + 2] - self.ranges[i * 3 + 1] return self.ranges[index * 3] + diff + recover_offset(value) def map_result(self, pos, assoc=1): return self._map(pos, assoc, False) def map(self, pos, assoc=1): return self._map(pos, assoc, True) def _map(self, pos, assoc, simple): diff = 0 old_index = 2 if self.inverted else 1 new_index = 1 if self.inverted else 2 for i in range(0, len(self.ranges), 3): start = self.ranges[i] - (diff if self.inverted else 0) if start > pos: break old_size = self.ranges[i + old_index] new_size = self.ranges[i + new_index] end = start + old_size if pos <= end: if not old_size: side = assoc elif pos == start: side = -1 elif pos == end: side = 1 else: side = assoc result = start + diff + (0 if side < 0 else new_size) if simple: return result recover = make_recover(i / 3, pos - start) return MapResult( result, pos != start if assoc < 0 else pos != end, recover ) diff += new_size - old_size return pos + diff if simple else MapResult(pos + diff) def touches(self, pos, recover): diff = 0 index = recover_index(recover) old_index = 2 if self.inverted else 1 new_index = 1 if self.inverted else 2 for i in range(len(self.ranges), 3): start = self.ranges[i] - (diff if self.inverted else 0) if start > pos: break old_size = self.ranges[i + old_index] end = start + old_size if pos <= end and i == index * 3: return True diff += self.ranges[i + new_index] - old_size return False def for_each(self, f): old_index = 2 if self.inverted else 1 new_index = 1 if self.inverted else 2 i = 0 diff = 0 while i < len(self.ranges): start = self.ranges[i] old_start = start - (diff if self.inverted else 0) new_start = start + (0 if self.inverted else diff) old_size = self.ranges[i + old_index] new_size = self.ranges[i + new_index] f(old_start, old_start + old_size, new_start, new_start + new_size) i += 3 def invert(self): return StepMap(self.ranges, not self.inverted) def __str__(self): return ("-" if self.inverted else "") + str(self.ranges) StepMap.empty = StepMap([]) class Mapping: def __init__(self, maps=None, mirror=None, from_=None, to=None): self.maps = maps or [] self.from_ = from_ or 0 self.to = len(self.maps) if to is None else to self.mirror = mirror def slice(self, from_=0, to=None): if to is None: to = len(self.maps) return Mapping(self.maps, self.mirror, from_, to) def copy(self): return Mapping( self.maps[:], (self.mirror[:] if self.mirror else None), self.from_, self.to ) def append_map(self, map, mirrors=None): self.maps.append(map) self.to = len(self.maps) if mirrors is not None: self.set_mirror(len(self.maps) - 1, mirrors) def append_mapping(self, mapping: "Mapping"): i = 0 start_size = len(self.maps) while i < len(mapping.maps): mirr = mapping.get_mirror(i) i += 1 self.append_map( mapping.maps[i], (start_size + mirr) if (mirr is not None and mirr < i) else None, ) def get_mirror(self, n): if self.mirror: for i in range(len(self.mirror)): if (self.mirror[i]) == n: return self.mirror[i + (-1 if i % 2 else 1)] def set_mirror(self, n, m): if not self.mirror: self.mirror = [] self.mirror.extend([n, m]) def append_mapping_inverted(self, mapping: "Mapping"): i = len(mapping.maps) - 1 total_size = len(self.maps) + len(mapping.maps) while i >= 0: mirr = mapping.get_mirror(i) self.append_map( mapping.maps[i].invert(), (total_size - mirr - 1) if (mirr is not None and mirr > i) else None, ) i -= 1 def invert(self): inverse = Mapping() inverse.append_mapping_inverted(self) return inverse def map(self, pos, assoc=1): if self.mirror: return self._map(pos, assoc, True) for i in range(self.from_, self.to): pos = self.maps[i].map(pos, assoc) return pos def map_result(self, pos, assoc=1): return self._map(pos, assoc, False) def _map(self, pos, assoc, simple): deleted = False recoverables: dict = None i = self.from_ while i < self.to: map = self.maps[i] rec = None if recoverables: rec = recoverables.get(i) if rec is not None and map.touches(pos, rec): pos = map.recover(rec) i += 1 continue result = map.map_result(pos, assoc) if result.recover is not None: corr = self.get_mirror(i) if corr is not None and corr > i and corr < self.to: if result.deleted: i = corr pos = self.maps[corr].recover(result.recover) i += 1 continue else: if recoverables is None: recoverables = {} recoverables[corr] = result.recover if result.deleted: deleted = True pos = result.pos i += 1 return pos if simple else MapResult(pos, deleted) PK!prosemirror/transform/mark.py# Empty PK!y"prosemirror/transform/mark_step.pyfrom prosemirror.model import Fragment, Slice from .step import Step, StepResult def map_fragment(fragment: Fragment, f, parent=None): mapped = [] for i in range(fragment.child_count): child = fragment.child(i) if getattr(child.content, "size", None): child = child.copy(map_fragment(child.content, f, child)) if child.is_inline: child = f(child, parent, i) mapped.append(child) return fragment.from_array(mapped) class AddMarkStep(Step): def __init__(self, from_, to, mark): super().__init__() self.from_ = from_ self.to = to self.mark = mark def apply(self, doc): old_slice = doc.slice(self.from_, self.to) from__ = doc.resolve(self.from_) parent = from__.node(from__.shared_depth(self.to)) def iteratee(node, parent, *args): if not parent.type.allows_mark_type(self.mark.type): return node return node.mark(self.mark.add_to_set(node.marks)) slice = Slice( map_fragment(old_slice.content, iteratee, parent), old_slice.open_start, old_slice.open_end, ) return StepResult.from_replace(doc, self.from_, self.to, slice) def invert(self, doc=None): return RemoveMarkStep(self.from_, self.to, self.mark) def map(self, mapping): from_ = mapping.map_result(self.from_, 1) to = mapping.map_result(self.to, -1) if from_.deleted and to.deleted or from_.pos > to.pos: return None return AddMarkStep(from_.pos, to.pos, self.mark) def merge(self, other: "AddMarkStep"): if ( isinstance(other, AddMarkStep) and other.mark.eq(self.mark) and self.from_ <= other.to and self.to >= other.from_ ): return AddMarkStep( min(self.from_, other.from_), max(self.to, other.to), self.mark ) def to_json(self): json_data = { "stepType": "addMark", "mark": self.mark.to_json(), "from": self.from_, "to": self.to, } return json_data @staticmethod def from_json(schema, json_data): if isinstance(json_data, str): import json json_data = json.loads(json_data) if not isinstance(json_data["from"], int) or not isinstance( json_data["to"], int ): raise ValueError("Invalid input for AddMarkStep.from_json") return AddMarkStep( json_data["from"], json_data["to"], schema.mark_from_json(json_data["mark"]) ) Step.json_id("addMark", AddMarkStep) class RemoveMarkStep(Step): def __init__(self, from_, to, mark): super().__init__() self.from_ = from_ self.to = to self.mark = mark def apply(self, doc): old_slice = doc.slice(self.from_, self.to) def iteratee(node, *args): return node.mark(self.mark.remove_from_set(node.marks)) slice = Slice( map_fragment(old_slice.content, iteratee), old_slice.open_start, old_slice.open_end, ) return StepResult.from_replace(doc, self.from_, self.to, slice) def invert(self, doc=None): return AddMarkStep(self.from_, self.to, self.mark) def map(self, mapping): from_ = mapping.map_result(self.from_, 1) to = mapping.map_result(self.to, -1) if (from_.deleted and to.deleted) or (from_.pos > to.pos): return None return RemoveMarkStep(from_.pos, to.pos, self.mark) def merge(self, other: "RemoveMarkStep"): if ( isinstance(other, RemoveMarkStep) and (other.mark.eq(self.mark)) and (self.from_ <= other.to) and self.to >= other.from_ ): return RemoveMarkStep( min(self.from_, other.from_), max(self.to, other.to), self.mark ) def to_json(self): json_data = { "stepType": "removeMark", "mark": self.mark.to_json(), "from": self.from_, "to": self.to, } return json_data @staticmethod def from_json(schema, json_data): if isinstance(json_data, str): import json json_data = json.loads(json_data) if not isinstance(json_data["from"], int) or not isinstance( json_data["to"], int ): raise ValueError("Invalid input for RemoveMarkStep.from_json") return RemoveMarkStep( json_data["from"], json_data["to"], schema.mark_from_json(json_data["mark"]) ) Step.json_id("removeMark", RemoveMarkStep) PK! >> prosemirror/transform/replace.pyfrom prosemirror.model import Fragment, Slice from .replace_step import ReplaceAroundStep, ReplaceStep def replace_step(doc, from_, to=None, slice=None): if to is None: to = from_ if slice is None: slice = Slice.empty if from_ == to and not slice.size: return None from__ = doc.resolve(from_) to_ = doc.resolve(to) if fits_trivially(from__, to_, slice): return ReplaceStep(from_, to, slice) placed = place_slice(from__, slice) fitted_left = fit_left(from__, placed) fitted = fit_right(from__, to_, fitted_left) if not fitted: return None if fitted_left.size != fitted.size and can_move_text(from__, to_, fitted_left): d = to_.depth after = to_.after(d) while d > 1 and after == to_.end(d - 1): d -= 1 after += 1 fittedAfter = fit_right(from__, doc.resolve(after), fitted_left) if fittedAfter: return ReplaceAroundStep( from_, after, to, to_.end(), fittedAfter, fitted_left.size ) return ReplaceStep(from_, to, fitted, None) if (fitted or from_ != to) else None def fit_left_innter(from__, depth, placed, placed_below): content = Fragment.empty open_end = 0 placed_here = placed[depth] if len(placed) > depth else None if from__.depth > depth: inner = fit_left_innter(from__, depth + 1, placed, placed_below or placed_here) open_end = inner["open_end"] + 1 content = Fragment.from_(from__.node(depth + 1).copy(inner["content"])) if placed_here: content = content.append(placed_here["content"]) open_end = placed_here["open_end"] if placed_below: content = content.append( from__.node(depth) .content_match_at(from__.index_after(depth)) .fill_before(Fragment.empty, True) ) return {"content": content, "open_end": open_end} def fit_left(from__, placed): inner_res = fit_left_innter(from__, 0, placed, False) content = inner_res["content"] open_end = inner_res["open_end"] return Slice(content, from__.depth, open_end or 0) def fit_right_join(content, parent, from__, to_, depth, open_start, open_end): match = None count = content.child_count match_count = count - (1 if open_end > 0 else 0) parent_node = parent if open_start < 0 else from__.node(depth) if open_start < 0: match = parent_node.content_match_at(match_count) elif count == 1 and open_end > 0: match = parent_node.content_match_at( from__.index(depth) if open_start else from__.index_after(depth) ) else: match = parent_node.content_match_at(from__.index_after(depth)).match_fragment( content, (1 if count > 0 and open_start else 0), match_count ) to_node = to_.node(depth) if open_end > 0 and depth < to_.depth: after = to_node.content.cut_by_index(to_.index_after(depth)).add_to_start( content.last_child ) joinable = match.fill_before(after, True) if joinable and joinable.size and open_start > 0 and count == 1: joinable = None if joinable: inner = fit_right_join( content.last_child.content, content.last_child, from__, to_, depth + 1, (open_start - 1 if count == 1 else -1), open_end - 1, ) if inner: last = content.last_child.copy(inner) if joinable.size: return ( content.cut_by_index(0, count - 1) .append(joinable) .add_to_end(last) ) return content.replace_child(count - 1, last) if open_end > 0: match = match.match_type( ( from__.node(depth + 1) if count == 1 and open_start > 0 else content.last_child ).type ) to_index = to_.index(depth) if to_index == to_node.child_count and not to_node.type.compatible_content( parent.type ): return None joinable = match.fill_before(to_node.content, True, to_index) i = to_index while i < to_node.content.child_count: if not parent_node.type.allows_marks(to_node.content.child(i).marks): joinable = None i += 1 if not joinable: return None if open_end > 0: closed = fit_right_closed( content.last_child, open_end - 1, from__, depth + 1, (open_start - 1 if count == 1 else -1), ) content = content.replace_child(count - 1, closed) content = content.append(joinable) if to_.depth > depth: content = content.add_to_end(fit_right_separate(to_, depth + 1)) return content def fit_right_closed(node, open_end, from__, depth, open_start): match = None content = node.content count = content.child_count if open_start >= 0: match = ( from__.node(depth) .content_match_at(from__.index_after(depth)) .match_fragment(content, 1 if open_start > 0 else 0, count) ) else: match = node.content_match_at(count) if open_end > 0: closed = fit_right_closed( content.last_child, open_end - 1, from__, depth + 1, open_start - 1 if count == 1 else -1, ) content = content.replace_child(count - 1, closed) return node.copy(content.append(match.fill_before(Fragment.empty, True))) def fit_right_separate(to_, depth): node = to_.node(depth) fill = node.content_match_at(0).fill_before(node.content, True, to_.index(depth)) if to_.depth > depth: fill = fill.add_to_end(fit_right_separate(to_, depth + 1)) return node.copy(fill) def normalize_slice(content, open_start, open_end): while open_start > 0 and open_end > 0 and content.child_count == 1: content = content.first_child.content open_start -= 1 open_end -= 1 return Slice(content, open_start, open_end) def fit_right(from__, to_, slice): fitted = fit_right_join( slice.content, from__.node(0), from__, to_, 0, slice.open_start, slice.open_end ) if not fitted: return None return normalize_slice(fitted, slice.open_start, to_.depth) def fits_trivially(from__, to_, slice): if not slice.open_start and not slice.open_end and from__.start() == to_.start(): return from__.parent.can_replace(from__.index(), to_.index(), slice.content) return False def can_move_text(from__, to_, slice): if not to_.parent.is_text_block: return False parent = ( node_right(slice.content, slice.open_end) if slice.open_end else from__.node(from__.depth - (slice.open_start - slice.open_end)) ) if not parent.is_text_block: return False for i in range(to_.index(), to_.parent.child_count): if not parent.type.allows_marks(to_.parent.child(i).marks): return False match = None if slice.open_end: match = parent.content_match_at(parent.child_count) else: match = parent.content_match_at(parent.child_count) if slice.size: match = match.match_fragment(slice.content, 1 if slice.open_start else 0) match = match.match_fragment(to_.parent.content, to_.index()) return match.valid_end if match else False def node_right(content, depth): for i in range(1, depth): content = content.last_child.content return content.last_child def place_slice(from__, slice): frontier = Frontier(from__) pass_ = 1 while slice.size and pass_ <= 3: slice = frontier.place_slice( slice.content, slice.open_start, slice.open_end, pass_ ) pass_ += 1 while len(frontier.open): frontier.close_node() return frontier.placed class SparseList(list): def __setitem__(self, index, value): missing = index - len(self) + 1 if missing > 0: self.extend([None] * missing) list.__setitem__(self, index, value) def __getitem__(self, index): try: return list.__getitem__(self, index) except IndexError: return None class Frontier: def __init__(self, pos_): self.open = [] for d in range(pos_.depth + 1): parent = pos_.node(d) match = parent.content_match_at(pos_.index_after(d)) self.open.append( { "parent": parent, "match": match, "content": Fragment.empty, "wrapper": False, "open_end": 0, "depth": d, } ) self.placed = SparseList() def place_slice(self, fragment, open_start, open_end, pass_, parent=None): if open_start > 0: first = fragment.first_child inner = self.place_slice( first.content, max(0, open_start - 1), (open_end - 1 if open_end and fragment.child_count == 1 else 0), pass_, first, ) if inner.content != first.content: if inner.content.size: fragment = fragment.replace_child(0, first.copy(inner.content)) open_start = inner.open_start + 1 else: if fragment.child_count == 1: open_end = 0 fragment = fragment.cut_by_index(1) open_start = 0 result = self.place_content(fragment, open_start, open_end, pass_, parent) if pass_ > 2 and result.size and open_start == 0: for i in range(result.content.child_count): child = result.content.child(i) self.place_content( child.content, 0, ( open_end - 1 if open_end and i == result.content.child_count - 1 else 0 ), pass_, child, ) result = Fragment.empty return result def place_content(self, fragment, open_start, open_end, pass_, parent=None): i = 0 while i < fragment.child_count: child = fragment.child(i) placed = False last = i == (fragment.child_count - 1) d = len(self.open) - 1 while d >= 0: open = self.open[d] wrap = None if pass_ > 1: wrap = open["match"].find_wrapping(child.type) if wrap and not (parent and len(wrap) and wrap[-1] == parent.type): while len(self.open) - 1 > d: self.close_node() w = 0 while w < len(wrap): open["match"] = open["match"].match_type(wrap[w]) d += 1 open = { "parent": wrap[w].create(), "match": wrap[w].content_match, "content": Fragment.empty, "wrapper": True, "open_end": 0, "depth": d + w, } self.open.append(open) w += 1 match = open["match"].match_type(child.type) if not match: fill = open["match"].fill_before(Fragment.from_(child)) if fill: for j in range(fill.child_count): ch = fill.child(j) self.add_node(open, ch, 0) match = open["match"].match_fragment(ch) elif parent and open["match"].match_type(parent.type): break else: d -= 1 continue while len(self.open) - 1 > d: self.close_node() child = child.mark(open["parent"].type.allowed_marks(child.marks)) if open_start: child = close_node_start(child, open_start, open_end if last else 0) open_start = 0 self.add_node(open, child, open_end if last else 0) open["match"] = match if last: open_end = 0 placed = True break if not placed: break i += 1 if len(self.open) > 1 and ( i > 0 and i == fragment.child_count or parent and self.open[-1]["parent"].type == parent.type ): self.close_node() return Slice(fragment.cut_by_index(i), open_start, open_end) def add_node(self, open, node, open_end): open["content"] = close_fragment_end( open["content"], open["open_end"] ).add_to_end(node) open["open_end"] = open_end def close_node(self): open = self.open.pop() if open["content"].size == 0: pass elif open["wrapper"]: self.add_node( self.open[-1], open["parent"].copy(open["content"]), open["open_end"] + 1, ) else: self.placed[open["depth"]] = { "depth": open["depth"], "content": open["content"], "open_end": open["open_end"], } def close_node_start(node, open_start, open_end): content = node.content if open_start > 1: first = close_node_start( node.first_child, open_start - 1, open_end - 1 if node.child_count == 1 else 0, ) content = node.content.replace_child(0, first) fill = node.type.content_match.fill_before(content, open_end == 0) return node.copy(fill.append(content)) def close_node_end(node, depth): content = node.content if depth > 1: last = close_node_end(node.last_child, depth - 1) content = node.content.replace_child(node.child_count - 1, last) fill = node.content_match_at(node.child_count).fill_before(Fragment.empty, True) return node.copy(content.append(fill)) def close_fragment_end(fragment, depth): if depth: return fragment.replace_child( fragment.child_count - 1, close_node_end(fragment.last_child, depth) ) else: return fragment def close_fragment(fragment, depth, old_open, new_open, parent): if depth < old_open: first = fragment.first_child fragment = fragment.replace_child( 0, first.copy( close_fragment(first.content, depth + 1, old_open, new_open, first) ), ) if depth > new_open: fragment = ( parent.content_match_at(0).fill_before(fragment, True).append(fragment) ) return fragment def covered_depths(from__, to_): result = [] min_depth = min(from__.depth, to_.depth) for d in range(min_depth, -1, -1): start = from__.start(d) if ( (start < from__.pos - (from__.depth - d)) or (to_.end(d) > to_.pos + (to_.depth - d)) or (from__.node(d).type.spec.get("isolation")) or (to_.node(d).type.spec.get("isolation")) ): break if start == to_.start(d): result.append(d) return result PK!*qu %prosemirror/transform/replace_step.pyfrom prosemirror.model import Slice from .map import StepMap from .step import Step, StepResult class ReplaceStep(Step): def __init__(self, from_, to, slice, structure=None): super().__init__() self.from_ = from_ self.to = to self.slice = slice self.structure = bool(structure) def apply(self, doc): if self.structure and content_between(doc, self.from_, self.to): return StepResult.fail("Structure replace would overrite content") return StepResult.from_replace(doc, self.from_, self.to, self.slice) def get_map(self): return StepMap([self.from_, self.to - self.from_, self.slice.size]) def invert(self, doc): return ReplaceStep( self.from_, self.from_ + self.slice.size, doc.slice(self.from_, self.to) ) def map(self, mapping): from_ = mapping.map_result(self.from_, 1) to = mapping.map_result(self.to, -1) if from_.deleted and to.deleted: return None return ReplaceStep(from_.pos, max(from_.pos, to.pos), self.slice) def merge(self, other: "ReplaceStep"): if not (isinstance(other, ReplaceStep) or other.structure != self.structure): return None if ( self.from_ + self.slice.size == other.from_ and not self.slice.open_end and not other.slice.open_start ): if self.slice.size + other.slice.size == 0: slice = Slice.empty else: slice = Slice( self.slice.content.append(other.slice.content), self.slice.open_start, other.slice.open_end, ) return ReplaceStep( self.from_, self.to + (other.to - other.from_), slice, self.structure ) elif ( other.to == self.from_ and not self.slice.open_start and not other.slice.open_end ): if self.slice.size + other.slice.size == 0: slice = Slice.empty else: slice = Slice( other.slice.content.append(self.slice.content), other.slice.open_start, self.slice.open_end, ) return ReplaceStep(other.from_, self.to, slice, self.structure) return None def to_json(self): json_data = {"stepType": "replace", "from": self.from_, "to": self.to} if self.slice.size: json_data["slice"] = self.slice.to_json() if self.structure: json_data["structure"] = True return json_data @staticmethod def from_json(schema, json_data): if isinstance(json_data, str): import json json_data = json.loads(json_data) if not isinstance(json_data["from"], int) or not isinstance( json_data["to"], int ): raise ValueError("Invlid input for ReplaceStep.from_json") return ReplaceStep( json_data["from"], json_data["to"], Slice.from_json(schema, json_data.get("slice")), bool(json_data.get("structure")), ) Step.json_id("replace", ReplaceStep) class ReplaceAroundStep(Step): def __init__(self, from_, to, gap_from, gap_to, slice, insert, structure=None): super().__init__() self.from_ = from_ self.to = to self.gap_from = gap_from self.gap_to = gap_to self.slice = slice self.insert = insert self.structure = bool(structure) def apply(self, doc): if self.structure and ( content_between(doc, self.from_, self.gap_from) or content_between(doc, self.gap_to, self.to) ): return StepResult.fail("Structure gap-replace would overwrite content") gap = doc.slice(self.gap_from, self.gap_to) if gap.open_start or gap.open_end: return StepResult.fail("Gap is not a flat range") inserted = self.slice.insert_at(self.insert, gap.content) if not inserted: return StepResult.fail("Content does not fit in gap") return StepResult.from_replace(doc, self.from_, self.to, inserted) def get_map(self): return StepMap( [ self.from_, self.gap_from - self.from_, self.insert, self.gap_to, self.to - self.gap_to, self.slice.size - self.insert, ] ) def invert(self, doc): gap = self.gap_to - self.gap_from return ReplaceAroundStep( self.from_, self.from_ + self.slice.size + gap, self.from_ + self.insert, self.from_ + self.insert + gap, doc.slice(self.from_, self.to).remove_between( self.gap_from - self.from_, self.gap_to - self.from_ ), self.gap_from - self.from_, self.structure, ) def map(self, mapping): from_ = mapping.map_result(self.from_, 1) to = mapping.map_result(self.to, -1) gap_from = mapping.map(self.gap_from, -1) gap_to = mapping.map(self.gap_to, 1) if (from_.deleted and to.delete) or gap_from < from_.pos or gap_to > to.pos: return None return ReplaceAroundStep( from_.pos, to.pos, gap_from, gap_to, self.slice, self.insert, self.structure ) def to_json(self): json_data = { "stepType": "replaceAround", "from": self.from_, "to": self.to, "gapFrom": self.gap_from, "gapTo": self.gap_to, "insert": self.insert, } if self.slice.size: json_data["slice"] = self.slice.to_json() if self.structure: json_data["structure"] = True return json_data @staticmethod def from_json(schema, json_data): if isinstance(json_data, str): import json json_data = json.loads(json_data) if ( not isinstance(json_data["from"], int) or not isinstance(json_data["to"], int) or not isinstance(json_data["gapFrom"], int) or not isinstance(json_data["gapTo"], int) or not isinstance(json_data["insert"], int) ): raise ValueError("Invlid input for ReplaceAroundStep.from_json") return ReplaceAroundStep( json_data["from"], json_data["to"], json_data["gapFrom"], json_data["gapTo"], Slice.from_json(schema, json_data.get("slice")), json_data["insert"], bool(json_data.get("structure")), ) Step.json_id("replaceAround", ReplaceAroundStep) def content_between(doc, from_, to): from__ = doc.resolve(from_) dist = to - from_ depth = from__.depth while ( dist > 0 and depth > 0 and from__.index_after(depth) == from__.node(depth).child_count ): depth -= 1 dist -= 1 if dist > 0: next = from__.node(depth).maybe_child(from__.index_after(depth)) while dist > 0: if not next or next.is_leaf: return True next = next.first_child dist -= 1 return False PK!3Gprosemirror/transform/step.pyimport abc from prosemirror.model import ReplaceError from .map import StepMap # like a registry STEPS_BY_ID = {} class Step(metaclass=abc.ABCMeta): @abc.abstractmethod def apply(self, _doc): return def get_map(self): return StepMap.empty @abc.abstractmethod def invert(self, _doc): return @abc.abstractmethod def map(self, _mapping): return def merge(self, _other): return None @abc.abstractmethod def to_json(self): return @staticmethod def from_json(schema, json_data): if isinstance(json_data, str): import json json_data = json.loads(json_data) if not json_data or not json_data.get("stepType"): raise ValueError("Invalid inpit for Step.from_json") type = STEPS_BY_ID.get(json_data["stepType"]) if not type: raise ValueError(f'no step type {json_data["stepType"]} defined') return type.from_json(schema, json_data) @staticmethod def json_id(id, step_class): if id in STEPS_BY_ID: raise ValueError(f"Duplicated JSON ID for step type: {id}") STEPS_BY_ID[id] = step_class step_class.json_id = id return step_class class StepResult: def __init__(self, doc, failed): self.doc = doc self.failed = failed @classmethod def ok(cls, doc): return cls(doc, None) @classmethod def fail(cls, message): return cls(None, message) @classmethod def from_replace(cls, doc, from_, to, slice): try: return cls.ok(doc.replace(from_, to, slice)) except ReplaceError as e: return cls.fail(e.args[0]) PK!"prosemirror/transform/structure.pyfrom prosemirror.model import Node def can_cut(node, start, end): if start == 0 or node.can_replace(start, node.child_count): return (end == node.child_count) or node.can_replace(0, end) return False def lift_target(range_): parent = range_.parent content = parent.content.cut_by_index(range_.start_index, range_.end_index) depth = range_.depth while True: node = range_.from_.node(depth) index = range_.from_.index(depth) end_index = range_.to.index_after(depth) if depth < range_.depth and node.can_replace(index, end_index, content): return depth if ( depth == 0 or node.type.spec.get("isolating") or not can_cut(node, index, end_index) ): break depth -= 1 def find_wrapping(range_, node_type, attrs=None, inner_range=None): if inner_range is None: inner_range = range_ around = find_wrapping_outside(range_, node_type) inner = False if around is not None: inner = find_wrapping_inside(inner_range, node_type) else: return None if inner is None: return None return ( [with_attrs(item) for item in around] + [{"type": node_type, "attrs": attrs}] + [with_attrs(item) for item in inner] ) def with_attrs(type): return {"type": type, "attrs": None} def find_wrapping_outside(range_, type): parent = range_.parent start_index = range_.start_index end_index = range_.end_index around = parent.content_match_at(start_index).find_wrapping(type) if around is None: return None outer = around[0] if len(around) and around[0] else type return around if parent.can_replace_with(start_index, end_index, outer) else None def find_wrapping_inside(range_, type): parent = range_.parent start_index = range_.start_index end_index = range_.end_index inner = parent.child(start_index) inside = type.content_match.find_wrapping(inner.type) if inside is None: return None last_type = inside[-1] if len(inside) else type inner_match = last_type.content_match i = start_index while inner_match and i < end_index: inner_match = inner_match.match_type(parent.child(i).type) i += 1 if not inner_match or not inner_match.valid_end: return None return inside def can_change_type(doc, pos, type): pos_ = doc.resolve(pos) index = pos_.index() return pos_.parent.can_replace_with(index, index + 1, type) def can_split(doc, pos, depth=None, types_after=None): if depth is None: depth = 1 pos_ = doc.resolve(pos) base = pos_.depth - depth inner_type = None if types_after: inner_type = types_after[-1] if not inner_type: inner_type = pos_.parent if isinstance(inner_type, Node): if ( base < 0 or pos_.parent.type.spec.get("isolating") or not pos_.parent.can_replace(pos_.index(), pos_.parent.child_count) or not inner_type.type.valid_content( pos_.parent.content.cut_by_index(pos_.index(), pos_.parent.child_count) ) ): return False elif isinstance(inner_type, dict): if ( base < 0 or pos_.parent.type.spec.get("isolating") or not pos_.parent.can_replace(pos_.index(), pos_.parent.child_count) or not inner_type["type"].valid_content( pos_.parent.content.cut_by_index(pos_.index(), pos_.parent.child_count) ) ): return False d = pos_.depth - 1 i = depth - 2 while d > base: node = pos_.node(d) index = pos_.index(d) if node.type.spec.get("isolating"): return False rest = node.content.cut_by_index(index, node.child_count) after = None if types_after and len(types_after) > i: after = types_after[i] if not after: after = node if isinstance(after, dict): if after != node: rest = rest.replace_child(0, after["type"].create(after.get("attrs"))) if not node.can_replace(index + 1, node.child_count) or not after[ "type" ].valid_content(rest): return False if isinstance(after, Node): if after != node: rest = rest.replace_child(0, after.type.create(after.attrs)) if not node.can_replace( index + 1, node.child_count ) or not after.type.valid_content(rest): return False d -= 1 i -= 1 index = pos_.index_after(base) base_type = types_after[0] if types_after else None return pos_.node(base).can_replace_with( index, index, base_type["type"] if base_type else pos_.node(base + 1).type ) def can_join(doc, pos): pos_ = doc.resolve(pos) index = pos_.index() return ( pos_.parent.can_replace(index, index + 1) if joinable(pos_.node_before, pos_.node_after) else None ) def joinable(a, b): if a and b and not a.is_leaf: return a.can_append(b) return False def join_point(doc, pos, dir=-1): pos_ = doc.resolve(pos) for d in range(pos_.depth, -1, -1): before = None after = None if d == pos_.depth: before = pos_.node_before after = pos_.node_after elif dir > 0: before = pos_.node(d + 1) after = pos_.node(d).maybe_child(pos_.index(d) + 1) else: before = pos_.node(d).maybe_child(pos_.index(d) - 1) after = pos_.node(d + 1) if before and not before.is_text_block and joinable(before, after): return pos if d == 0: break pos = pos_.before(d) if dir < 0 else pos_.after(d) def insert_point(doc, pos, node_type): pos_ = doc.resolve(pos) if pos_.parent.can_replace_with(pos_.index(), pos_.index(), node_type): return pos if pos_.parent_offset == 0: for d in range(pos_.depth - 1, -1, -1): index = pos_.index(d) if pos_.node(d).can_replace_with(index, index, node_type): return pos_.before(d + 1) if index > 0: return None if pos_.parent_offset == pos_.parent.content.size: for d in range(pos_.depth - 1, -1, -1): index = pos_.index_after(d) if pos_.node(d).can_replace_with(index, index, node_type): return pos_.after(d + 1) if index < pos_.node(d).child_count: return None def drop_point(doc, pos, slice): pos_ = doc.resolve(pos) if not slice.content.size: return pos content = slice.content for i in range(slice.open_start): content = content.first_child.content pass_ = 1 while pass_ <= (2 if slice.open_start == 0 and slice.size else 1): for d in range(pos_.depth, 0, -1): if d == pos_.depth: bias = 0 elif pos_.pos <= (pos_.start(d + 1) + pos_.end(d + 1)) / 2: bias = -1 else: bias = 1 insert_pos = pos_.index(d) + (1 if bias > 0 else 0) if pass_ == 1: cond = pos_.node(d).can_replace(insert_pos, insert_pos, content) else: cond = ( pos_.node(d) .content_match_at(insert_pos) .find_wrapping(content.first_child.type) ) if cond: if bias == 0: return pos_.pos elif bias < 0: return pos_.before(d + 1) else: return pos_.after(d + 1) pass_ += 1 return None PK!"S3<3<"prosemirror/transform/transform.pyfrom prosemirror.model import Fragment, MarkType, Node, Slice from .map import Mapping from .mark_step import AddMarkStep, RemoveMarkStep from .replace import close_fragment, covered_depths, fits_trivially, replace_step from .replace_step import ReplaceAroundStep, ReplaceStep from .structure import can_change_type, insert_point class TransformError(ValueError): pass class Transform: def __init__(self, doc): self.doc = doc self.steps = [] self.docs = [] self.mapping = Mapping() @property def before(self): return self.docs[0] if self.docs else self.doc def step(self, object): result = self.maybe_step(object) if result.failed: raise TransformError(result.failed) return self def maybe_step(self, step): result = step.apply(self.doc) if not result.failed: self.add_step(step, result.doc) return result def doc_changed(self): return bool(len(self.steps)) def add_step(self, step, doc): self.docs.append(self.doc) self.steps.append(step) self.mapping.append_map(step.get_map()) self.doc = doc # mark.js def add_mark(self, from_, to, mark): removed = [] added = [] removing = None adding = None def iteratee(node, pos, parent, *args): nonlocal removing nonlocal adding if not node.is_inline: return marks = node.marks if not mark.is_in_set(marks) and parent.type.allows_mark_type(mark.type): start = max(pos, from_) end = min(pos + node.node_size, to) new_set = mark.add_to_set(marks) for i in range(len(marks)): if not marks[i].is_in_set(new_set): if ( removing and removing.to == start and removing.mark.eq(marks[i]) ): removing.to = end else: removing = RemoveMarkStep(start, end, marks[i]) removed.append(removing) if adding and adding.to == start: adding.to = end else: adding = AddMarkStep(start, end, mark) added.append(adding) self.doc.nodes_between(from_, to, iteratee) for item in removed: self.step(item) for item in added: self.step(item) return self def remove_mark(self, from_, to, mark=None): matched = [] step = 0 def iteratee(node, pos, *args): nonlocal step if not node.is_inline: return step += 1 to_remove = None if isinstance(mark, MarkType): found = mark.is_in_set(node.marks) if found: to_remove = [found] elif mark: if mark.is_in_set(node.marks): to_remove = [mark] else: to_remove = node.marks if to_remove: end = min(pos + node.node_size, to) for style in to_remove: found = None for m in matched: if m["step"] == step - 1 and style.eq(m["style"]): found = m if found: found["to"] = end found["step"] = step else: matched.append( { "style": style, "from": max(pos, from_), "to": end, "step": step, } ) self.doc.nodes_between(from_, to, iteratee) for item in matched: self.step(RemoveMarkStep(item["from"], item["to"], item["style"])) return self def clear_incompatible(self, pos, parent_type, match=None): if match is None: match = parent_type.content_match node = self.doc.node_at(pos) del_steps = [] cur = pos + 1 for i in range(node.child_count): child = node.child(i) end = cur + child.node_size allowed = match.match_type(child.type, child.attrs) if not allowed: del_steps.append(ReplaceStep(cur, end, Slice.empty)) else: match = allowed for j in range(len(child.marks)): if not parent_type.allows_mark_type(child.marks[j].type): self.step(RemoveMarkStep(cur, end, child.marks[j])) cur = end if not match.valid_end: fill = match.fill_before(Fragment.empty, True) self.replace(cur, cur, Slice(fill, 0, 0)) for item in reversed(del_steps): self.step(item) return self # replace.js def replace(self, from_, to=None, slice=None): if to is None: to = from_ if slice is None: slice = Slice.empty step = replace_step(self.doc, from_, to, slice) if step: self.step(step) return self def replace_with(self, from_, to, content): return self.replace(from_, to, Slice(Fragment.from_(content), 0, 0)) def delete(self, from_, to): return self.replace(from_, to, Slice.empty) def insert(self, pos, content): return self.replace_with(pos, pos, content) def replace_range(self, from_, to, slice): if not slice.size: return self.delete_range(from_, to) from__ = self.doc.resolve(from_) to_ = self.doc.resolve(to) if fits_trivially(from__, to_, slice): return self.step(ReplaceStep(from_, to, slice)) target_depths = covered_depths(from__, self.doc.resolve(to)) if target_depths and target_depths[-1] == 0: target_depths.pop() preferred_target = -(from__.depth + 1) target_depths.insert(0, preferred_target) d = from__.depth pos = from__.pos - 1 while d > 0: spec = from__.node(d).type.spec if spec.get("defining") or spec.get("isolating"): break if d in target_depths: preferred_target = d elif from__.before(d) == pos: target_depths.insert(1, -d) d -= 1 pos -= 1 preferred_target_index = target_depths.index(preferred_target) left_nodes = [] preferred_depth = slice.open_start content = slice.content i = 0 while True: node = content.first_child left_nodes.append(node) if i == slice.open_start: break content = node.content i += 1 if ( preferred_depth > 0 and left_nodes[preferred_depth - 1].type.spec.get("defining") and from__.node(preferred_target_index).type.name != left_nodes[preferred_depth - 1].type.name ): preferred_depth -= 1 elif ( preferred_depth >= 2 and left_nodes[preferred_depth - 1].is_text_block and left_nodes[preferred_depth - 2].type.spec.get("defining") and from__.node(preferred_target_index).type.name != left_nodes[preferred_depth - 2].type.name ): preferred_depth -= 2 for j in range(slice.open_start, -1, -1): open_depth = (j + preferred_depth + 1) % (slice.open_start + 1) if len(left_nodes) > open_depth: insert = left_nodes[open_depth] else: continue for i in range(len(target_depths)): target_depth = target_depths[ (i + preferred_target_index) % len(target_depths) ] expand = True if target_depth < 0: expand = False target_depth = -target_depth parent = from__.node(target_depth - 1) index = from__.index(target_depth - 1) if parent.can_replace_with(index, index, insert.type, insert.marks): return self.replace( from__.before(target_depth), to_.after(target_depth) if expand else to, Slice( close_fragment( slice.content, 0, slice.open_start, open_depth, None ), open_depth, slice.open_end, ), ) return self.replace(from_, to, slice) def replace_range_with(self, from_, to, node): if ( not node.is_inline and from_ == to and self.doc.resolve(from_).parent.content.size ): point = insert_point(self.doc, from_, node.type) if point is not None: from_ = to = point return self.replace_range(from_, to, Slice(Fragment.from_(node), 0, 0)) def delete_range(self, from_, to): from__ = self.doc.resolve(from_) to_ = self.doc.resolve(to) covered = covered_depths(from__, to_) for i in range(len(covered)): depth = covered[i] last = len(covered) - 1 == i if (last and depth == 0) or from__.node(depth).type.content_match.valid_end: return self.delete(from__.start(depth), to_.end(depth)) if depth > 0 and ( last or from__.node(depth - 1).can_replace( from__.index(depth - 1), to_.index_after(depth - 1) ) ): return self.delete(from__.before(depth), to_.after(depth)) for d in range(1, from__.depth + 1): if from_ - from__.start(d) == from__.depth - d and to > from__.end(d): return self.delete(from__.before(d), to) return self.delete(from_, to) # structure.js def lift(self, range_, target): from__ = range_.from_ to_ = range_.to depth = range_.depth gap_start = from__.before(depth + 1) gap_end = to_.after(depth + 1) start = gap_start end = gap_end before = Fragment.empty open_start = 0 d = depth splitting = False while d > target: if splitting or from__.index(d) > 0: splitting = True before = Fragment.from_(from__.node(d).copy(before)) open_start += 1 else: start -= 1 d -= 1 after = Fragment.empty open_end = 0 d = depth splitting = False while d > target: if splitting or to_.after(d + 1) < to_.end(d): splitting = True after = Fragment.from_(to_.node(d).copy(after)) open_end += 1 else: end += 1 d -= 1 return self.step( ReplaceAroundStep( start, end, gap_start, gap_end, Slice(before.append(after), open_start, open_end), before.size - open_start, True, ) ) def wrap(self, range_, wrappers): content = Fragment.empty i = len(wrappers) - 1 while i >= 0: content = Fragment.from_( wrappers[i]["type"].create(wrappers[i].get("attrs"), content) ) i -= 1 start = range_.start end = range_.end return self.step( ReplaceAroundStep( start, end, start, end, Slice(content, 0, 0), len(wrappers), True ) ) def set_block_type(self, from_, to, type, attrs): if to is None: to = from_ if not type.is_text_block: raise ValueError("Type given to set_block_type should be a textblock") map_from = len(self.steps) def iteratee(node: "Node", pos, *args): if ( node.is_text_block and not node.has_markup(type, attrs) and can_change_type( self.doc, self.mapping.slice(map_from).map(pos), type ) ): self.clear_incompatible(self.mapping.slice(map_from).map(pos, 1), type) mapping = self.mapping.slice(map_from) start_m = mapping.map(pos, 1) end_m = mapping.map(pos + node.node_size, 1) self.step( ReplaceAroundStep( start_m, end_m, start_m + 1, end_m - 1, Slice( Fragment.from_(type.create(attrs, None, node.marks)), 0, 0 ), 1, True, ) ) return False self.doc.nodes_between(from_, to, iteratee) return self def set_node_markup(self, pos, type, attrs, marks=None): node = self.doc.node_at(pos) if not node: raise ValueError("No node at given position") if not type: type = node.type new_node = type.create(attrs, None, marks or node.marks) if node.is_leaf: return self.replace_with(pos, pos + node.node_size, new_node) if not type.valid_content(node.content): raise ValueError(f"Invalid content for node type {type.name}") return self.step( ReplaceAroundStep( pos, pos + node.node_size, pos + 1, pos + node.node_size - 1, Slice(Fragment.from_(new_node), 0, 0), 1, True, ) ) def split(self, pos, depth=None, types_after=None): if depth is None: depth = 1 pos_ = self.doc.resolve(pos) before = Fragment.empty after = Fragment.empty d = pos_.depth e = pos_.depth - depth i = depth - 1 while d > e: before = Fragment.from_(pos_.node(d).copy(before)) type_after = None if types_after and len(types_after) > i: type_after = types_after[i] after = Fragment.from_( type_after["type"].create(type_after.get("attrs"), after) if type_after else pos_.node(d).copy(after) ) d -= 1 i -= 1 return self.step( ReplaceStep(pos, pos, Slice(before.append(after), depth, depth), True) ) def join(self, pos, depth=1): step = ReplaceStep(pos - depth, pos + depth, Slice.empty, True) return self.step(step) PK! __prosemirror/utils.pyfrom icu import UnicodeString def text_length(text): return UnicodeString(text).length() PK!HڽTU$prosemirror_py-0.1.2.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hs¯'prosemirror_py-0.1.2.dist-info/METADATAVoF ~_Kj]<,]q>z>~_T;Jtp H'G;1ZΓ/?DXgN$ m+<-gOA"cj%Kq&@|,Ͷ 0 -[t%7ֶZ &'aβcKIIT+:㲋0l 3b6p3w &A *r21a} |#X7uDu$U;v-|-<㐧AV|au G iZaѪ/@|0Nh1B6=j#c3b\֩n.r LJ8DB'JP402=^m^Pκa!8Zd ?9OׁupLLQzejtNa,3nAQoǡ2p_g-<خaYJ5Vsr [p8~UMb.)  !2x+|x=2mU!wIph1ސ0kuUGGloorTg,fl^@{c6Իszj]kq;uxZ?O!R*e0$yڀv ( }?(h i}՚[k3 EncL` vWɧFtCV|OzyL ^5sU4Q!J1U2L.@] 磫0N  ]GVy~Gdܘfhnݛ"Kz\##kc'}L1o4;ʛivYۂ% ]T, & Z=̠Bt޷1ʇUYq}Dv<ڗeL A;dlލL7g%l5l=]VaSdޖg9yRgWp x^]1zXbY jkROZGi=x[Ss2n)YGX㿡~zƄDyI,f0l'Q$"ng9i,ײ^i(g#9iZjUjS~X̍cSc@O Db?ďuТͺ"Iy6h.0zRiRO+ %Wg uG0Ji:s SJvv4KH{?PK!xJJprosemirror/__init__.pyPK!׵prosemirror/model/__init__.pyPK!c** oprosemirror/model/comparedeep.pyPK!lLU1U1prosemirror/model/content.pyPK!7j֫ f5prosemirror/model/diff.pyPK!y>H@prosemirror/model/fragment.pyPK!+s^prosemirror/model/mark.pyPK!XZ((gprosemirror/model/node.pyPK!+fސprosemirror/model/replace.pyPK!Lqq prosemirror/model/resolvedpos.pyPK!tT(T(prosemirror/model/schema.pyPK!E$$$)prosemirror/schema/basic/__init__.pyPK!l9 (prosemirror/schema/basic/schema_basic.pyPK!,###|prosemirror/schema/list/__init__.pyPK!T&prosemirror/schema/list/schema_list.pyPK!s }i$prosemirror/test_builder/__init__.pyPK!;+ !prosemirror/test_builder/build.pyPK!C  !prosemirror/transform/__init__.pyPK!3sprosemirror/transform/map.pyPK!3prosemirror/transform/mark.pyPK!y"Y3prosemirror/transform/mark_step.pyPK! >> IFprosemirror/transform/replace.pyPK!*qu %:prosemirror/transform/replace_step.pyPK!3Gwprosemirror/transform/step.pyPK!"yprosemirror/transform/structure.pyPK!"S3<3<"prosemirror/transform/transform.pyPK! __prosemirror/utils.pyPK!HڽTU$prosemirror_py-0.1.2.dist-info/WHEELPK!Hs¯'8prosemirror_py-0.1.2.dist-info/METADATAPK!HeN_ %, prosemirror_py-0.1.2.dist-info/RECORDPK