PK!django_pgtree/__init__.pyPK!¯ÊH<ˆˆdjango_pgtree/fields.pyfrom django.db.models import Field, Lookup from django.utils.translation import gettext_lazy as _ class LtreeField(Field): description = _("Dotted label path") def db_type(self, connection): return "ltree" def cast_db_type(self, connection): if self.max_length is None: return connection.ops.cast_char_field_without_max_length return super().cast_db_type(connection) def get_internal_type(self): return "CharField" def to_python(self, value): if isinstance(value, list) or value is None: return value if isinstance(value, str): return value.split(".") raise ValueError("Don't know how to handle {!r}".format(value)) def get_prep_value(self, value): if isinstance(value, str) or value is None: return value return ".".join(value) def from_db_value(self, value, expression, connection): if not value: return [] return value.split(".") class BinaryLookup(Lookup): def as_sql(self, compiler, connection): lhs, lhs_params = self.process_lhs(compiler, connection) rhs, rhs_params = self.process_rhs(compiler, connection) params = lhs_params + rhs_params return " ".join((lhs, self.operator, rhs)), params @LtreeField.register_lookup class AncestorOf(BinaryLookup): lookup_name = "ancestor_of" operator = "@>" @LtreeField.register_lookup class DescendantOf(BinaryLookup): lookup_name = "descendant_of" operator = "<@" @LtreeField.register_lookup class MatchesLquery(BinaryLookup): lookup_name = "matches_lquery" operator = "~" PK! ŽwÖÖ(django_pgtree/migrations/0001_initial.py# Generated by Django 2.1.2 on 2018-10-25 04:58 from django.db import migrations class Migration(migrations.Migration): dependencies = [] operations = [ migrations.RunSQL( """ CREATE OR REPLACE FUNCTION djpgtree_next( tbl regclass, prefix ltree, gap bigint, pad_length int ) RETURNS ltree AS $function$ DECLARE sibling_query lquery; previous_highest ltree; previous_rightmost_label text; next_rightmost_segment text; BEGIN -- Generate a lquery that matches all would-be siblings of -- the new row IF prefix = ''::ltree THEN sibling_query = '*{1}'; ELSE sibling_query = prefix::text || '.*{1}'; END IF; -- Find the existing sibling with the highest tree_path EXECUTE format($$ SELECT tree_path FROM %s WHERE tree_path ~ %L ORDER BY tree_path DESC LIMIT 1 $$, tbl, sibling_query) INTO previous_highest; IF previous_highest IS NULL THEN -- If there is no such row, start at the gap we were passed in, -- to allow room for other rows to be moved above us RETURN prefix || LPAD(gap::text, pad_length, '0'); ELSE -- Otherwise, parse the rightmost label as a number, adding -- the gap to it, and reattach to the prefix previous_rightmost_label = subpath(previous_highest, -1); next_rightmost_segment = previous_rightmost_label::bigint + gap; RETURN prefix || LPAD(next_rightmost_segment, pad_length, '0'); END IF; END $function$ LANGUAGE plpgsql; """, "DROP FUNCTION djpgtree_next(regclass, ltree, bigint, int)", ) ] PK!$django_pgtree/migrations/__init__.pyPK!â¸7 dddjango_pgtree/models.pyfrom django.contrib.postgres.indexes import GistIndex from django.db import models from django.db.transaction import atomic from .fields import LtreeField GAP = 10 ** 9 PAD_LENGTH = 18 class LtreeConcat(models.Func): arg_joiner = "||" template = "%(expressions)s" class Subpath(models.Func): function = "subpath" class Text2Ltree(models.Func): function = "text2ltree" class DjPgTreeNext(models.Func): function = "djpgtree_next" class TreeQuerySet(models.QuerySet): def roots(self): return self.filter(tree_path__matches_lquery=["*{1}"]) UNCHANGED = object() class TreeNode(models.Model): __new_parent = UNCHANGED tree_path = LtreeField(unique=True) objects = TreeQuerySet.as_manager() class Meta: abstract = True indexes = (GistIndex(fields=["tree_path"], name="tree_path_idx"),) ordering = ("tree_path",) def __init__(self, *args, parent=None, **kwargs): if parent is not None: self.__new_parent = parent super().__init__(*args, **kwargs) @property def parent(self): if self.__new_parent is not UNCHANGED: return self.__new_parent parent_path = self.tree_path[:-1] # pylint: disable=unsubscriptable-object return self.__class__.objects.get(tree_path=parent_path) @parent.setter def parent(self, new_parent): if new_parent is not None and new_parent.tree_path is None: raise ValueError("Parent node must be saved before receiving children") # Replace our tree_path with a new one that has our new parent's self.__new_parent = new_parent def __next_tree_path_qx(self, prefix=()): return DjPgTreeNext( models.Value(self._meta.db_table), models.Value(".".join(prefix)), GAP, PAD_LENGTH, ) def relocate(self, *, after=None, before=None): if after is None and before is None: raise ValueError("You must supply at least one of before or after") new_prev_child = after new_next_child = before if new_prev_child is None: new_prev_child = ( new_next_child.siblings.filter(tree_path__lt=new_next_child.tree_path) .order_by("tree_path") .last() ) # nb: if we are trying to move into the first position, after will be none if new_next_child is None: new_next_child = ( new_prev_child.siblings.filter(tree_path__gt=new_prev_child.tree_path) .order_by("tree_path") .first() ) if new_next_child is None: # this is the case where we want to move to the last position # we can just (re-)set the parent, since that's its default # behaviour self.parent = after.parent return if ( new_next_child is not None and new_prev_child.tree_path[:-1] != new_next_child.tree_path[:-1] ): raise ValueError("Before and after nodes aren't actually siblings") next_v = int(new_next_child.tree_path[-1]) if new_prev_child is None: self.tree_path = new_next_child.tree_path[:-1] + [ str(next_v // 2).zfill(PAD_LENGTH) ] else: prev_v = int(new_prev_child.tree_path[-1]) this_v = prev_v + (next_v - prev_v) // 2 self.tree_path = new_prev_child.tree_path[:-1] + [ str(this_v).zfill(PAD_LENGTH) ] def save(self, *args, **kwargs): # pylint: disable=arguments-differ tree_path_needs_refresh = False old_tree_path = None if self.__new_parent is None: old_tree_path = self.tree_path or None self.tree_path = self.__next_tree_path_qx([]) elif self.__new_parent is not UNCHANGED: tree_path_needs_refresh = True old_tree_path = self.tree_path or None self.tree_path = self.__next_tree_path_qx(self.__new_parent.tree_path) if not self.tree_path: tree_path_needs_refresh = True self.tree_path = self.__next_tree_path_qx() # If we haven't changed the parent, save as normal. if old_tree_path is None: rv = super().save(*args, **kwargs) # If we have, use a transaction to avoid other contexts seeing the intermediate # state where our descendants aren't connected to us. else: with atomic(): rv = super().save(*args, **kwargs) # Move all of our descendants along with us, by substituting our old # ltree prefix with our new one, in every descendant that # has that prefix. self.refresh_from_db(fields=("tree_path",)) tree_path_needs_refresh = False self.__class__.objects.filter( tree_path__descendant_of=old_tree_path ).update( tree_path=LtreeConcat( models.Value(".".join(self.tree_path)), Subpath(models.F("tree_path"), len(old_tree_path)), ) ) if tree_path_needs_refresh: self.refresh_from_db(fields=("tree_path",)) print( "for object {!r}, old_tree_path is {!r}, tree_path is {!r}".format( self, old_tree_path, self.tree_path ) ) return rv @property def ancestors(self): return self.__class__.objects.filter( tree_path__ancestor_of=self.tree_path ).exclude(pk=self.pk) @property def descendants(self): return self.__class__.objects.filter( tree_path__descendant_of=self.tree_path ).exclude(pk=self.pk) @property def children(self): return self.__class__.objects.filter( tree_path__matches_lquery=[*self.tree_path, "*{1}"] ) @property def family(self): return self.__class__.objects.filter( models.Q(tree_path__ancestor_of=self.tree_path) | models.Q(tree_path__descendant_of=self.tree_path) ) @property def siblings(self): return self.__class__.objects.filter( tree_path__matches_lquery=[*self.tree_path[:-1], "*{1}"] ).exclude(pk=self.pk) PK!ÓkIß ß django_pgtree/tests.pyimport pytest from testproject.testapp.models import TestModel as T pytestmark = pytest.mark.django_db @pytest.fixture def animal(): animal = T.objects.create(name="Animal") mammal = T.objects.create(name="Mammal", parent=animal) T.objects.create(name="Cat", parent=mammal) T.objects.create(name="Dog", parent=mammal) T.objects.create(name="Seal", parent=mammal) T.objects.create(name="Bear", parent=mammal) marsupial = T.objects.create(name="Marsupial", parent=animal) T.objects.create(name="Koala", parent=marsupial) T.objects.create(name="Kangaroo", parent=marsupial) T.objects.create(name="Plant") return animal def test_descendants(animal): assert [x.name for x in animal.descendants] == [ "Mammal", "Cat", "Dog", "Seal", "Bear", "Marsupial", "Koala", "Kangaroo", ] def test_ancestors(animal): koala = T.objects.get(name="Koala") assert [x.name for x in koala.ancestors] == ["Animal", "Marsupial"] def test_parent(animal): mammal = T.objects.get(name="Mammal") assert mammal.parent == animal def test_children(animal): assert [x.name for x in animal.children] == ["Mammal", "Marsupial"] def test_family(animal): mammal = T.objects.get(name="Mammal") assert [x.name for x in mammal.family] == [ "Animal", "Mammal", "Cat", "Dog", "Seal", "Bear", ] def test_reparent(animal): marsupial = T.objects.get(name="Marsupial") mammal = T.objects.get(name="Mammal") dog = T.objects.get(name="Dog") dog_tree_path = dog.tree_path plant = T.objects.get(name="Plant") plant_tree_path = plant.tree_path marsupial.parent = mammal marsupial.save() assert marsupial.tree_path[:2] == mammal.tree_path koala = T.objects.get(name="Koala") assert koala.parent == marsupial assert koala.tree_path[:2] == mammal.tree_path assert mammal in koala.ancestors dog.refresh_from_db() assert dog.tree_path == dog_tree_path plant.refresh_from_db() assert plant.tree_path == plant_tree_path def test_reparent_at_root(animal): marsupial = T.objects.get(name="Marsupial") dog = T.objects.get(name="Dog") dog_tree_path = dog.tree_path plant = T.objects.get(name="Plant") plant_tree_path = plant.tree_path marsupial.parent = None marsupial.save() assert len(marsupial.tree_path) == 1 koala = T.objects.get(name="Koala") assert koala.parent == marsupial assert len(koala.tree_path) == 2 assert animal not in koala.ancestors dog.refresh_from_db() assert dog.tree_path == dog_tree_path plant.refresh_from_db() assert plant.tree_path == plant_tree_path def test_roots(animal): roots = T.objects.roots() assert [x.name for x in roots] == ["Animal", "Plant"] def test_relocate_in_between(animal): seal = T.objects.get(name="Seal") cat = T.objects.get(name="Cat") seal.relocate(after=cat) seal.save() assert [x.name for x in cat.parent.children] == ["Cat", "Seal", "Dog", "Bear"] def test_relocate_in_between_at_root(animal): seal = T.objects.get(name="Seal") plant = T.objects.get(name="Plant") seal.relocate(before=plant) seal.save() assert [x.name for x in T.objects.roots()] == ["Animal", "Seal", "Plant"] def test_ordering_past_10(): for i in range(1, 12): T.objects.create(name=str(i)) assert [x.name for x in T.objects.all()] == [str(x) for x in range(1, 12)] PK!H$½ýTT#django_pgtree-0.1.2.dist-info/WHEEL Ê= €0 н§èRêÏÔ ˆ›ˆè\áC ’HŒCo¯o~Û \´BŸ"œ|¢ÀÐl¢ÉßÓêchÚлYÄh|hzWÙ“7}á–|ü±vÄ ÌÕ}PK!Hª¨Æ‘ÜÕ&django_pgtree-0.1.2.dist-info/METADATA•ŽÝNÃ0 Fïó~Fì"TÆÅ&FWQ×Vë¥KSœ´·'„Wôβ¿süÝSÄ#fO$}g`®gª@GšvÖg½B¤~gz¦çªœC9X Ö-ãFì½Àí' ï[(}ˆV(hµö޲mR?wÅî¹P[®© i±©V*bëÅ@Þ ƒ¡ŽêVô:pâ³ò”ÎéÿõÕB/Õêˆ!ðž)ß0vÕò¾ÿFͯL)Þ :Ç…mj7¤#ðe§ÅÔüŸÿ!Î'“‰KõPK!HÕ8ì|”Ø$django_pgtree-0.1.2.dist-info/RECORD­ÐËŽ¢@…á}? ( ¸˜J‚  HèM©‹ûpµ}úI&éÄîÅôf^àûO.’&oQ—=!k„hCG„VÝ3ÜH¿DYƒ^W(æî’°(8Ô ¬=™<â—5pΊÞÍý2é!ýá/\FI…‡WŒ*] ëËÐZœ›nÇ¥ÔÓ©@MfÐò˜Ã§ÝïtK /ÉÂ7¯¦yŸŒ´m†5Çqüß±4©^ÏNõä°9FQžäð‚ã38„ŽîÈeŽn¸*øåIo2‚ ý#🯨[Lª×+ÿ·únX%wGêý(ziŠOœ––Í#ØH1Þ=ð–/ pß½‘ ã+‡ýI‹÷¹1âÒ¿Vm•«™›fôDÃl0îXA!s00ø¯Ë­ø•°ÂtYÚdí:2!<~Ú¦üÑë…rµ]‚ÙeÚØ±Ë0K11CþüxN¹ärâ%à1ŠøƒìÀ@ÕÔ@ýÄ{Ëææ `cqz^°m"Í ¥¹‘ÏQ-h+Ö¬\³uQÚþ ûpò5†yûPK!¤django_pgtree/__init__.pyPK!¯ÊH<ˆˆ¤7django_pgtree/fields.pyPK! ŽwÖÖ(¤ôdjango_pgtree/migrations/0001_initial.pyPK!$¤django_pgtree/migrations/__init__.pyPK!â¸7 dd¤Rdjango_pgtree/models.pyPK!ÓkIß ß ¤ë)django_pgtree/tests.pyPK!H$½ýTT#€þ7django_pgtree-0.1.2.dist-info/WHEELPK!Hª¨Æ‘ÜÕ&€“8django_pgtree-0.1.2.dist-info/METADATAPK!HÕ8ì|”Ø$€³9django_pgtree-0.1.2.dist-info/RECORDPK ´‰;