PK ! Qmg cimpyorm/Model/Elements/Base.pyfrom typing import Union
from lxml import etree
from lxml.etree import XPath
from sqlalchemy import Column, String, ForeignKey
from cimpyorm.auxiliary import log
from cimpyorm.Model import auxiliary as aux
def prefix_ns(func):
    """
    Decorator that prefixes a property's return value with the element's
    xml-namespace (unless it is the default namespace "cim").
    Creates unique labels for properties and classes.
    """
    def _substitute(fragment, nsmap):
        # Drop a leading fragment marker ("#..."), then replace any known
        # namespace URI with its "<prefix>_" short form.
        if fragment.startswith("#"):
            fragment = "".join(fragment.split("#")[1:])
        for prefix, uri in nsmap.items():
            if uri in fragment:
                fragment = fragment.replace(uri, prefix + "_")
        return fragment

    def wrapper(obj):
        """
        :param obj: Object that implements the namespace property (e.g. CIMClass/CIMProp)
        :return: Representation with substituted namespace (list, str, or None)
        """
        raw = func(obj)
        if raw and isinstance(raw, list):
            return [_substitute(item, obj.nsmap) for item in raw]
        if raw:
            return _substitute(raw, obj.nsmap)
        return None
    return wrapper
class SchemaElement(aux.Base):
    """
    ABC for schema entities.

    Maps an RDFS schema description onto a SQLAlchemy declarative hierarchy
    (single-table polymorphism discriminated on ``type_``).
    """
    __tablename__ = "SchemaElement"
    # xml namespace map (prefix -> URI); populated externally before parsing — TODO confirm
    nsmap = None
    # Compiled XPath expressions; built lazily per subclass via _generateXPathMap.
    XPathMap = None
    name = Column(String(80), primary_key=True)
    label = Column(String(50))
    namespace = Column(String(30))
    type_ = Column(String(50))
    #comment = Column(String(300))
    __mapper_args__ = {
        "polymorphic_on": type_,
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description=None):
        """
        The ABC's constructor
        :param description: the (merged) xml node element containing the class's description
        :raises ValueError: if no description is supplied
        """
        if description is None:
            log.error(f"Initialisation of CIM model entity without associated "
                      f"description invalid.")
            raise ValueError(f"Initialisation of CIM model entity without "
                             f"associated description invalid.")
        self.description = description
        # Per-instance cache for lazily extracted schema properties.
        self.Attributes = self._raw_Attributes()
        self.name = self._name
        self.label = self._label
        self.namespace = self._namespace
        self.Map = None

    @staticmethod
    def _raw_Attributes():
        # Fresh (empty) attribute cache; subclasses extend this dict.
        return {"name": None, "label": None, "namespace": None}

    @classmethod
    def _generateXPathMap(cls):
        """
        Generator for compiled XPath expressions (those require a namespace map to be present, hence they are compiled
        at runtime)
        :return: None
        """
        cls.XPathMap = {"label": XPath(r"rdfs:label/text()", namespaces=cls.nsmap)}
        return cls.XPathMap

    @property
    @prefix_ns
    def _label(self):
        """
        Return the class' label
        :return: str
        """
        return self._raw_property("label")

    @property
    def _namespace(self) -> Union[str, None]:
        # Derive the namespace prefix from the (already prefix-substituted) name;
        # names without a known "<prefix>_" start default to "cim".
        if not self.Attributes["namespace"]:
            if not any(self.name.startswith(ns+"_") for ns in self.nsmap.keys()):
                self.Attributes["namespace"] = "cim"
            else:
                self.Attributes["namespace"] = self.name.split("_")[0]
        return self.Attributes["namespace"]

    @property
    def _comment(self):
        """
        Return the class' rdfs:comment as a serialized string (currently unused).
        :return: str
        """
        # Fixme: This is very slow and not very nice (each string contains the entire xml header - parsing xpath(
        # "*/text()) doesn't work due to the text containing xml tags). Therefore, this is currently disabled
        str_ = "".join(str(etree.tostring(content, pretty_print=True)) for content in self.description.xpath(
            "rdfs:comment", namespaces=self.nsmap))
        return str_

    @property
    @prefix_ns
    def _name(self) -> Union[str, None]:
        """
        Accessor for an entities name (with cache)
        :return: The entities name as defined in its description
        """
        if self.Attributes["name"]:
            pass
        else:
            # First attribute value of the description node is the identifier.
            _n = self.description.values()[0]
            self.Attributes["name"] = _n
            self.name = self.Attributes["name"]
        return self.Attributes["name"]

    def _raw_property(self, property_identifier) -> Union[list, str, None]:
        """
        Extract a property from the CIM entity
        :param property_identifier: property name
        :return: The CIM entity's property as a list, a string, or None
        :raises KeyError: if the identifier has no compiled XPath expression
        """
        if self.Attributes[property_identifier] is None:
            xp = self.XPathMap
            if property_identifier not in xp.keys():
                raise KeyError(f"Invalid property_identifier name {property_identifier}.")
            results = xp[property_identifier](self.description)  # pylint: disable=unsubscriptable-object
            if len(set(results)) == 1:
                # Unambiguous result: cache the single value.
                self.Attributes[property_identifier] = results[0]
            elif not results:
                self.Attributes[property_identifier] = None
            else:
                # Merged schema descriptions may disagree; keep all unique values.
                log.warning(f"Ambiguous class property_identifier ({property_identifier}) for {self.name}.")
                self.Attributes[property_identifier] = [result for result in set(results)]
        return self.Attributes[property_identifier]

    def describe(self, fmt="psql"):
        # Default description: print the element itself; subclasses tabulate.
        print(self)
class CIMPackage(SchemaElement):
    """
    Schema element representing a CIM package (classes reference it via
    their ``belongsToCategory``).
    """
    __tablename__ = "CIMPackage"
    name = Column(String(80), ForeignKey(SchemaElement.name), primary_key=True)
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description):
        """
        Class constructor
        :param description: the (merged) xml node element containing the package's description
        """
        super().__init__(description)
PK ! NqW
, , cimpyorm/Model/Elements/Class.pyfrom collections import OrderedDict, defaultdict
import pandas as pd
from lxml.etree import XPath
from sqlalchemy import Column, String, ForeignKey, Integer
from sqlalchemy.orm import relationship
from tabulate import tabulate
from cimpyorm.auxiliary import log, shorten_namespace
from cimpyorm.Model.Elements import SchemaElement, CIMPackage, CIMEnum, prefix_ns
from cimpyorm.Model.Parseable import Parseable
from cimpyorm.auxiliary import chunks
class CIMClass(SchemaElement):
    """
    Class representing a CIM Model Class
    """
    __tablename__ = "CIMClass"
    name = Column(String(80), ForeignKey(SchemaElement.name), primary_key=True)
    package_name = Column(String(50), ForeignKey(CIMPackage.name))
    package = relationship(CIMPackage, foreign_keys=package_name, backref="classes")
    parent_name = Column(String(50), ForeignKey("CIMClass.name"))
    parent = relationship("CIMClass", foreign_keys=parent_name, backref="children", remote_side=[name])
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description=None):
        """
        Class constructor
        :param description: the (merged) xml node element containing the class's description
        """
        super().__init__(description)
        # The generated SQLAlchemy ORM class (created later in init_type).
        self.class_ = None
        self.Attributes = self._raw_Attributes()
        # The category may resolve to a list when merged schemata disagree;
        # fall back to the first entry in that case.
        self.package_name = self._belongsToCategory if not \
            isinstance(self._belongsToCategory, list) \
            else self._belongsToCategory[0]  # pylint: disable=unsubscriptable-object
        self.parent_name = self._parent_name
        self.props = []

    @staticmethod
    def _raw_Attributes():
        # Extend the base cache with class-specific schema properties.
        return {**SchemaElement._raw_Attributes(),
                **{"parent": None,
                   "category": None,
                   "namespace": None}
                }

    @classmethod
    def _generateXPathMap(cls):
        """
        Compile XPath Expressions for later use (better performance than tree.xpath(...))
        :return: None
        """
        super()._generateXPathMap()
        Map = {
            "parent": XPath(r"rdfs:subClassOf/@rdf:resource", namespaces=cls.nsmap),
            "category": XPath(r"cims:belongsToCategory/@rdf:resource", namespaces=cls.nsmap)
        }
        if not cls.XPathMap:
            cls.XPathMap = Map
        else:
            cls.XPathMap = {**cls.XPathMap, **Map}

    @property
    @prefix_ns
    def _belongsToCategory(self):
        """
        Return the class' category as determined from the schema
        :return: str
        """
        return self._raw_property("category")

    @property
    @prefix_ns
    def _parent_name(self):
        """
        Return the class' parent as determined from the schema
        :return: str
        """
        return self._raw_property("parent")

    def init_type(self, base):
        """
        Initialize ORM type using the CIMClass object
        :param base: the SQLAlchemy declarative base the root class derives from
        :return: None
        """
        log.debug(f"Initializing class {self.name}.")
        attrs = OrderedDict()
        attrs["__tablename__"] = self.name
        self.Map = dict()
        if self.parent:
            # Joined-table inheritance: child's id references the parent table.
            attrs["id"] = Column(String(50), ForeignKey(f"{self.parent.name}.id",
                                                        ondelete="CASCADE"), primary_key=True)
            log.debug(f"Created id column on {self.name} with FK on {self.parent.name}.")
            attrs["__mapper_args__"] = {
                "polymorphic_identity": self.name
            }
        else:  # Base class
            attrs["type_"] = Column(String(50))
            attrs["_source_id"] = Column(Integer, ForeignKey("SourceInfo.id"))
            attrs["_source"] = relationship("SourceInfo", foreign_keys=attrs["_source_id"])
            attrs["id"] = Column(String(50), primary_key=True)
            log.debug(f"Created id column on {self.name} with no inheritance.")
            attrs["__mapper_args__"] = {
                "polymorphic_on": attrs["type_"],
                "polymorphic_identity": self.name}
        # Backlink from the generated ORM class to this schema element.
        attrs["_schema_class"] = self
        if self.parent:
            self.class_ = type(self.name, (self.parent.class_,), attrs)
        else:  # Base class
            self.class_ = type(self.name, (Parseable, base,), attrs)
        log.debug(f"Defined class {self.name}.")

    def generate(self, nsmap):
        # Generate ORM columns/relationships for all native properties.
        for prop in self.props:
            prop.generate(nsmap)

    def _generate_map(self):
        """
        Generate the parse-map so it finds all properties (even those named after the ancestor in the hierarchy)
        :return: None
        """
        # Make sure the CIM Parent Class is always first in __bases__
        if self.parent:
            self.Map = {**self.parent._generate_map(), **self.Map}  # pylint: disable=no-member
        return self.Map

    @property
    def prop_keys(self):
        # All property keys, inherited ones first.
        if self.parent:
            return self.parent.prop_keys + [prop.key for prop in self.props]
        else:
            return [prop.key for prop in self.props]

    @property
    def all_props(self):
        # Native and inherited properties keyed by (namespace-prefixed) label;
        # native definitions override inherited ones with the same key.
        _all_props = {}
        for prop in self.props:
            if prop.namespace is None or prop.namespace == "cim":
                _all_props[prop.label] = prop
            else:
                _all_props[f"{prop.namespace}_{prop.label}"] = prop
        if self.parent:
            return {**self.parent.all_props, **_all_props}
        else:
            return _all_props

    def parse_values(self, el, session):
        """
        Extract this class's (and its ancestors') property values from an
        instance xml element.
        :param el: the instance xml element
        :param session: database session (passed through the recursion)
        :return: (argmap, insertables) — keyword args for the ORM constructor
            and pending association-table insert statements
        """
        if not self.parent:
            argmap = {}
            insertables = []
        else:
            # Recurse first so ancestor values are collected as well.
            argmap, insertables = self.parent.parse_values(el, session)
        props = [prop for prop in self.props if prop.used]
        for prop in props:
            value = prop.xpath(el)
            if prop.many_remote and prop.used:
                # Many-to-many: collect remote ids and stage association rows.
                _id = [el.attrib.values()[0]]
                _remote_ids = []
                if len(set(value)) > 1:
                    for raw_value in value:
                        _remote_ids = _remote_ids + [v for v in raw_value.split("#") if len(v)]
                else:
                    _remote_ids = [v for v in value[0].split("#") if len(v)]
                _ids = _id * len(_remote_ids)
                # Insert tuples in chunks of 400 elements max
                for chunk in chunks(list(zip(_ids, _remote_ids)), 400):
                    _ins = prop.association_table.insert(
                        [{f"{prop.domain.label}_id": _id,
                          f"{prop.range.label}_id": _remote_id}
                         for (_id, _remote_id) in chunk])
                    insertables.append(_ins)
            elif len(value) == 1 or len(set(value)) == 1:
                value = value[0]
                if isinstance(prop.range, CIMEnum):
                    argmap[prop.key] = shorten_namespace(value, self.nsmap)
                else:
                    try:
                        # Coerce to the mapped primitive datatype where known.
                        t = prop.mapped_datatype
                        if t == "Float":
                            argmap[prop.key] = float(value)
                        elif t == "Boolean":
                            argmap[prop.key] = value.lower() == "true"
                        elif t == "Integer":
                            argmap[prop.key] = int(value)
                        elif len([v for v in value.split("#") if v]) > 1:
                            log.warning(
                                f"Ambiguous data values for {self.name}:{prop.key}: {len(set(value))} unique values. "
                                f"(Skipped)")
                            # If reference doesn't resolve value is set to None (Validation
                            # has to catch missing obligatory values)
                        else:
                            argmap[prop.key] = value.replace("#", "")
                    except ValueError:
                        argmap[prop.key] = value.replace("#", "")
            elif len(value) > 1:
                log.warning(f"Ambiguous data values for {self.name}:{prop.key}: {len(set(value))} unique values. "
                            f"(Skipped)")
                # If reference doesn't resolve value is set to None (Validation
                # has to catch missing obligatory values)
        return argmap, insertables

    def to_html(self, **kwargs):
        # Render the property table as html (kwargs forwarded to pandas).
        df = self.property_table()
        return df.to_html(**kwargs)

    def describe(self, fmt="psql"):
        """
        Print the class hierarchy and a tabulated property overview.
        :param fmt: tabulate table format
        """
        df = self.property_table()
        tab = tabulate(df, headers="keys", showindex=False, tablefmt=fmt, stralign="right")
        c = self
        inh = dict()
        inh["Hierarchy"] = [c.name]
        inh["Number of native properties"] = [len(c.props)]
        while c.parent:
            inh["Hierarchy"].append(c.parent.name)
            inh["Number of native properties"].append(len(c.parent.props))
            c = c.parent
        # Show the hierarchy root-first.
        for val in inh.values():
            val.reverse()
        inh = tabulate(pd.DataFrame(inh),
                       headers="keys", showindex=False, tablefmt=fmt, stralign="right")
        print(inh + "\n" + tab)

    def property_table(self):
        """
        Build a DataFrame describing all (native and inherited) properties:
        label, domain, multiplicity, datatype, unit and multiplier.
        :return: pandas.DataFrame
        """
        table = defaultdict(list)
        for key, prop in self.all_props.items():
            table["Label"].append(key)
            table["Domain"].append(prop.domain.name)
            table["Multiplicity"].append(prop.multiplicity)
            table["Optional"].append(prop.optional)
            try:
                table["Datatype"].append(prop.datatype.label)
            except AttributeError:
                # No datatype: the property is a reference; mark with "*".
                table["Datatype"].append(f"*{prop.range.label}")
            try:
                nominator_unit = prop.datatype.unit.symbol.label
                if nominator_unit.lower() == "none":
                    nominator_unit = None
            except AttributeError:
                nominator_unit = None
            try:
                denominator_unit = prop.datatype.denominator_unit.symbol.label
                if denominator_unit.lower() == "none":
                    denominator_unit = None
            except AttributeError:
                denominator_unit = None
            if nominator_unit and denominator_unit:
                table["Unit"].append(f"{nominator_unit}/{denominator_unit}")
            elif nominator_unit:
                table["Unit"].append(f"{nominator_unit}")
            elif denominator_unit:
                table["Unit"].append(f"1/{denominator_unit}")
            else:
                table["Unit"].append("-")
            try:
                nominator_mpl = prop.datatype.multiplier.value.label
                if nominator_mpl.lower() == "none":
                    nominator_mpl = None
            except AttributeError:
                nominator_mpl = None
            try:
                denominator_mpl = prop.datatype.denominator_multiplier.value.label
                if denominator_mpl.lower() == "none":
                    denominator_mpl = None
            except AttributeError:
                denominator_mpl = None
            if nominator_mpl and denominator_mpl:
                table["Multiplier"].append(f"{nominator_mpl}/{denominator_mpl}")
            elif nominator_mpl:
                table["Multiplier"].append(f"{nominator_mpl}")
            elif denominator_mpl:
                table["Multiplier"].append(f"1/{denominator_mpl}")
            else:
                table["Multiplier"].append("-")
            table["Inferred"].append(not prop.used)
        df = pd.DataFrame(table)
        return df
PK ! J cimpyorm/Model/Elements/Enum.pyfrom collections import defaultdict
import pandas as pd
from lxml.etree import XPath
from sqlalchemy import Column, String, ForeignKey
from sqlalchemy.orm import relationship
from tabulate import tabulate
from cimpyorm.Model.Elements import SchemaElement, prefix_ns
class CIMEnum(SchemaElement):
    """
    Schema element representing a CIM enumeration (its members are
    CIMEnumValue objects, available via the ``values`` backref).
    """
    __tablename__ = "CIMEnum"
    name = Column(String(80), ForeignKey(SchemaElement.name), primary_key=True)
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description):
        """
        Class constructor
        :param description: the (merged) xml node element containing the enums's description
        """
        super().__init__(description)
        self.Attributes = self._raw_Attributes()

    @staticmethod
    def _raw_Attributes():
        # Extend the base cache with the enum's category.
        return {**SchemaElement._raw_Attributes(),
                **{"category": None}}

    @classmethod
    def _generateXPathMap(cls):
        # Add the category expression to the inherited XPath map.
        super()._generateXPathMap()
        Map = {"category": XPath(r"cims:belongsToCategory/@rdf:resource", namespaces=cls.nsmap)}
        if not cls.XPathMap:
            cls.XPathMap = Map
        else:
            cls.XPathMap = {**cls.XPathMap, **Map}

    @property
    @prefix_ns
    def _category(self):
        """
        Return the enums' category as determined from the schema
        :return: str
        """
        return self._raw_property("category")

    def describe(self, fmt="psql"):
        """
        Print a table of the enum's member values.
        :param fmt: tabulate table format
        """
        table = defaultdict(list)
        for value in self.values:
            table["Value"].append(value.label)
        df = pd.DataFrame(table)
        print(tabulate(df, headers="keys", showindex=False, tablefmt=fmt, stralign="right"))
class CIMEnumValue(SchemaElement):
    """
    Schema element representing a single member of a CIM enumeration.
    """
    __tablename__ = "CIMEnumValue"
    name = Column(String(80), ForeignKey(SchemaElement.name), primary_key=True)
    enum_name = Column(String(50), ForeignKey(CIMEnum.name))
    enum = relationship(CIMEnum, foreign_keys=enum_name, backref="values")
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description):
        """
        Class constructor
        :param description: the (merged) xml node element containing the enums's description
        """
        super().__init__(description)
        self.Attributes = self._raw_Attributes()
        self.enum_name = self._enum_name

    @staticmethod
    def _raw_Attributes():
        # Extend the base cache with the value's rdf:type (owning enum).
        return {**SchemaElement._raw_Attributes(),
                **{"type": None}}

    @classmethod
    def _generateXPathMap(cls):
        # Add the rdf:type expression to the inherited XPath map.
        super()._generateXPathMap()
        Map = {"type": XPath(r"rdf:type/@rdf:resource", namespaces=cls.nsmap)}
        if not cls.XPathMap:
            cls.XPathMap = Map
        else:
            cls.XPathMap = {**cls.XPathMap, **Map}

    @property
    @prefix_ns
    def _enum_name(self):
        """
        Return the name of the enum this value belongs to, as determined
        from the schema (rdf:type).
        :return: str
        """
        return self._raw_property("type")
PK ! UGV V # cimpyorm/Model/Elements/Property.pyfrom collections import OrderedDict
from typing import Union
from lxml.etree import XPath
from sqlalchemy import Column, String, ForeignKey, Boolean, Float, Integer, Table
from sqlalchemy.orm import relationship, backref
from cimpyorm.auxiliary import log
from cimpyorm.Model import auxiliary as aux
from cimpyorm.Model.Elements import SchemaElement, CIMPackage, CIMClass, CIMEnumValue, CIMEnum, prefix_ns
class CIMDT(SchemaElement):
    """
    Schema element representing a CIM datatype; its value/unit/multiplier
    components are attached via backrefs from the CIMDT* property classes.
    """
    __tablename__ = "CIMDT"
    name = Column(String(80), ForeignKey(SchemaElement.name), primary_key=True)
    package_name = Column(String(50), ForeignKey(CIMPackage.name))
    package = relationship(CIMPackage, foreign_keys=package_name, backref="datatypes")
    stereotype = Column(String(30))
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description):
        """
        Class constructor
        :param description: the (merged) xml node element containing the enums's description
        """
        super().__init__(description)
        self.Attributes = self._raw_Attributes()
        self.package_name = self._category
        self.stereotype = self._stereotype

    @staticmethod
    def _raw_Attributes():
        # Extend the base cache with category and stereotype.
        return {**SchemaElement._raw_Attributes(),
                **{"category": None,
                   "stereotype": None}}

    @classmethod
    def _generateXPathMap(cls):
        # Add category/stereotype expressions to the inherited XPath map.
        super()._generateXPathMap()
        Map = {
            "category": XPath(r"cims:belongsToCategory/@rdf:resource", namespaces=cls.nsmap),
            "stereotype": XPath(r"cims:stereotype/text()", namespaces=cls.nsmap)
        }
        if not cls.XPathMap:
            cls.XPathMap = Map
        else:
            cls.XPathMap = {**cls.XPathMap, **Map}

    @property
    @prefix_ns
    def _stereotype(self):
        """
        Return the datatype's stereotype (e.g. "Primitive" or "CIMDatatype")
        as determined from the schema.
        :return: str
        """
        return self._raw_property("stereotype")

    @property
    @prefix_ns
    def _category(self):
        """
        Return the datatype's category as determined from the schema.
        :return: str
        """
        return self._raw_property("category")

    @property
    def mapped_datatype(self):
        # Resolve the primitive type name of this datatype's "value"
        # component (the CIMDTValue attached via the "value" backref).
        return self.value.datatype.name
class CIMDTProperty(SchemaElement):
    """
    Schema element representing a property of a CIM datatype; base class
    for the unit/value/multiplier component classes below.
    """
    __tablename__ = "CIMDTProperty"
    name = Column(String(80), ForeignKey(SchemaElement.name), primary_key=True)
    belongs_to_name = Column(String(50), ForeignKey(CIMDT.name))
    belongs_to = relationship(CIMDT, foreign_keys=belongs_to_name, backref="props")
    multiplicity = Column(String(10))
    many_remote = Column(Boolean)
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description):
        """
        Class constructor
        :param description: the (merged) xml node element containing the property's description
        """
        super().__init__(description)
        self.associated_class = None
        self._inverseProperty = None
        self.Attributes = self._raw_Attributes()
        self.belongs_to_name = self._domain
        self.multiplicity = self._multiplicity
        self.many_remote = self._many_remote

    @staticmethod
    def _raw_Attributes():
        # Extend the base cache with domain and multiplicity.
        return {**SchemaElement._raw_Attributes(),
                **{"namespace": None, "domain": None, "multiplicity": None}}

    @classmethod
    def _generateXPathMap(cls):
        # Add domain/multiplicity expressions to the inherited XPath map.
        super()._generateXPathMap()
        Map = {
            "domain": XPath(r"rdfs:domain/@rdf:resource", namespaces=cls.nsmap),
            "multiplicity": XPath(r"cims:multiplicity/@rdf:resource", namespaces=cls.nsmap)
        }
        if not cls.XPathMap:
            cls.XPathMap = Map
        else:
            cls.XPathMap = {**cls.XPathMap, **Map}

    @property
    @prefix_ns
    def _domain(self):
        """
        Return the property's domain (the datatype it belongs to) as
        determined from the schema.
        :return: str
        """
        return self._raw_property("domain")

    @property
    @prefix_ns
    def _multiplicity(self):
        # Strip the "M:" prefix from the multiplicity identifier
        # (e.g. "M:0..1" -> "0..1"); use the first entry if ambiguous.
        mp = self._raw_property("multiplicity")
        return mp.split("M:")[-1] if not isinstance(mp, list) \
            else mp[0].split("M:")[-1]  # pylint: disable=unsubscriptable-object

    @property
    def _many_remote(self):
        # A multiplicity ending in "2" or "n" indicates a to-many relation.
        if isinstance(self._multiplicity, list):
            return any([mp[-1] in ["2", "n"] for mp in self._multiplicity])  # pylint: disable=not-an-iterable
        else:
            return self._multiplicity[-1] in ["2", "n"]
class CIMDTUnit(CIMDTProperty):
    """
    The unit component of a CIM datatype (attached as the one-to-one
    ``unit`` backref on CIMDT).
    """
    __tablename__ = "CIMDTUnit"
    name = Column(String(80), ForeignKey(CIMDTProperty.name), primary_key=True)
    belongs_to = relationship(CIMDT, foreign_keys=CIMDTProperty.belongs_to_name, backref=backref("unit", uselist=False))
    symbol_name = Column(String(50), ForeignKey(CIMEnumValue.name))
    symbol = relationship(CIMEnumValue, foreign_keys=symbol_name)
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description):
        """
        Class constructor
        :param description: the (merged) xml node element containing the enums's description
        """
        super().__init__(description)
        self.Attributes = self._raw_Attributes()
        self.symbol_name = self._symbol

    @staticmethod
    def _raw_Attributes():
        # Extend the cache with the fixed unit value (cims:isFixed).
        return {**CIMDTProperty._raw_Attributes(),
                **{"isFixed": None}}

    @classmethod
    def _generateXPathMap(cls):
        # Add the isFixed expression to the inherited XPath map.
        super()._generateXPathMap()
        Map = {"isFixed": XPath(r"cims:isFixed/@rdfs:Literal", namespaces=cls.nsmap)}
        if not cls.XPathMap:
            cls.XPathMap = Map
        else:
            cls.XPathMap = {**cls.XPathMap, **Map}

    @property
    @prefix_ns
    def _symbol(self):
        """
        Return the UnitSymbol enum-value name of the fixed unit.
        :return: str
        """
        return f"UnitSymbol.{self._raw_property('isFixed')}"
class CIMDTValue(CIMDTProperty):
    """
    The value component of a CIM datatype (attached as the one-to-one
    ``value`` backref on CIMDT); references the underlying datatype.
    """
    __tablename__ = "CIMDTValue"
    name = Column(String(80), ForeignKey(CIMDTProperty.name), primary_key=True)
    belongs_to = relationship(CIMDT, foreign_keys=CIMDTProperty.belongs_to_name,
                              backref=backref("value", uselist=False))
    datatype_name = Column(String(50), ForeignKey(CIMDT.name))
    datatype = relationship(CIMDT, foreign_keys=datatype_name, backref="values")
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description):
        """
        Class constructor
        :param description: the (merged) xml node element containing the property's description
        """
        super().__init__(description)
        self.Attributes = self._raw_Attributes()
        self.datatype_name = self._datatype

    @staticmethod
    def _raw_Attributes():
        # Extend the cache with the value's datatype.
        return {**CIMDTProperty._raw_Attributes(), **{"datatype": None}}

    @classmethod
    def _generateXPathMap(cls):
        # Add the dataType expression to the inherited XPath map.
        super()._generateXPathMap()
        Map = {"datatype": XPath(r"cims:dataType/@rdf:resource", namespaces=cls.nsmap)}
        if not cls.XPathMap:
            cls.XPathMap = Map
        else:
            cls.XPathMap = {**cls.XPathMap, **Map}

    @property
    @prefix_ns
    def _datatype(self):
        # The referenced datatype's name as determined from the schema.
        return self._raw_property("datatype")
class CIMDTMultiplier(CIMDTProperty):
    """
    The multiplier component of a CIM datatype (attached as the one-to-one
    ``multiplier`` backref on CIMDT).
    """
    __tablename__ = "CIMDTMultiplier"
    name = Column(String(80), ForeignKey(CIMDTProperty.name), primary_key=True)
    belongs_to = relationship(CIMDT, foreign_keys=CIMDTProperty.belongs_to_name,
                              backref=backref("multiplier", uselist=False))
    value_name = Column(String(50), ForeignKey(CIMEnumValue.name))
    value = relationship(CIMEnumValue, foreign_keys=value_name)
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description):
        """
        Class constructor
        :param description: the (merged) xml node element containing the enums's description
        """
        super().__init__(description)
        self.Attributes = self._raw_Attributes()
        self.value_name = self._value

    @staticmethod
    def _raw_Attributes():
        # Extend the cache with the fixed multiplier value (cims:isFixed).
        return {**CIMDTProperty._raw_Attributes(),
                **{"isFixed": None}}

    @classmethod
    def _generateXPathMap(cls):
        # Add the isFixed expression to the inherited XPath map.
        super()._generateXPathMap()
        Map = {"isFixed": XPath(r"cims:isFixed/@rdfs:Literal", namespaces=cls.nsmap)}
        if not cls.XPathMap:
            cls.XPathMap = Map
        else:
            cls.XPathMap = {**cls.XPathMap, **Map}

    @property
    @prefix_ns
    def _value(self):
        """
        Return the UnitMultiplier enum-value name of the fixed multiplier.
        :return: str
        """
        return f"UnitMultiplier.{self._raw_property('isFixed')}"
class CIMDTDenominatorUnit(CIMDTProperty):
    """
    The denominator-unit component of a CIM datatype (attached as the
    one-to-one ``denominator_unit`` backref on CIMDT).
    """
    __tablename__ = "CIMDTDenominatorUnit"
    name = Column(String(80), ForeignKey(CIMDTProperty.name), primary_key=True)
    belongs_to = relationship(CIMDT, foreign_keys=CIMDTProperty.belongs_to_name,
                              backref=backref("denominator_unit", uselist=False))
    symbol_name = Column(String(50), ForeignKey(CIMEnumValue.name))
    symbol = relationship(CIMEnumValue, foreign_keys=symbol_name)
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description):
        """
        Class constructor
        :param description: the (merged) xml node element containing the enums's description
        """
        super().__init__(description)
        self.Attributes = self._raw_Attributes()
        self.symbol_name = self._symbol

    @staticmethod
    def _raw_Attributes():
        # Extend the cache with the fixed unit value (cims:isFixed).
        return {**CIMDTProperty._raw_Attributes(),
                **{"isFixed": None}}

    @classmethod
    def _generateXPathMap(cls):
        # Add the isFixed expression to the inherited XPath map.
        super()._generateXPathMap()
        Map = {"isFixed": XPath(r"cims:isFixed/@rdfs:Literal", namespaces=cls.nsmap)}
        if not cls.XPathMap:
            cls.XPathMap = Map
        else:
            cls.XPathMap = {**cls.XPathMap, **Map}

    @property
    @prefix_ns
    def _symbol(self):
        """
        Return the UnitSymbol enum-value name of the fixed denominator unit.
        :return: str
        """
        return f"UnitSymbol.{self._raw_property('isFixed')}"
class CIMDTDenominatorMultiplier(CIMDTProperty):
    """
    The denominator-multiplier component of a CIM datatype (attached as the
    one-to-one ``denominator_multiplier`` backref on CIMDT).
    """
    __tablename__ = "CIMDTDenominatorMultiplier"
    name = Column(String(80), ForeignKey(CIMDTProperty.name), primary_key=True)
    belongs_to = relationship(CIMDT, foreign_keys=CIMDTProperty.belongs_to_name,
                              backref=backref("denominator_multiplier", uselist=False))
    value_name = Column(String(50), ForeignKey(CIMEnumValue.name))
    value = relationship(CIMEnumValue, foreign_keys=value_name)
    __mapper_args__ = {
        "polymorphic_identity": __tablename__
    }

    def __init__(self, description):
        """
        Class constructor
        :param description: the (merged) xml node element containing the enums's description
        """
        super().__init__(description)
        self.Attributes = self._raw_Attributes()
        self.value_name = self._value

    @staticmethod
    def _raw_Attributes():
        # Extend the cache with the fixed multiplier value (cims:isFixed).
        return {**CIMDTProperty._raw_Attributes(),
                **{"isFixed": None}}

    @classmethod
    def _generateXPathMap(cls):
        # Add the isFixed expression to the inherited XPath map.
        super()._generateXPathMap()
        Map = {"isFixed": XPath(r"cims:isFixed/@rdfs:Literal", namespaces=cls.nsmap)}
        if not cls.XPathMap:
            cls.XPathMap = Map
        else:
            cls.XPathMap = {**cls.XPathMap, **Map}

    @property
    @prefix_ns
    def _value(self):
        """
        Return the UnitMultiplier enum-value name of the fixed denominator
        multiplier.
        :return: str
        """
        return f"UnitMultiplier.{self._raw_property('isFixed')}"
class CIMProp(SchemaElement):
"""
Class representing a CIM Model property
"""
# pylint: disable=too-many-instance-attributes
__tablename__ = "CIMProp"
XPathMap = None
name = Column(String(80), ForeignKey(SchemaElement.name), primary_key=True)
prop_name = Column(String(50))
cls_name = Column(String(50), ForeignKey(CIMClass.name))
cls = relationship(CIMClass, foreign_keys=cls_name, backref="props")
datatype_name = Column(String(50), ForeignKey(CIMDT.name))
datatype = relationship(CIMDT, foreign_keys=datatype_name, backref="usedby")
inverse_property_name = Column(String(80), ForeignKey("CIMProp.name"))
inverse = relationship("CIMProp", foreign_keys=inverse_property_name, uselist=False)
domain_name = Column(String(50), ForeignKey(CIMClass.name))
domain = relationship(CIMClass, foreign_keys=domain_name, backref="domain_elements")
range_name = Column(String(50), ForeignKey(CIMClass.name))
range = relationship(CIMClass, foreign_keys=range_name, backref="range_elements")
used = Column(Boolean)
multiplicity = Column(String(10))
many_remote = Column(Boolean)
optional = Column(Boolean)
__mapper_args__ = {
"polymorphic_identity": __tablename__
}
def __init__(self, description):
"""
Class constructor
:param description: the (merged) xml node element containing the property's description
"""
super().__init__(description)
self._inverseProperty = None
self.Attributes = self._raw_Attributes()
self.cls_name = self._domain
self.prop_name = self.name.split(".")[-1]
self.datatype_name = self._datatype
self.inverse_property_name = self._inversePropertyName
self.domain_name = self._domain
self.range_name = self._range
self.used = self._used
self.multiplicity = self._multiplicity
self.many_remote = self._many_remote
self.optional = self._optional
self.key = None
self.var_key = None
self.xpath = None
self.association_table = None
@staticmethod
def _raw_Attributes():
return {**SchemaElement._raw_Attributes(),
**{"range": None, "used": None, "association": None, "domain": None, "inverseRoleName": None,
"multiplicity": None, "datatype": None, "namespace": None}}
@classmethod
def _generateXPathMap(cls):
super()._generateXPathMap()
Map = {
"label": XPath(r"rdfs:label/text()", namespaces=cls.nsmap),
"association": XPath(r"cims:AssociationUsed/text()", namespaces=cls.nsmap),
"inverseRoleName": XPath(r"cims:inverseRoleName/@rdf:resource", namespaces=cls.nsmap),
"datatype": XPath(r"cims:dataType/@rdf:resource", namespaces=cls.nsmap),
"multiplicity": XPath(r"cims:multiplicity/@rdf:resource", namespaces=cls.nsmap),
"type": XPath(r"rdf:type/@rdf:resource", namespaces=cls.nsmap),
"domain": XPath(r"rdfs:domain/@rdf:resource", namespaces=cls.nsmap),
"range": XPath(r"rdfs:range/@rdf:resource", namespaces=cls.nsmap)
}
if not cls.XPathMap:
cls.XPathMap = Map
else:
cls.XPathMap = {**cls.XPathMap, **Map}
@property
def _used(self):
"""
Determine whether the property needs to be added to the SQLAlchemy declarative class (i.e. it is not an
inverseProperty of an existing mapper or it maps to a value, not a reference).
:return: True if property should be represented in the SQLAlchemy declarative model.
"""
return bool(self._association) or self._inversePropertyName is None
@property
@prefix_ns
def _datatype(self):
return self._raw_property("datatype")
@property
@prefix_ns
def _multiplicity(self):
mp = self._raw_property("multiplicity")
return mp.split("M:")[-1] if not isinstance(mp, list) \
else mp[0].split("M:")[-1] # pylint: disable=unsubscriptable-object
@property
def _association(self) -> Union[bool, None]:
association = self._raw_property("association")
if not association:
return None
elif isinstance(association, list):
if len(set(association)) == 1:
return association[0] == "Yes" # pylint: disable=E1136
elif not set(association):
return None
else:
raise ValueError(f"Ambiguous association used parameter for property {self.name}.")
else:
return association == "Yes"
@property
@prefix_ns
def _inversePropertyName(self):
return self._raw_property("inverseRoleName")
@property
@prefix_ns
def _range(self):
return self._raw_property("range")
@property
@prefix_ns
def _domain(self):
return self._raw_property("domain")
@property
def mapped_datatype(self): # pylint: disable=inconsistent-return-statements
if self.datatype:
if self.datatype.stereotype == "Primitive":
return self.datatype.name
elif self.datatype.stereotype == "CIMDatatype":
return self.datatype.mapped_datatype
else:
return None
@property
def _many_remote(self):
if isinstance(self._multiplicity, list):
return any([mp[-1] in ["2", "n"] for mp in self._multiplicity]) # pylint: disable=not-an-iterable
else:
return self._multiplicity[-1] in ["2", "n"]
@property
def _optional(self):
if isinstance(self._multiplicity, list):
return any([mp.startswith("0") for mp in self._multiplicity]) # pylint: disable=not-an-iterable
else:
return self._multiplicity.startswith("0")
def generate(self, nsmap):
attrs = OrderedDict()
dt = self.mapped_datatype
if self.used:
if isinstance(self.range, CIMEnum):
var, query_base = self.name_query()
attrs[f"{var}_name"] = Column(String(120), ForeignKey(CIMEnumValue.name), name=f"{var}_name")
attrs[var] = relationship(CIMEnumValue,
foreign_keys=attrs[f"{var}_name"])
self.key = f"{var}_name"
self.xpath = XPath(query_base + "/@rdf:resource", namespaces=nsmap)
elif self.range:
self.generate_relationship(nsmap)
elif not self.range:
var, query_base = self.name_query()
log.debug(f"Generating property for {var} on {self.name}")
self.key = var
self.xpath = XPath(query_base + "/text()", namespaces=nsmap)
if dt:
if dt == "String":
attrs[var] = Column(String(50), name=f"{var}")
elif dt in ("Float", "Decimal"):
attrs[var] = Column(Float, name=f"{var}")
elif dt == "Integer":
attrs[var] = Column(Integer, name=f"{var}")
elif dt == "Boolean":
attrs[var] = Column(Boolean, name=f"{var}")
else:
attrs[var] = Column(String(30), name=f"{var}")
else:
# Fallback to parsing as String(50)
attrs[var] = Column(String(50), name=f"{var}")
for attr, attr_value in attrs.items():
setattr(self.cls.class_, attr, attr_value)
def set_var_key(self):
end = ""
if isinstance(self.range, CIMEnum):
end = "_name"
elif self.range:
end = "_id"
self.var_key = self.namespace + "_" + self.label if self.namespace != "cim" else self.label + end
def name_query(self):
var = self.namespace + "_" + self.label if self.namespace != "cim" else self.label
query_base = f"{self.domain.label}.{self.label}" if self.domain.label.startswith(self.namespace) else \
f"{self.namespace}:{self.domain.label}.{self.label}"
return var, query_base
def generate_relationship(self, nsmap=None):
    """
    Generate the SQLAlchemy relationship (and, for to-one ends, the FK
    column) for this class-valued property and attach them to the mapped ORM
    class.

    :param nsmap: Namespace map used to compile the value-extraction XPath.
    :return: Map -- NOTE(review): always returned empty; it is never
        populated in this method. Confirm whether callers rely on it.
    """
    var, query_base = self.name_query()
    attrs = {}
    Map = {}
    log.debug(f"Generating relationship for {var} on {self.name}")
    if self.many_remote:
        # Many-to-many end: link via an association table.
        if self.inverse:
            br = self.inverse.label if self.namespace == "cim" else self.namespace + "_" + self.inverse.label
            tbl = self.generate_association_table()
            self.association_table = tbl
            attrs[var] = relationship(self.range.label,
                                      secondary=tbl,
                                      backref=br)
        else:
            tbl = self.generate_association_table()
            attrs[var] = relationship(self.range.label,
                                      secondary=tbl)
    else:
        # To-one end: plain FK column named "<var>_id" plus a relationship.
        attrs[f"{var}_id"] = Column(String(50),
                                    ForeignKey(f"{self.range.label}.id"),
                                    name=f"{var}_id")
        if self.inverse:
            br = self.inverse.label if self.namespace == "cim" else self.namespace+"_"+self.inverse.label
            attrs[var] = relationship(self.range.label,
                                      foreign_keys=attrs[f"{var}_id"],
                                      backref=br)
        else:
            attrs[var] = relationship(self.range.label,
                                      foreign_keys=attrs[f"{var}_id"])
    self.key = f"{var}_id"
    # Object references appear as rdf:resource attributes in the XML.
    self.xpath = XPath(query_base + "/@rdf:resource", namespaces=nsmap)
    class_ = self.cls.class_
    for attr, attr_value in attrs.items():
        setattr(class_, attr, attr_value)
    return Map
def generate_association_table(self):
    """
    Create the many-to-many association table linking this property's domain
    and range classes via their ``id`` columns.

    NOTE(review): the table name is prefixed with ".asn_" -- presumably to
    group association tables apart from entity tables; confirm intent.

    :return: sqlalchemy.Table with one FK column per endpoint.
    """
    association_table = Table(f".asn_{self.domain.label}_{self.range.label}", aux.Base.metadata,
                              Column(f"{self.range.label}_id", String(50), ForeignKey(f"{self.range.label}.id")),
                              Column(f"{self.domain.label}_id", String(50), ForeignKey(f"{self.domain.label}.id")))
    return association_table
PK ! yfU U # cimpyorm/Model/Elements/__init__.pyfrom .Base import *
from .Enum import *
from .Class import *
from .Property import *
PK ! ; ; cimpyorm/Model/Parseable.py#
# Copyright (c) 2018 - 2018 Thomas Offergeld (offergeld@ifht.rwth-aachen.de)
# Institute for High Voltage Technology
# RWTH Aachen University
#
# This module is part of cimpyorm.
#
# cimpyorm is licensed under the BSD-3-Clause license.
# For further information see LICENSE in the project's root directory.
#
from lxml.etree import XPath
class Parseable:
    """
    Base class for CIM classes that are to be parsed from CIM instance, providing
    parse methods for static (rdf:ID) objects and supplementary (rdf:about)
    information.
    """
    # XPath expression strings (compiled in-place by compile_map) per attribute.
    Map = {}
    _about_ref = None
    ObjectName = None
    # Back-reference to the schema element that generated this class.
    _schema_class = None

    @classmethod
    def compile_map(cls, nsmap):
        """
        Compile the XPath map for the parsing run.

        Only entries introduced by this class (i.e. not present in its direct
        base class's Map) are compiled; inherited entries were compiled when
        the base class was processed.

        :param nsmap: The .xml nsmap
        :return: None
        """
        attribute_map = cls.Map
        for key, element in cls.Map.items():
            if key not in cls.__bases__[0].Map:  # pylint: disable=no-member
                attribute_map[key] = XPath(element, namespaces=nsmap)
        cls.Map = attribute_map

    @classmethod
    def fields(cls):
        """
        Print information about available fields in Class

        :return: None
        """
        print(f"Fields available for class {cls.__name__}")
        # Plain loop instead of the previous side-effect list comprehension.
        for var in vars(cls).keys():
            if not var.startswith("_"):
                print(var)

    @classmethod
    def describe(cls, fmt="psql"):
        """Print a tabular description of the class via its schema element."""
        cls._schema_class.describe(fmt)

    @classmethod
    def to_html(cls, **kwargs):
        """Render the class description as HTML via its schema element."""
        return cls._schema_class.to_html(**kwargs)
PK ! e- - cimpyorm/Model/Schema.py#
# Copyright (c) 2018 - 2018 Thomas Offergeld (offergeld@ifht.rwth-aachen.de)
# Institute for High Voltage Technology
# RWTH Aachen University
#
# This module is part of cimpyorm.
#
# cimpyorm is licensed under the BSD-3-Clause license.
# For further information see LICENSE in the project's root directory.
#
import json
from argparse import Namespace
from collections import defaultdict
import lxml.etree as et
from lxml.etree import XPath
import networkx as nx
from networkx import DiGraph, bfs_tree
from networkx.exception import NetworkXNoPath
from sqlalchemy import Column, TEXT, Integer
from sqlalchemy.exc import InvalidRequestError
from cimpyorm.auxiliary import log, merge, HDict, merge_descriptions, find_rdfs_path
import cimpyorm.Model.auxiliary as aux
from cimpyorm.Model.Elements import CIMPackage, CIMClass, CIMProp, CIMDT, CIMEnum, CIMEnumValue, \
CIMDTUnit, CIMDTValue, CIMDTMultiplier, CIMDTDenominatorUnit, SchemaElement, CIMDTProperty, \
CIMDTDenominatorMultiplier
from cimpyorm.backends import InMemory
class Schema:
    """
    Database-backed representation of a CIM RDFS schema: parses the merged
    schema tree into SchemaElement rows and dynamically generates the ORM
    model classes from them.
    """
    def __init__(self, session=None, version: str = "16"):
        """
        Initialize a Backend object, containing information about the schema elements

        :param session: SQLAlchemy session to use; when None a fresh InMemory
            backend is created.
        :param version: CIM schema version identifier (e.g. "16") used to
            locate the RDFS files.
        """
        self.g = None
        if not session:
            # No session supplied: fall back to a fresh in-memory backend.
            backend = InMemory()
            backend.reset()
            session = backend.session
        rdfs_path = find_rdfs_path(version)
        if not rdfs_path:
            raise FileNotFoundError("Failed to find schema file. Please provide one.")
        tree = merge(rdfs_path)
        log.info(f"Dynamic code generation.")
        if session.query(SchemaElement).count():
            # A schema is already present, so just load it instead of recreating
            self.session = session
            self.Element_classes = {c.__name__: c for c in
                                    [CIMPackage, CIMClass, CIMProp, CIMDT, CIMEnum, CIMEnumValue, CIMDTUnit,
                                     CIMDTValue, CIMDTMultiplier, CIMDTDenominatorUnit, CIMDTDenominatorMultiplier]}
            self.Elements = {c.__name__: {cim_class.name: cim_class for cim_class in session.query(c).all()}
                             for c in self.Element_classes.values()}
        else:
            # Fresh schema: parse the merged RDFS tree and populate the DB.
            self.session = session
            if isinstance(tree, type(et.ElementTree())):
                self.file = None
                self.root = tree.getroot()
            else:
                self.file = tree
                self.root = et.parse(tree).getroot()
            self.Element_classes = {c.__name__: c for c in
                                    [CIMPackage, CIMClass, CIMProp, CIMDT, CIMEnum, CIMEnumValue, CIMDTUnit,
                                     CIMDTValue, CIMDTMultiplier, CIMDTDenominatorUnit, CIMDTDenominatorMultiplier]}
            self.Elements = {c.__name__: defaultdict(list) for c in self.Element_classes.values()}
            self._init_parser()
            self._generate()
            for _, Cat_Elements in self.Elements.items():
                self.session.add_all(list(Cat_Elements.values()))
            self.session.commit()
            log.debug(f"Backend generated")
            session.add(SchemaInfo(self.root.nsmap))
            self.init_model(session)

    @property
    def inheritance_graph(self):
        """
        Determine the class inheritance hierarchy (class definition needs to adhere to strict inheritance hierarchy)

        :return: g - A networkx DiGraph of the class hierarchy, with a common ancestor __root__
        """
        # Determine class inheritance hierarchy (bfs on a directed graph)
        g = DiGraph()
        g.add_node("__root__")
        class_list = list(self.session.query(CIMClass).all())
        while class_list:
            for element in class_list:
                if element:
                    parent = element.parent
                    if not parent:
                        g.add_edge("__root__", element)
                    else:
                        g.add_edge(parent, element)
                # NOTE(review): removing from class_list while iterating it
                # skips elements within one pass; the enclosing while loop
                # retries until the list is empty, so the graph is still
                # complete -- at quadratic cost.
                class_list.remove(element)
        return g

    def _init_parser(self):
        """Install the shared nsmap and compile each element class's XPath map."""
        SchemaElement.nsmap = HDict(self.root.nsmap)
        for c in self.Element_classes.values():
            c._generateXPathMap()

    @staticmethod
    def _isclass(type_res):
        """True if the rdf:type resource marks an rdfs Class."""
        return type_res and type_res[0].endswith("#Class")

    @staticmethod
    def _isenum(stype_res):
        """True if the cims:stereotype resource marks an enumeration."""
        return stype_res and stype_res[0].endswith("#enumeration")

    @staticmethod
    def _isdt(stype_txt):
        """True if the cims:stereotype text marks a CIM datatype or primitive."""
        return stype_txt and stype_txt[0] in ["CIMDatatype", "Primitive"]

    @staticmethod
    def _isprop(type_res):
        """True if the rdf:type resource marks an rdf Property."""
        return type_res and type_res[0].endswith("#Property")

    @staticmethod
    def _ispackage(type_res):
        """True if the rdf:type resource marks a ClassCategory (package)."""
        return type_res and type_res[0].endswith("#ClassCategory")

    @property
    def model(self):
        """Namespace exposing the generated ORM classes plus dt/classes/enum sub-namespaces."""
        for class_ in self.session.query(CIMClass).all():
            class_.p = Namespace(**class_.all_props)
        for enum_ in self.session.query(CIMEnum).all():
            enum_.v = Namespace(**{value.label: value for value in enum_.values})
        return Namespace(**{c.name: c.class_ for c in self.session.query(CIMClass).all()},
                         **{"dt": Namespace(**{c.name: c for c in self.session.query(CIMDT).all()})},
                         **{"classes": Namespace(**{c.name: c for c in self.session.query(CIMClass).all()})},
                         **{"enum": Namespace(**{c.name: c for c in self.session.query(CIMEnum).all()})})

    def _generate(self):
        """
        Two-pass classification of the schema tree: classes/datatypes/enums
        and packages first, properties and enum values afterwards (they refer
        to the entities created in the first pass).
        """
        xp_type_res = XPath(f"rdf:type/@rdf:resource", namespaces=self.root.nsmap)
        xp_stype_res = XPath(f"cims:stereotype/@rdf:resource", namespaces=self.root.nsmap)
        xp_stype_txt = XPath(f"cims:stereotype/text()", namespaces=self.root.nsmap)
        postponed = []
        for element in self.root:
            type_res = xp_type_res(element)
            stype_res = xp_stype_res(element)
            stype_txt = xp_stype_txt(element)
            if Schema._isclass(type_res):
                if Schema._isenum(stype_res):
                    obj = CIMEnum(element)
                    self.Elements["CIMEnum"][obj.name].append(obj)
                elif Schema._isdt(stype_txt):
                    obj = CIMDT(element)
                    self.Elements["CIMDT"][obj.name].append(obj)
                else:
                    obj = CIMClass(element)
                    self.Elements["CIMClass"][obj.name].append(obj)
            elif Schema._isprop(type_res):
                postponed.append(element)
            elif Schema._ispackage(type_res):
                obj = CIMPackage(element)
                self.Elements["CIMPackage"][obj.name].append(obj)
            elif type_res:
                # Unrecognised typed element: decide in the second pass.
                postponed.append(element)
            else:
                obj = SchemaElement(element)
                log.warning(f"Element skipped: {obj.name}")
        for element in postponed:
            type_res = xp_type_res(element)
            if Schema._isprop(type_res):
                obj = CIMProp(element)
                if obj._domain in self.Elements["CIMDT"].keys():
                    # Property of a CIM datatype: dispatch on the suffix.
                    if obj.name.endswith(".unit"):
                        obj = CIMDTUnit(element)
                        self.Elements["CIMDTUnit"][obj.name].append(obj)
                    elif obj.name.endswith(".value"):
                        obj = CIMDTValue(element)
                        self.Elements["CIMDTValue"][obj.name].append(obj)
                    elif obj.name.endswith(".multiplier"):
                        obj = CIMDTMultiplier(element)
                        self.Elements["CIMDTMultiplier"][obj.name].append(obj)
                    elif obj.name.endswith(".denominatorUnit"):
                        obj = CIMDTDenominatorUnit(element)
                        self.Elements["CIMDTDenominatorUnit"][obj.name].append(obj)
                    elif obj.name.endswith(".denominatorMultiplier"):
                        obj = CIMDTDenominatorMultiplier(element)
                        self.Elements["CIMDTDenominatorMultiplier"][obj.name].append(obj)
                    else:
                        # NOTE(review): "CIMDTProperty" is not part of
                        # Element_classes, so self.Elements has no such key --
                        # this branch would raise KeyError if ever reached.
                        obj = CIMDTProperty(element)
                        self.Elements["CIMDTProperty"][obj.name].append(obj)
                else:
                    self.Elements["CIMProp"][obj.name].append(obj)
                continue
            # Non-property postponed elements are treated as enum values.
            obj = CIMEnumValue(element)
            if obj._enum_name and obj._enum_name in self.Elements["CIMEnum"].keys():
                self.Elements["CIMEnumValue"][obj.name].append(obj)
            else:
                log.debug(f"Failed to identify purpose for {type_res}")
        self._merge_elements()
        for key, value in self.Elements.items():
            if value:
                log.debug(f"Generated {len(value)} {key}.")

    @property
    def map(self):
        """Lazily built DiGraph linking classes/enums via their properties (edge label = property label)."""
        if not self.g:
            g = DiGraph()
            classes = self.session.query(CIMClass).all()
            enums = self.session.query(CIMEnum).all()
            g.add_nodes_from(classes)
            g.add_nodes_from(enums)
            g.add_nodes_from(self.session.query(CIMProp).all())
            for node in classes + enums:
                try:
                    for prop in node.all_props.values():
                        if prop.range:
                            g.add_edge(node, prop.range, label=prop.label)
                        else:
                            g.add_edge(node, prop, label=prop.label)
                except AttributeError:
                    # Nodes without all_props contribute no edges.
                    pass
            self.g = g
        return self.g

    def path(self, source, destination):
        """
        Return the property labels along the shortest path from ``source`` to
        ``destination`` in the schema graph; None if they coincide or no path
        exists (an error is logged in the latter case).
        """
        if source == destination:
            return
        try:
            path = nx.shortest_path(self.map, source, destination)
        except NetworkXNoPath:
            log.error(f"No path between {source.name} and {destination.name}.")
            return
        way = []
        # NOTE(review): ``iter`` shadows the builtin; left unchanged here.
        for iter in range(1, len(path)):
            way.append(self.map.edges[path[iter-1], path[iter]]["label"])
        return way

    def _merge_elements(self):
        """Collapse each per-name list of duplicate descriptions into one merged element."""
        for Category, CatElements in self.Elements.items():
            log.debug(f"Merging {Category}.")
            for NodeName, NodeElements in CatElements.items():
                CatElements[NodeName] = self.Element_classes[Category](
                    merge_descriptions([e.description for e in NodeElements]))
            self.Elements[Category] = dict(CatElements)

    def init_model(self, session):
        """
        Generate the ORM classes in inheritance order (parents before
        children, via BFS from the synthetic __root__ node).

        :param session: session holding the SchemaInfo row (for the nsmap).
        """
        g = self.inheritance_graph
        additionalNodes = list(bfs_tree(g, "__root__"))
        additionalNodes.remove("__root__")
        hierarchy = additionalNodes
        try:
            for c in hierarchy:
                c.init_type(aux.Base)
        except InvalidRequestError:
            # Types were already initialised (e.g. repeated construction).
            pass
        session.commit()
        session.flush()
        nsmap = session.query(SchemaInfo).one().nsmap
        for c in hierarchy:
            c.generate(nsmap)
        log.info(f"Generated {len(hierarchy)} classes")
class SchemaInfo(aux.Base):
    """ORM table persisting the schema's namespace map as JSON."""
    __tablename__ = "SchemaInfo"
    namespaces = Column(TEXT)
    id = Column(Integer, primary_key=True, autoincrement=True)

    def __init__(self, nsmap):
        """
        Initialize SchemaInfo object

        :param nsmap: dict -- prefix -> namespace-URI mapping to persist
        """
        self.namespaces = json.dumps(nsmap)

    @property
    def nsmap(self):
        """
        Return the stored nsmap

        :return: dict - The stored nsmap
        """
        return json.loads(self.namespaces)
PK !
cimpyorm/Model/Source.py#
# Copyright (c) 2018 - 2018 Thomas Offergeld (offergeld@ifht.rwth-aachen.de)
# Institute for High Voltage Technology
# RWTH Aachen University
#
# This module is part of cimpyorm.
#
# cimpyorm is licensed under the BSD-3-Clause license.
# For further information see LICENSE in the project's root directory.
#
import json
import re
from collections import defaultdict
from pathlib import Path
from typing import Union
from functools import lru_cache
import lxml.etree as et
from sqlalchemy import Column, Integer, String, TEXT
from cimpyorm.auxiliary import HDict
import cimpyorm.Model.auxiliary as aux
class SourceInfo(aux.Base):
    """
    Class for storing source metadata in the database
    """
    __tablename__ = "SourceInfo"
    id = Column(Integer, primary_key=True, autoincrement=True)
    filename = Column(String(50))
    uuid = Column(String(50))
    FullModel = Column(TEXT)
    namespaces = Column(TEXT)

    def __init__(self, source_file):
        """
        Initialize DataSource object

        :param source_file: Path to the file containing the model data
        """
        self.source = source_file
        self._parse_meta()

    def __repr__(self):
        """
        Unique representation

        :return: str
        """
        fm = json.loads(self.FullModel)
        str_ = f"source uuid: {self.uuid} | filename: {self.filename} | profiles: {fm['profile']}"
        return str_

    @property
    def cim_version(self):
        """
        Return the source's cim_version

        :return: str - The source's cim version
        """
        nsmap = HDict(json.loads(self.namespaces))
        return _get_cimrdf_version(nsmap["cim"])

    @property
    def nsmap(self):
        """
        Return the source's nsmap

        Fix: the previous implementation stacked ``functools.lru_cache`` on
        top of ``property``; an lru_cache keyed on ``self`` keeps every
        instance alive for the cache's lifetime (ruff B019).  The parsed map
        is now memoised on the instance instead.

        :return: dict - The source's nsmap
        """
        # hasattr check also covers instances loaded from the DB, where
        # __init__ is not invoked by SQLAlchemy.
        if not hasattr(self, "_nsmap_cache"):
            self._nsmap_cache = HDict(json.loads(self.namespaces))
        return self._nsmap_cache

    def _parse_meta(self):
        """Parse the source file and populate the metadata columns."""
        try:
            self.filename = Path(self.source).name
        except TypeError:
            # ``source`` is a file-like object rather than a path.
            self.filename = self.source.name
        self.tree = et.parse(self.source)
        root = self.tree.getroot()
        nsmap = root.nsmap
        uuid, metadata = self._generate_metadata()
        self.uuid = uuid
        self.FullModel = json.dumps(metadata)
        self.namespaces = json.dumps(nsmap)

    def _generate_metadata(self):
        """
        Determine the data source's metadata (such as CIM version)

        :return: (data source uuid, data source metadata)
        """
        tree = self.tree
        nsmap = tree.getroot().nsmap
        source = tree.xpath("md:FullModel", namespaces=nsmap)[0]
        uuid = source.xpath("@rdf:about", namespaces=nsmap)[0].split("urn:uuid:")[-1]
        metadata = defaultdict(list)
        for element in source:
            entry = element.tag.split("Model.")[-1]
            # DependentOn entries carry the referenced model's uuid in an attribute.
            value = element.text if entry != "DependentOn" else element.attrib.values()[0].split("urn:uuid:")[-1]
            if value not in metadata[entry]:
                metadata[entry].append(value)
        return uuid, metadata
def _get_cimrdf_version(cim_ns) -> Union[None, str]:
"""
Parse the cim namespace into a version number
:param cim_ns: cim namespace
:return: double, version number, or None if no version could be identified
"""
match = re.search(r"(?<=CIM-schema-cim)\d{0,2}?(?=#)", cim_ns)
if match:
return match.group()
else:
return None
PK ! wE= = cimpyorm/Model/__init__.py#
# Copyright (c) 2018 - 2018 Thomas Offergeld (offergeld@ifht.rwth-aachen.de)
# Institute for High Voltage Technology
# RWTH Aachen University
#
# This module is part of cimpyorm.
#
# cimpyorm is licensed under the BSD-3-Clause license.
# For further information see LICENSE in the project's root directory.
#
PK ! cimpyorm/Model/auxiliary.py#
# Copyright (c) 2018 - 2018 Thomas Offergeld (offergeld@ifht.rwth-aachen.de)
# Institute for High Voltage Technology
# RWTH Aachen University
#
# This module is part of cimpyorm.
#
# cimpyorm is licensed under the BSD-3-Clause license.
# For further information see LICENSE in the project's root directory.
#
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
PK ! &
cimpyorm/Parser.py#
# Copyright (c) 2018 - 2018 Thomas Offergeld (offergeld@ifht.rwth-aachen.de)
# Institute for High Voltage Technology
# RWTH Aachen University
#
# This module is part of cimpyorm.
#
# cimpyorm is licensed under the BSD-3-Clause license.
# For further information see LICENSE in the project's root directory.
#
from sys import stdout
from itertools import chain
from collections import defaultdict
from functools import lru_cache
from tqdm import tqdm
from cimpyorm.auxiliary import HDict, log, parseable_files, shorten_namespace
def get_files(dataset):
    """
    Resolve a dataset path -- or a list of dataset paths -- to the parseable
    files it contains.

    :param dataset: path to a dataset directory, or a list of such paths
    :return: iterable of file paths
    """
    if isinstance(dataset, list):
        per_path = [parseable_files(path) for path in dataset]
        files = chain(*per_path)
    else:
        files = parseable_files(dataset)
    return files
def merge_sources(sources):
    """
    Merge the XML elements of several data sources into a nested dict
    ``classname -> uuid -> element``.  Profile fragments describing the same
    object (same uuid) are merged by appending their child nodes to the
    element seen first.

    :param sources: frozenset of SourceInfo objects with parsed trees
    :return: defaultdict -- classname -> {uuid: merged lxml element}
    """
    d_ = defaultdict(dict)
    from lxml.etree import XPath
    # Hoisted: the merged nsmap is identical for every element (the repeated
    # get_nsmap calls were only saved by its lru_cache before).
    nsmap = get_nsmap(sources)
    xp = {"id": XPath("@rdf:ID", namespaces=nsmap),
          "about": XPath("@rdf:about", namespaces=nsmap)}
    for source in sources:
        for element in source.tree.getroot():
            try:
                uuid = determine_uuid(element, xp)
                classname = shorten_namespace(element.tag, HDict(nsmap))
                if classname not in d_ or uuid not in d_[classname].keys():
                    d_[classname][uuid] = element
                else:
                    # Same object described in another profile: merge children.
                    # (Plain loop instead of a side-effect list comprehension.)
                    for sub in element:
                        d_[classname][uuid].append(sub)
            except ValueError:
                log.warning(f"Skipped element during merge: {element}.")
    return d_
def parse_entries(entries, schema):
    """
    Instantiate ORM objects for all merged XML entries whose class exists in
    the schema.

    :param entries: dict classname -> {uuid: merged element} (see merge_sources)
    :param schema: Schema object providing the session and generated classes
    :return: list of created (not yet committed) ORM instances
    """
    classes = dict(schema.session.query(
        schema.Element_classes["CIMClass"].name,
        schema.Element_classes["CIMClass"]
    ).all())
    created = []
    for classname, elements in entries.items():
        if classname in classes.keys():
            for uuid, element in tqdm(elements.items(), desc=f"Reading {classname}", leave=False):
                argmap, insertables = classes[classname].parse_values(element, schema.session)
                # ids are stored with a leading underscore (rdf:ID convention).
                created.append(classes[classname].class_(id="_"+uuid,
                                                        **argmap))
                # insertables carry the many-to-many association-table rows.
                for insertable in insertables:
                    schema.session.execute(insertable)
        else:
            log.info(f"{classname} not implemented. Skipping.")
    return created
def determine_uuid(element, xp):
    """
    Extract an element's uuid from its rdf:ID or rdf:about attribute.

    An rdf:about reference (supplementary profile data) takes precedence over
    rdf:ID; a leading underscore on rdf:ID values is stripped.

    :param element: lxml element
    :param xp: dict with compiled "id" and "about" XPath callables
    :return: str uuid, or None if neither attribute is present
    """
    uuid = None
    try:
        raw_id = xp["id"](element)[0]
        uuid = raw_id[1:] if raw_id.startswith("_") else raw_id
    except IndexError:
        pass
    try:
        uuid = xp["about"](element)[0].split("urn:uuid:")[-1].split("#_")[-1]
    except IndexError:
        pass
    return uuid
@lru_cache()
def get_nsmap(sources: frozenset):
    """
    Return the merged namespace map for a list of data sources

    :param sources: frozenset of DataSource objects (so its hashable)
    :return: dict, merged nsmap of all DataSource objects
    """
    merged = {}
    for source in sources:
        merged.update(source.nsmap)
    return HDict(merged)
def get_cim_version(sources):
    """
    Return the (unambiguous) DataSource cim versions

    If the sources disagree, an error is logged and the first source's
    version is returned anyway.

    :param sources: DataSources
    :return: str -- cim version of the first source
    """
    cim_versions = [source.cim_version for source in sources]
    distinct = set(cim_versions)
    if len(distinct) > 1:
        log.error(f"Ambiguous cim_versions: {cim_versions}.")
    return cim_versions[0]
PK ! - cimpyorm/Test/Integration/MariaDB/__init__.pyPK ! >
; cimpyorm/Test/Integration/MariaDB/test_integration_tests.pyimport cimpyorm.auxiliary
from cimpyorm.api import load, parse
import pytest
import cimpyorm
from cimpyorm.backends import MariaDB
def test_parse_load(full_grid):
    """Integration: parse the FullGrid dataset into MariaDB, then re-load it."""
    try:
        cimpyorm.auxiliary.get_path("SCHEMAROOT")
    except KeyError:
        pytest.skip(f"Schemata not configured")
    path = "integration_test"
    session, m = parse(full_grid, MariaDB(path=path, host="localhost"))
    session.close()
    session, m = load(MariaDB(path=path, host="localhost"))
    session.close()
    # Clean up the integration database.
    MariaDB(path=path, host="localhost").drop()
def test_parse_parse(full_grid):
    """Integration: parsing twice into the same MariaDB must still resolve relationships."""
    try:
        cimpyorm.auxiliary.get_path("SCHEMAROOT")
    except KeyError:
        pytest.skip(f"Schemata not configured")
    path = "integration_test"
    session, m = parse(full_grid, MariaDB(path=path, host="localhost"))
    session.close()
    session, m = parse(full_grid, MariaDB(path=path, host="localhost"))
    assert session.query(m.Terminal).first().ConductingEquipment
    session.close()
    # Clean up the integration database.
    MariaDB(path=path, host="localhost").drop()
PK ! + cimpyorm/Test/Integration/MySQL/__init__.pyPK ! y 9 cimpyorm/Test/Integration/MySQL/test_integration_tests.pyimport cimpyorm.auxiliary
from cimpyorm.api import load, parse
import pytest
import cimpyorm
from cimpyorm.backends import MySQL
def test_parse_load(full_grid):
    """Integration: parse the FullGrid dataset into MySQL, then re-load it."""
    try:
        cimpyorm.auxiliary.get_path("SCHEMAROOT")
    except KeyError:
        pytest.skip(f"Schemata not configured")
    path = "integration_test"
    session, m = parse(full_grid, MySQL(path=path, host="localhost"))
    session.close()
    session, m = load(MySQL(path=path, host="localhost"))
    session.close()
    # Clean up the integration database.
    MySQL(path=path, host="localhost").drop()
def test_parse_parse(full_grid):
    """Integration: parsing twice into the same MySQL database must still resolve relationships."""
    try:
        cimpyorm.auxiliary.get_path("SCHEMAROOT")
    except KeyError:
        pytest.skip(f"Schemata not configured")
    path = "integration_test"
    session, m = parse(full_grid, MySQL(path=path, host="localhost"))
    session.close()
    session, m = parse(full_grid, MySQL(path=path, host="localhost"))
    assert session.query(m.Terminal).first().ConductingEquipment
    session.close()
    # Clean up the integration database.
    MySQL(path=path, host="localhost").drop()
PK ! , cimpyorm/Test/Integration/SQLite/__init__.pyPK ! " : cimpyorm/Test/Integration/SQLite/test_integration_tests.pyimport cimpyorm.auxiliary
from cimpyorm.api import load, parse
import os
import pytest
import cimpyorm
from cimpyorm.backends import SQLite, InMemory
def test_parse_inmemory(full_grid):
    """Integration: the FullGrid dataset parses into an in-memory SQLite backend."""
    try:
        cimpyorm.auxiliary.get_path("SCHEMAROOT")
    except KeyError:
        pytest.skip(f"Schemata not configured")
    session, m = parse(full_grid, InMemory())
    session.close()
def test_parse_load(full_grid):
    """Integration: parse the FullGrid dataset into an SQLite file, then re-load it."""
    try:
        cimpyorm.auxiliary.get_path("SCHEMAROOT")
    except KeyError:
        pytest.skip(f"Schemata not configured")
    path = os.path.join(full_grid, ".integration_test.db")
    session, m = parse(full_grid, SQLite(path=path))
    session.close()
    session, m = load(path)
    session.close()
    # Remove the temporary database file.
    os.remove(path)
def test_parse_parse(full_grid):
    """Integration: parsing twice into the same SQLite file must still resolve relationships."""
    try:
        cimpyorm.auxiliary.get_path("SCHEMAROOT")
    except KeyError:
        pytest.skip(f"Schemata not configured")
    path = os.path.join(full_grid, ".integration_test.db")
    session, m = parse(full_grid, SQLite(path=path))
    session.close()
    session, m = parse(full_grid, SQLite(path=path))
    assert session.query(m.Terminal).first().ConductingEquipment
    session.close()
    # Remove the temporary database file.
    os.remove(path)
PK ! % cimpyorm/Test/Integration/__init__.pyPK ! > > cimpyorm/Test/__init__.py#
# Copyright (c) 2018 - 2018 Thomas Offergeld (offergeld@ifht.rwth-aachen.de)
# Institute for High Voltage Technology
# RWTH Aachen University
#
# This module is part of cimpyorm.
#
# cimpyorm is licensed under the BSD-3-Clause license.
# For further information see LICENSE in the project's root directory.
#
PK ! (yaGU U cimpyorm/Test/conftest.pyimport pytest
import os
# Keep import for _CONFIGPATH - otherwise get_path fails because cimpyorm/__init__.py locals aren't present
from cimpyorm.auxiliary import log, get_path
@pytest.fixture(scope="session")
def full_grid():
    """Path to the FullGrid test dataset; skips dependent tests when unavailable."""
    try:
        path = os.path.join(get_path("DATASETROOT"), "FullGrid")
    except KeyError:
        pytest.skip(f"Dataset path not configured")
    if not os.path.isdir(path) or not os.listdir(path):
        pytest.skip("Dataset 'FullGrid' not present.")
    else:
        return path
@pytest.fixture(scope="module")
def acquire_db():
    """Fresh SQLite backend for a test module; returns (engine, session)."""
    import cimpyorm.backends
    backend = cimpyorm.backends.SQLite()
    engine = backend.engine
    session = backend.session
    return engine, session
@pytest.fixture(scope="session")
def load_test_db():
    """
    Returns a session and a model for a database that's only supposed to be read from

    :return: session, m
    """
    from cimpyorm.api import load
    path = os.path.join(get_path("DATASETROOT"), "FullGrid", "StaticTest.db")
    if not os.path.isfile(path):
        pytest.skip("StaticTest.db not present.")
    session, m = load(path)
    return session, m
@pytest.fixture(scope="session")
def dummy_source():
    """SourceInfo built from one known FullGrid EQ profile file."""
    try:
        path = os.path.join(get_path("DATASETROOT"), "FullGrid", "20171002T0930Z_BE_EQ_4.xml")
    except KeyError:
        pytest.skip(f"Dataset path not configured")
    if not os.path.isfile(path):
        pytest.skip("Dataset 'FullGrid' not present.")
    from cimpyorm.Model.Source import SourceInfo
    ds = SourceInfo(source_file=path)
    return ds
@pytest.fixture(scope="session")
def dummy_nsmap():
    """Canonical CGMES namespace map used by the shorten_namespace tests."""
    from cimpyorm.auxiliary import HDict
    nsmap = HDict({'cim': 'http://iec.ch/TC57/2013/CIM-schema-cim16#',
                   'entsoe': 'http://entsoe.eu/CIM/SchemaExtension/3/1#',
                   'md': 'http://iec.ch/TC57/61970-552/ModelDescription/1#',
                   'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#'})
    return nsmap
@pytest.fixture(scope="session")
def cgmes_schema():
    """Session-wide CIM16 (CGMES) schema instance."""
    from cimpyorm.Model.Schema import Schema
    schema = Schema(version="16")
    return schema
PK ! $ " cimpyorm/Test/test_aux_and_misc.pyfrom cimpyorm.auxiliary import shorten_namespace
def test_get_class_names_cim(dummy_nsmap):
    """Default-namespace (cim) tags shorten to the bare class name."""
    assert shorten_namespace(
        frozenset(['{http://iec.ch/TC57/2013/CIM-schema-cim16#}StaticVarCompensator']),
        dummy_nsmap) == ["StaticVarCompensator"]
def test_get_class_names_md(dummy_nsmap):
    """md-namespace tags shorten to an "md_"-prefixed name."""
    assert shorten_namespace(
        frozenset(["{http://iec.ch/TC57/61970-552/ModelDescription/1#}FullModel"]),
        dummy_nsmap) == ["md_FullModel"]
def test_get_class_names_entsoe(dummy_nsmap):
    """entsoe-namespace tags shorten to an "entsoe_"-prefixed name."""
    assert shorten_namespace(
        frozenset(['{http://entsoe.eu/CIM/SchemaExtension/3/1#}EnergySchedulingType']),
        dummy_nsmap) == ["entsoe_EnergySchedulingType"]
PK ! Q $ cimpyorm/Test/test_data_integrity.pydef test_num_of_elements(load_test_db):
session, m = load_test_db
assert session.query(m.Terminal).count() == 144
def test_native_properties(load_test_db):
    """Scalar (native) properties are typed and return the expected value."""
    session, m = load_test_db
    assert isinstance(session.query(m.ACDCConverter.idleLoss).filter(
        m.ACDCConverter.id == "_0f05e270-37ea-471d-89fe-aee8a55b932b"
    ).one()[0], float)
    assert session.query(m.ACDCConverter.idleLoss).filter(
        m.ACDCConverter.id == "_0f05e270-37ea-471d-89fe-aee8a55b932b"
    ).one() == (1.0,)
def test_inherited_properties(load_test_db):
    """Properties inherited from parent CIM classes (here: name) are accessible."""
    session, m = load_test_db
    assert session.query(m.Terminal.name).filter(
        m.Terminal.id == "_800ada75-8c8c-4568-aec5-20f799e45f3c"
    ).one() == ("BE-Busbar_2_Busbar_Section",)
def test_relationship(load_test_db):
    """To-one relationships resolve to an instance of the related ORM class."""
    session, m = load_test_db
    assert isinstance(session.query(m.Terminal).filter(
        m.Terminal.id == "_800ada75-8c8c-4568-aec5-20f799e45f3c"
    ).one().ConnectivityNode, m.ConnectivityNode)
def test_alter_data(load_test_db):
    """Attribute changes on a loaded object survive a commit round-trip."""
    session, m = load_test_db
    obj = session.query(m.IdentifiedObject).first()
    obj.entsoe_energyIdentCodeEic = "YetAnotherCode"
    session.commit()
    assert session.query(m.IdentifiedObject).first().entsoe_energyIdentCodeEic == "YetAnotherCode"
PK ! x[۠6 6 % cimpyorm/Test/test_loadRDFS_pytest.pyimport os
import pytest
from cimpyorm.auxiliary import get_path, find_rdfs_path
@pytest.mark.parametrize("Version", [(16)])
def test_find_valid_rdfs_version(Version):
    """A known schema version resolves to a non-empty schema directory."""
    try:
        os.path.isdir(get_path("SCHEMAROOT"))
    except KeyError:
        pytest.skip(f"Schema folder not configured")
    version = f"{Version}"
    rdfs_path = find_rdfs_path(version)
    assert os.path.isdir(rdfs_path) and os.listdir(rdfs_path)
@pytest.mark.parametrize("Version", [(9), (153), ("foo"), ("ba")])
def test_find_invalid_rdfs_version(Version):
    """Unknown or malformed schema versions raise ValueError/NotImplementedError."""
    try:
        os.path.isdir(get_path("SCHEMAROOT"))
    except KeyError:
        pytest.skip(f"Schema folder not configured")
    with pytest.raises((ValueError, NotImplementedError)) as ex_info:
        version = f"{Version}"
        find_rdfs_path(version)
    print(ex_info)
PK ! cimpyorm/Test/test_metadata.pydef test_parse_meta(acquire_db, dummy_source):
_, session = acquire_db
assert dummy_source.tree
assert dummy_source.nsmap == {'cim': 'http://iec.ch/TC57/2013/CIM-schema-cim16#',
'entsoe': 'http://entsoe.eu/CIM/SchemaExtension/3/1#',
'md': 'http://iec.ch/TC57/61970-552/ModelDescription/1#',
'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#'}
assert dummy_source.cim_version == "16"
PK ! 'yw/ cimpyorm/Test/test_parser.pyimport pytest
def test_single_object(cgmes_schema):
import lxml.etree as et
ACL = cgmes_schema.model.classes.ACLineSegment
literal = '' \
'' \
' ' \
' BE-Line_1' \
' ' \
' 2.200000' \
' 68.200000' \
' 0.0000829380' \
' 22.000000' \
' 0.0000308000' \
' false' \
' ' \
' 6.600000' \
' 204.600000' \
' 0.0000262637' \
' 0.0000308000' \
' 160.0000000000' \
' BE-L_1' \
' 10T-AT-DE-000061' \
' 10T-AT-DE-000061' \
' 17086487-56ba-4979-b8de-064025a6b4da' \
' ' \
''
map = {'mRID': '17086487-56ba-4979-b8de-064025a6b4da',
'name': 'BE-Line_1',
'description': '10T-AT-DE-000061',
'entsoe_energyIdentCodeEic': '10T-AT-DE-000061',
'entsoe_shortName': 'BE-L_1',
'EquipmentContainer_id': '_2b659afe-2ac3-425c-9418-3383e09b4b39',
'aggregate': False,
'BaseVoltage_id': '_7891a026ba2c42098556665efd13ba94',
'length': 22.0,
'bch': 8.2938e-05,
'gch': 3.08e-05,
'r': 2.2,
'x': 68.2,
'b0ch': 2.62637e-05,
'g0ch': 3.08e-05,
'r0': 6.6,
'shortCircuitEndTemperature': 160.0, 'x0': 204.6}
assert ACL.parse_values(et.fromstring(literal.encode("UTF-8"))[0], cgmes_schema.session)[0] == map
one_node = \
''\
''\
'TOP_NET_1'\
''\
''\
'7f28263d-4f21-c942-be2e-3c6b8d54c546'\
''\
''
multi_node = \
''\
''\
'TOP_NET_1'\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
''\
'7f28263d-4f21-c942-be2e-3c6b8d54c546'\
''\
''
@pytest.mark.parametrize("literal", [one_node, multi_node], ids=["single_property_node", "multiple_property_nodes"])
def test_m2m_rel(cgmes_schema, literal):
import lxml.etree as et
TI = cgmes_schema.model.classes.TopologicalIsland
insertable = TI.parse_values(et.fromstring(literal.encode("UTF-8"))[0], cgmes_schema.session)[1][0]
values = insertable.parameters
assert "TopologicalIsland_id" in values[0].keys()
assert "TopologicalNode_id" in values[0].keys()
assert "_f6ee76f7-3d28-6740-aa78-f0bf7176cdad" in [value["TopologicalNode_id"] for value in values]
assert len(values) == 20PK ! h h cimpyorm/Test/test_schema.pyfrom cimpyorm.Model.Elements import CIMClass
def test_persisted_classes(cgmes_schema):
    """The generated schema holds all CGMES classes, identical to the persisted rows."""
    schema = cgmes_schema
    # Make sure we have all CIMClasses
    assert len(schema.Elements["CIMClass"]) == 397
    assert schema.Elements["CIMClass"]["ACLineSegment"] is \
        schema.session.query(CIMClass).filter(CIMClass.name == "ACLineSegment").one()
def test_summary(cgmes_schema):
    """ACLineSegment's property table has the expected dimensions."""
    schema = cgmes_schema
    assert schema.model.classes.ACLineSegment.property_table().shape == (27, 8)
def test_description_CIMClass(cgmes_schema):
    """describe() works on a schema CIMClass, both free-standing and as a method."""
    from cimpyorm import describe
    describe(cgmes_schema.model.classes.TopologicalNode)
    cgmes_schema.model.classes.TopologicalNode.describe()
def test_description_parseable(cgmes_schema):
    """describe() works on a generated (Parseable) model class as well."""
    from cimpyorm import describe
    describe(cgmes_schema.model.TopologicalNode)
    cgmes_schema.model.TopologicalNode.describe()
PK ! SF F cimpyorm/Test/test_xml_merge.pyimport pytest
import lxml.etree as et
import os
from cimpyorm.auxiliary import log, get_path, parseable_files, merge
# Discover schema and dataset directories to parametrize the merge tests.
# Both lookups are best-effort: a missing configuration key leaves the
# corresponding list empty, so the parametrized tests below simply collect
# no cases.
schemata = []
datasets = []
try:
    SCHEMAROOT = get_path("SCHEMAROOT")
    if os.path.isdir(SCHEMAROOT) and os.listdir(SCHEMAROOT):
        schemata = [os.path.join(SCHEMAROOT, f"CIM{version}")
                    for version in [16]]
except KeyError:
    pass
try:
    DATASETROOT = get_path("DATASETROOT")
    if os.path.isdir(os.path.join(DATASETROOT)) and os.listdir(os.path.join(DATASETROOT)):
        datasets = [os.path.join(DATASETROOT, dir_) for dir_ in os.listdir(os.path.join(DATASETROOT))
                    if os.path.isdir(os.path.join(DATASETROOT, dir_))]
except KeyError:
    pass
tested_directories = schemata + datasets
@pytest.mark.parametrize("path", tested_directories)
def test_count_merged_elements(path):
    """
    Make sure no information is lost during XMLMerge (Equal number of elements).

    :param path: Path to folder containing xml files.
    :return:
    """
    files = os.listdir(path)
    files = [os.path.join(path, file) for file in files if
             file.endswith(".xml") or file.endswith(".rdf")]
    elements = 0
    for xmlfile in files:
        elements += len(et.parse(xmlfile).getroot())
    tree = merge(path)
    assert len(tree.getroot()) == elements
@pytest.mark.parametrize("path", tested_directories)
def test_count_properties(path):
    """Merging must preserve the total number of property (child) nodes."""
    files = os.listdir(path)
    files = [os.path.join(path, file) for file in files if
             file.endswith(".xml") or file.endswith(".rdf")]
    properties_unmerged = 0
    for xmlfile in files:
        for node in et.parse(xmlfile).getroot():
            properties_unmerged += len(node)
    tree = merge(path)
    properties_merged = sum(len(node) for node in tree.getroot())
    assert properties_merged == properties_unmerged
@pytest.mark.parametrize("path", tested_directories)
def test_merged_nsmaps(path):
    """The merged tree's nsmap equals the union of all input files' nsmaps."""
    expected = {}
    for file in parseable_files(path):
        for key, value in et.parse(file).getroot().nsmap.items():
            expected[key] = value
    tree = merge(path)
    log.info(f"{len(expected.keys())} entries expected in nsmap. {len(tree.getroot().nsmap.keys())} found")
    log.debug(f"Expected: {expected.keys()}")
    log.debug(f"Found: {tree.getroot().nsmap.keys()}")
    assert tree.getroot().nsmap == expected
PK ! y]} } cimpyorm/__init__.py#
# Copyright (c) 2018 - 2018 Thomas Offergeld (offergeld@ifht.rwth-aachen.de)
# Institute for High Voltage Technology
# RWTH Aachen University
#
# This module is part of cimpyorm.
#
# cimpyorm is licensed under the BSD-3-Clause license.
# For further information see LICENSE in the project's root directory.
#
"""
cimpyorm creates ORM representations of CIM datasets.
This module sets up and provides configuration and imports.
"""
# pylint: disable=ungrouped-imports
import os
from cimpyorm.auxiliary import log, get_path, _CONFIGPATH, CONFIG, _TESTROOT, _PACKAGEROOT
# Write a fresh config.ini with the default paths if none exists yet.
if not os.path.isfile(_CONFIGPATH):
    with open(_CONFIGPATH, "w+") as f:
        # Update config.ini
        CONFIG.write(f)
try:
    import pytest

    def test_all(runslow=False):
        """Run the package's test suite (optionally including slow tests)."""
        if runslow:
            pytest.main([os.path.join(_TESTROOT), "--runslow"])
        else:
            pytest.main([os.path.join(_TESTROOT)])
except ModuleNotFoundError:
    # pytest is an optional dependency; without it test_all is unavailable.
    pass
try:
    # See if we already know a schemaroot
    CONFIG["Paths"]["SCHEMAROOT"] = get_path("SCHEMAROOT")
    if not os.path.isdir(CONFIG["Paths"]["SCHEMAROOT"]):
        # Is schemaroot an actual directory?
        log.warning(f"Invalid schema path in configuration.")
        raise NotADirectoryError
except (KeyError, NotADirectoryError):
    if os.path.isdir(os.path.join(_PACKAGEROOT, "res", "schemata")):
        # Look in the default path
        CONFIG["Paths"]["SCHEMAROOT"] = os.path.join(_PACKAGEROOT, "res", "schemata")
        log.info(f"Found schemata in default location.")
    else:
        # Ask user to configure
        log.warning(f"No schemata configured. Use cimpyorm.configure(path_to_schemata) to set-up.")
        from cimpyorm.api import configure
try:
    # See if we already know a datasetroot
    CONFIG["Paths"]["DATASETROOT"] = get_path("DATASETROOT")
    if not os.path.isdir(CONFIG["Paths"]["DATASETROOT"]):
        # Is datasetroot an actual directory?
        log.warning(f"Invalid dataset path in configuration.")
        raise NotADirectoryError
except (KeyError, NotADirectoryError):
    if os.path.isdir(os.path.join(_PACKAGEROOT, "res", "datasets")):
        # Look in the default path
        CONFIG["Paths"]["DATASETROOT"] = os.path.join(_PACKAGEROOT, "res", "datasets")
        log.info(f"Found datasets in default location.")
    else:
        # Ask user to configure
        log.info(f"No datasets configured. Use cimpyorm.configure(path_to_datasets) to set-up.")
        from cimpyorm.api import configure
# Persist any path updates made above.
with open(_CONFIGPATH, "w+") as f:
    # Update config.ini
    CONFIG.write(f)
def describe(element, fmt="psql"):
    """
    Fallback description helper; the original __init__.py overrides this name
    with cimpyorm.api.describe when the api module imports successfully.

    :param element: The element to describe.
    :param fmt: Format string for the tabulate package.
    """
    try:
        element.describe(fmt)
    except AttributeError:
        # Consistent with cimpyorm.api.describe: objects without a describe()
        # method are reported instead of raising.
        print(f"Element of type {type(element)} doesn't provide descriptions.")
try:
from cimpyorm.api import parse, load, describe # pylint: disable=wrong-import-position
from cimpyorm.Model.Schema import Schema # pylint: disable=wrong-import-position
except ModuleNotFoundError:
log.warning(f"Unfulfilled requirements. parse and load are not available.")
PK ! /TE cimpyorm/api.py#
# Copyright (c) 2018 - 2018 Thomas Offergeld (offergeld@ifht.rwth-aachen.de)
# Institute for High Voltage Technology
# RWTH Aachen University
#
# This module is part of cimpyorm.
#
# cimpyorm is licensed under the BSD-3-Clause license.
# For further information see LICENSE in the project's root directory.
#
import os
from pathlib import Path
import configparser
from typing import Union, Tuple
from argparse import Namespace
from sqlalchemy.orm.session import Session
from cimpyorm.auxiliary import log, get_path
from cimpyorm.Model.Schema import Schema
from cimpyorm.backends import SQLite, Engine, InMemory
def configure(schemata: Union[Path, str] = None, datasets: Union[Path, str] = None):
    """
    Configure paths to schemata or update the DATASETROOT used for tests.

    :param schemata: Path to a folder containing CIM schema descriptions.
    :param datasets: Path to a folder containing test datasets.
    """
    parser = configparser.ConfigParser()
    config_file = get_path("CONFIGPATH")
    parser.read(config_file)
    if schemata:
        parser["Paths"]["SCHEMAROOT"] = os.path.abspath(schemata)
    if datasets:
        parser["Paths"]["DATASETROOT"] = os.path.abspath(datasets)
    with open(config_file, 'w') as configfile:
        parser.write(configfile)
def load(path_to_db: Union[Engine, str], echo: bool = False) -> Tuple[Session, Namespace]:
    """
    Load an already parsed database from disk or connect to a server and yield a database session to start querying on
    with the classes defined in the model namespace.

    Afterwards, the database can be queried using SQLAlchemy query syntax, providing the CIM classes contained in the
    :class:`~argparse.Namespace` return value.

    :param path_to_db: Path to the cim snapshot or a :class:`~cimpyorm.backend.Engine`.
    :param echo: Echo the SQL sent to the backend engine (SQLAlchemy option).
    :return: :class:`sqlalchemy.orm.session.Session`, :class:`argparse.Namespace`
    """
    # NOTE(review): this local import shadows the module-level "Schema" class
    # import - inside this function "Schema" refers to the module.
    import cimpyorm.Model.Schema as Schema
    from cimpyorm.Model import Source
    if isinstance(path_to_db, Engine):
        # Caller supplied a backend; keep its echo setting unless echo=True here.
        _backend = path_to_db
        _backend.echo = _backend.echo or echo
    elif os.path.isfile(path_to_db):
        # A plain file path defaults to the SQLite backend.
        _backend = SQLite(path_to_db, echo)
    else:
        raise NotImplementedError(f"Unable to connect to database {path_to_db}")
    session = _backend.session
    # Rebind the declarative base / table metadata to this engine.
    _backend.reset()
    _si = session.query(Source.SourceInfo).first()
    v = _si.cim_version
    log.info(f"CIM Version {v}")
    schema = Schema.Schema(session)
    schema.init_model(session)
    model = schema.model
    return session, model
def parse(dataset: Union[str, Path], backend: Engine = SQLite()) -> Tuple[Session, Namespace]:
    """
    Parse a database into a database backend and yield a database session to start querying on with the classes defined
    in the model namespace.

    Afterwards, the database can be queried using SQLAlchemy query syntax, providing the CIM classes contained in the
    :class:`~argparse.Namespace` return value.

    :param dataset: Path to the cim snapshot.
    :param backend: Database backend to be used (defaults to a SQLite on-disk database in the dataset location).
    :return: :class:`sqlalchemy.orm.session.Session`, :class:`argparse.Namespace`
    """
    # NOTE(review): the default backend (SQLite()) is a mutable default shared
    # across calls; update_path() mutates it - confirm repeated parses intend this.
    from cimpyorm import Parser
    backend.update_path(dataset)
    # Reset database
    backend.drop()
    backend.reset()
    # And connect
    engine, session = backend.connect()
    files = Parser.get_files(dataset)
    from cimpyorm.Model.Source import SourceInfo
    # Register every source file with the database.
    sources = frozenset([SourceInfo(file) for file in files])
    session.add_all(sources)
    session.commit()
    cim_version = Parser.get_cim_version(sources)
    schema = Schema(version=cim_version, session=session)
    backend.generate_tables(schema)
    log.info(f"Parsing data.")
    entries = Parser.merge_sources(sources)
    elements = Parser.parse_entries(entries, schema)
    log.info(f"Passing {len(elements):,} objects to database.")
    session.bulk_save_objects(elements)
    session.flush()
    log.debug(f"Start commit.")
    session.commit()
    log.debug(f"Finished commit.")
    if engine.dialect.name == "mysql":
        # Foreign key checks were deferred in the mysql session setup.
        log.debug("Enabling foreign key checks in mysql database.")
        session.execute("SET foreign_key_checks='ON'")
    log.info("Exit.")
    model = schema.model
    return session, model
def docker_parse() -> None:
    """
    Dummy function for parsing in shared docker tmp directory.
    """
    # Hard-coded to the container's shared /tmp mount.
    parse(r"/tmp")
def describe(element, fmt: str = "psql") -> None:
    """
    Print a description of an object, if it provides one.

    :param element: The element to describe.
    :param fmt: Format string for tabulate package.
    """
    try:
        describer = element.describe
        describer(fmt)
    except AttributeError:
        print(f"Element of type {type(element)} doesn't provide descriptions.")
if __name__ == "__main__":
    # Ad-hoc smoke test: parse the bundled "FullGrid" dataset into an
    # in-memory database and query a single object.
    root = get_path("DATASETROOT")
    # db_session, m = parse([os.path.abspath(os.path.join(root, folder)) for folder in os.listdir(root) if
    #                        os.path.isdir(os.path.join(root, folder)) or
    #                        os.path.join(root, folder).endswith(".zip")])
    db_session, m = parse(os.path.join(get_path("DATASETROOT"), "FullGrid"), InMemory())
    print(db_session.query(m.IdentifiedObject).first().name)  # pylint: disable=no-member
    db_session.close()
PK ! / cimpyorm/auxiliary.py#
# Copyright (c) 2018 - 2018 Thomas Offergeld (offergeld@ifht.rwth-aachen.de)
# Institute for High Voltage Technology
# RWTH Aachen University
#
# This module is part of cimpyorm.
#
# cimpyorm is licensed under the BSD-3-Clause license.
# For further information see LICENSE in the project's root directory.
#
import configparser
import logging
import os
from functools import lru_cache
from itertools import chain
from pathlib import Path
from typing import Collection, Iterable, Sequence
from zipfile import ZipFile
class HDict(dict):
    """Dictionary that can serve as a hashable cache key (e.g. for lru_cache)."""
    def __hash__(self):
        frozen_items = frozenset(self.items())
        return hash(frozen_items)
def chunks(l: Sequence, n: int) -> Iterable:
    """
    Yield successive slices of at most n elements from a sequence.

    :param l: The sequence to slice. (The previous Collection annotation was
        too loose: len() and slicing require a Sequence.)
    :param n: The maximum number of elements per chunk (must be positive).
    :return: Generator yielding slices of l, each of length <= n.
    """
    for i in range(0, len(l), n):
        yield l[i:i + n]
class CustomFormatter(logging.Formatter):
    """
    Logging formatter rendering timestamps as time elapsed since program
    start (record.relativeCreated is milliseconds since logging startup).
    """
    def formatTime(self, record, datefmt=None):
        """
        Return the elapsed time as "<seconds>.<milliseconds>".

        Fix: the previous implementation rounded seconds and milliseconds
        independently, so e.g. 1600 ms rendered as "2.600" instead of
        "1.600". divmod keeps both parts consistent, and the milliseconds
        are zero-padded to three digits to avoid ambiguity ("1.005" vs "1.5").
        """
        seconds, millis = divmod(round(record.relativeCreated), 1000)
        return f"{seconds}.{millis:03d}"
# Module-level logger, configured at most once: the handler check keeps
# repeated imports from attaching duplicate handlers.
log = logging.getLogger("cim_orm")
if not log.handlers:
    log.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    log.addHandler(handler)
    # Timestamps are elapsed time since start (see CustomFormatter).
    formatter = CustomFormatter(fmt='T+%(asctime)10ss:%(levelname)8s: %(message)s')
    handler.setFormatter(formatter)
log.debug("Logger configured.")
CONFIG = configparser.ConfigParser()
# Set default paths, all relative to this file's directory.
# NOTE(review): the PACKAGEROOT value is a pathlib.Path - configparser
# stringifies values on section assignment; confirm downstream str usage.
CONFIG["Paths"] = {"PACKAGEROOT": Path(os.path.abspath(__file__)).parent,
                   "TESTROOT": os.path.join(Path(os.path.abspath(__file__)).parent, "Test"),
                   "CONFIGPATH": os.path.join(Path(os.path.abspath(__file__)).parent, "config.ini")}
# Convenience aliases used throughout the package.
_TESTROOT = CONFIG["Paths"]["TESTROOT"]
_PACKAGEROOT = CONFIG["Paths"]["PACKAGEROOT"]
_CONFIGPATH = CONFIG["Paths"]["CONFIGPATH"]
def get_path(identifier: str) -> str:
    """
    Look up a configured path from the package's config.ini.

    :param identifier: Path-type identifier (e.g. "SCHEMAROOT").
    :return: The configured path.
    :raises KeyError: If the identifier is not configured.
    """
    parser = configparser.ConfigParser()
    parser.read(_CONFIGPATH)
    paths = parser["Paths"]
    return paths[identifier]
def merge(source_path):
    """
    Merge the parseable files under source_path into a single ElementTree.

    Children of every additional file's root are appended to the first
    file's root, and the namespace maps are combined along the way.

    :param source_path: Location of the XML/RDF sources.
    :return: Merged ElementTree.
    """
    from lxml import etree as et
    files = parseable_files(source_path)
    base_root = et.parse(files[0]).getroot()
    combined_nsmap = base_root.nsmap
    for extra_file in files[1:]:
        extra_root = et.parse(extra_file).getroot()
        for prefix, uri in extra_root.nsmap.items():
            if prefix in combined_nsmap and uri != combined_nsmap[prefix]:
                log.error("Incompatible namespaces in schema files")
            combined_nsmap[prefix] = uri
        for child in extra_root:
            base_root.append(child)
    merged = et.ElementTree(base_root)
    et.cleanup_namespaces(merged, top_nsmap=combined_nsmap, keep_ns_prefixes=combined_nsmap.keys())
    return merged
def parseable_files(path):
    """
    Identify the parseable files (.xml/.rdf) for a given path.

    :param path: A single .xml/.rdf file, a .zip archive, or a directory.
        A directory without any .xml/.rdf files is assumed to contain a
        zipped CIM; the first .zip found is used.
    :return: List of file paths, or open zip-member file objects.
    """
    if path.endswith((".rdf", ".xml")):
        # A single parseable file.
        files = [path]
    elif path.endswith(".zip"):
        # NOTE(review): the ZipFile is intentionally left open - the returned
        # member file objects are only readable while the archive stays open.
        dir_ = ZipFile(path, "r")
        files = [dir_.open(name) for name in dir_.namelist()
                 if name.endswith((".xml", ".rdf"))]
    else:
        files = [os.path.join(path, file) for file in os.listdir(path)
                 if file.endswith((".xml", ".rdf"))]
        if not files:
            # There are no xml files in the folder - assume the first .zip
            # is the zipped CIM. (The former ".rdf" filter here was dead
            # code: any .rdf file would already have matched above.)
            archives = [os.path.join(path, file) for file in os.listdir(path)
                        if file.endswith(".zip")]
            dir_ = ZipFile(archives[0])
            files = [dir_.open(name) for name in dir_.namelist()
                     if name.endswith((".xml", ".rdf"))]
    return files
@lru_cache()
def shorten_namespace(elements, nsmap):
    """
    Map a list of XML tag class names on the internal classes (e.g. with shortened namespaces).

    :param elements: XML tag name(s) - a single tag, a list, or a frozenset.
    :param nsmap: XML nsmap (must be hashable for the lru_cache, e.g. an HDict).
    :return: List of mapped names (or a single name if a single tag was passed).
    """
    names = []
    _islist = True
    if not isinstance(elements, (list, frozenset)):
        # Normalize a single element to a one-item list; remember to unwrap.
        elements = [elements]
        _islist = False
    for el in elements:
        for key, value in nsmap.items():
            if value in el:
                if key == "cim":
                    # The default "cim" namespace is stripped entirely.
                    names.append(el.split(value[-1]+"}")[-1])
                else:
                    # Other namespaces are shortened to a "<prefix>_" prefix.
                    names.append(el.replace("{"+value+"}", key+"_"))
        if el.startswith("#"):
            # Fragment-only references keep just the fragment name.
            names.append(el.split("#")[-1])
    if not _islist and len(names) == 1:
        # Single input -> single output.
        names = names[0]
    return names
def merge_descriptions(descriptions):
    """
    Merge the descriptions for a CIM class into a single description.

    All children of all given descriptions are appended to the first
    description, which is mutated in place and returned.

    :param descriptions: List of description elements, or a single one.
    :return: Result of the merge (the first description, mutated).
    """
    if not isinstance(descriptions, list):
        # A single description needs no merging.
        return descriptions
    description = descriptions[0]
    # Snapshot all children first, then append. (Replaces a list
    # comprehension that was executed only for its side effect.)
    children = list(chain(*[list(descr) for descr in descriptions]))
    for child in children:
        description.append(child)
    return description
def find_rdfs_path(version):
    """
    Attempt to identify which schema to use from the model file header.

    :param version: The CIM version (string of at most 2 characters).
    :return: Path to the schema files on local file system.
    :raises ValueError: If the version is missing or implausibly long.
    :raises RuntimeError: If no schema repository is configured.
    :raises NotImplementedError: If no schema folder exists for the version.
    """
    if version:
        log.info(f"Using CIM Version {version}.")
    else:
        raise ValueError(f"Failed to determine CIM Version")
    if len(version) > 2:
        raise ValueError(f"Unexpected CIM Version (v={version}).")
    try:
        rdfs_path = os.path.join(get_path("SCHEMAROOT"), f"CIM{version}")
    except KeyError as ex:
        log.critical(f"Schema not defined.")
        # Chain the original KeyError so the root cause stays visible.
        raise RuntimeError(f"Couldn't find CIM schemata. "
                           f"Please configure a schema repository using cimpyorm.configure.") from ex
    if not os.path.isdir(rdfs_path):
        raise NotImplementedError(f"Unknown CIM Version for (v={version}). Add to "
                                  f"schemata")
    return rdfs_path
PK ! a6$ $ cimpyorm/backends.pyimport os
from importlib import reload
from abc import ABC
import sqlalchemy as sa
from sqlalchemy.engine import Engine as SA_Engine
from sqlalchemy.orm.session import Session as SA_Session
from sqlalchemy.exc import OperationalError, InternalError
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from networkx import bfs_tree
# pylint: disable=too-many-arguments
import cimpyorm.Model.auxiliary as aux
from cimpyorm.auxiliary import log
class Engine(ABC):
    """
    Abstract base class for database backends, wrapping a lazily created
    SQLAlchemy engine/session pair and the schema bootstrap logic.
    """
    def __init__(self, dialect=None, echo=False, driver=None, path=None):
        """
        :param dialect: SQLAlchemy dialect name (e.g. "sqlite", "mysql").
        :param echo: SQLAlchemy "echo" parameter.
        :param driver: Optional DBAPI driver name (appended to the dialect).
        :param path: Backend-specific database location.
        """
        self.dialect = dialect
        self.echo = echo
        self.driver = driver
        self.path = path
        # The SQLAlchemy engine is created lazily by the "engine" property.
        self._engine = None

    @property
    def engine(self) -> SA_Engine:
        """
        Lazily create and cache the SQLAlchemy engine for this backend.

        :return: The (cached) SQLAlchemy engine.
        """
        if not self._engine:
            log.info(f"Database: {self.path}")
            engine = self._connect_engine()
            self._engine = engine
        return self._engine

    @property
    def session(self) -> SA_Session:
        # A fresh session bound to the cached engine on every access.
        Session = sessionmaker(bind=self.engine)
        session = Session()
        return session

    def connect(self):
        """Return the engine together with a new session."""
        return self.engine, self.session

    def update_path(self, path):
        # Default: backends without a file location ignore the dataset path.
        pass

    def _prefix(self):
        # Build the "<dialect>[+<driver>]" part of the SQLAlchemy URL.
        if self.driver:
            return f"{self.dialect}+{self.driver}"
        else:
            return f"{self.dialect}"

    def _connect_engine(self):
        raise NotImplementedError

    def drop(self):
        raise NotImplementedError

    def reset(self) -> None:
        """
        Reset the table metadata for declarative classes.

        :return: None
        """
        import cimpyorm.Model.Elements as Elements
        import cimpyorm.Model.Schema as Schema
        import cimpyorm.Model.Source as Source
        # Rebind the declarative base to this engine, then reload the model
        # modules so their classes pick up the new base.
        aux.Base = declarative_base(self.engine)
        reload(Source)
        reload(Elements)
        reload(Schema)
        Source.SourceInfo.metadata.create_all(self.engine)
        Elements.SchemaElement.metadata.create_all(self.engine)
        Schema.SchemaInfo.metadata.create_all(self.engine)

    def generate_tables(self, schema):
        """
        Create the database tables for all classes in the schema's
        inheritance hierarchy.

        :param schema: The parsed CIM schema.
        """
        g = schema.inheritance_graph
        # Breadth-first over the inheritance tree so parents precede children.
        hierarchy = list(bfs_tree(g, "__root__"))
        hierarchy.remove("__root__")
        log.info(f"Creating map prefixes.")
        for c in hierarchy:
            c.class_.compile_map(c.nsmap)
        # ToDo: create_all is quite slow, maybe this can be sped up. Currently low priority.
        log.info(f"Creating table metadata.")
        for child in g["__root__"]:
            child.class_.metadata.create_all(self.engine)
        log.info(f"Backend model ready.")
class SQLite(Engine):
    """On-disk SQLite backend."""
    def __init__(self, path="out.db", echo=False, driver=None, dataset_loc=None):
        """
        Default constructor for SQLite backend instance

        :param path: Storage location for the .db-file (default: "out.db" in cwd)
        :param echo: SQLAlchemy "echo" parameter (default: False)
        :param driver: Python SQLite driver (default: sqlite3)
        :param dataset_loc: Dataset location used to automatically determine storage location (in the dataset folder)
        """
        # NOTE(review): dataset_loc is accepted but not used here - the
        # storage location is resolved later via update_path(); confirm.
        self.dialect = "sqlite"
        super().__init__(self.dialect, echo, driver, path)

    def drop(self):
        """Remove the database file (if any) and forget the cached engine."""
        try:
            os.remove(self.path)
            log.info(f"Removed old database {self.path}.")
            self._engine = None
        except FileNotFoundError:
            # Nothing to remove.
            pass

    # The former "engine" property override only delegated to super() and has
    # been removed; Engine.engine is inherited unchanged.

    def update_path(self, path):
        """
        Derive the database file location from the dataset location.

        :param path: Dataset location (a path, a list of paths, or None for cwd).
        """
        if path is None:
            out_dir = os.getcwd()
        elif isinstance(path, list):
            try:
                # Renamed the loop variable: the original shadowed "path".
                out_dir = os.path.commonpath([os.path.abspath(p) for p in path])
            except ValueError:
                # Paths are on different drives - default to cwd.
                log.warning(f"Datasources have no common root. Database file will be saved to {os.getcwd()}")
                out_dir = os.getcwd()
        else:
            out_dir = os.path.abspath(path)
        if not os.path.isabs(self.path):
            if os.path.isdir(out_dir):
                db_path = os.path.join(out_dir, self.path)
            else:
                db_path = os.path.join(os.path.dirname(out_dir), "out.db")
        else:
            db_path = os.path.abspath(self.path)
        self.path = db_path

    def _connect_engine(self):
        # ToDo: Disabling same_thread check is only treating the symptoms, however, without it, property changes
        #  can't be committed
        return sa.create_engine(f"{self._prefix()}:///{self.path}",
                                echo=self.echo, connect_args={"check_same_thread": False})
class InMemory(Engine):
    """SQLite backend that keeps the database in memory only."""
    def __init__(self, echo=False, driver=None):
        """
        Default constructor for In-Memory-SQLite instances

        :param echo: SQLAlchemy "echo" parameter (default: False)
        :param driver: Python SQLite driver (default: sqlite3)
        """
        self.dialect = "sqlite"
        super().__init__(self.dialect, echo, driver)

    def drop(self):
        """Forget the current engine; the in-memory database vanishes with it."""
        log.info(f"Removed old database {self.path}.")
        self._engine = None

    def _connect_engine(self):
        # ToDo: Disabling same_thread check is only treating the symptoms, however, without it, property changes
        #  can't be committed
        connect_args = {"check_same_thread": False}
        url = f"{self._prefix()}:///:memory:"
        return sa.create_engine(url, echo=self.echo, connect_args=connect_args)
class ClientServer(Engine):
    """
    Base class for client/server backends (MySQL, MariaDB): adds credentials,
    host/port handling, and on-demand database creation.
    """
    def __init__(self, username=None, password=None, driver=None,
                 host=None, port=None, path=None, echo=False):
        """
        :param username: Database username.
        :param password: Password for username.
        :param driver: DBAPI driver name.
        :param host: Database host name or address.
        :param port: Database port.
        :param path: Database (schema) name on the server.
        :param echo: SQLAlchemy "echo" parameter.
        """
        super().__init__(None, echo, driver, path)
        self.username = username
        self.password = password
        self.hostname = host
        self.port = port

    @property
    def remote_path(self):
        # "<host>:<port>[/<database>]" part of the connection URL.
        if self.path:
            return f"{self.host}/{self.path}"
        else:
            return self.host

    @property
    def host(self):
        return f"{self.hostname}:{self.port}"

    def drop(self):
        """Drop the remote database (ignoring failures) and forget the engine."""
        try:
            log.info(f"Dropping database {self.path} at {self.host}.")
            self.engine.execute(f"DROP DATABASE {self.path};")
        except OperationalError:
            # Database did not exist (or could not be dropped) - ignore.
            pass
        self._engine = None

    def _credentials(self):
        return f"{self.username}:{self.password}"

    def _connect_engine(self):
        engine = sa.create_engine(
            f"{self._prefix()}://{self._credentials()}@{self.remote_path}", echo=self.echo)
        try:
            engine.connect()
        # Pymysql error is raised as InternalError
        except (OperationalError, InternalError):
            # The database likely does not exist yet: connect to the server
            # itself, create the schema, then reconnect to the database.
            engine = sa.create_engine(
                f"{self._prefix()}://{self._credentials()}@{self.host}", echo=self.echo)
            engine.execute(f"CREATE SCHEMA {self.path} DEFAULT CHARACTER SET utf8 COLLATE "
                           f"utf8_bin;")
            engine = sa.create_engine(
                f"{self._prefix()}://{self._credentials()}@{self.remote_path}", echo=self.echo)
        return engine
class MariaDB(ClientServer):
    def __init__(self, username="root", password="", driver="pymysql",
                 host="127.0.0.1", port=3306, path="cim", echo=False):
        """
        Default constructor for MariaDB backend instance

        :param username: Username for the MariaDB database (default: root)
        :param password: Password for username (at) MariaDB database (default: "")
        :param driver: Python MariaDB driver (default: pymysql)
        :param host: Database host (default: localhost)
        :param port: Database port (default: 3306)
        :param path: Database name (default: "cim")
        :param echo: SQLAlchemy "echo" parameter (default: False)
        """
        super().__init__(username, password, driver, host,
                         port, path, echo)
        self.dialect = "mysql"

    @property
    def session(self):
        """A session with foreign key checks deferred (parse() re-enables them)."""
        session = super().session
        log.debug("Deferring foreign key checks in mysql database.")
        session.execute("SET foreign_key_checks='OFF'")
        return session
class MySQL(ClientServer):
    def __init__(self, username="root", password="", driver="pymysql",
                 host="127.0.0.1", port=3306, path="cim", echo=False):
        """
        Default constructor for MySQL backend instance

        :param username: Username for the MySQL database (default: root)
        :param password: Password for username (at) MySQL database (default: "")
        :param driver: Python MySQL driver (default: pymysql)
        :param host: Database host (default: localhost)
        :param port: Database port (default: 3306)
        :param path: Database name (default: "cim")
        :param echo: SQLAlchemy "echo" parameter (default: False)
        """
        super().__init__(username, password, driver, host,
                         port, path, echo)
        self.dialect = "mysql"

    @property
    def session(self):
        """A session with foreign key checks deferred (parse() re-enables them)."""
        session = super().session
        log.debug("Deferring foreign key checks in mysql database.")
        session.execute("SET foreign_key_checks='OFF'")
        return session
PK ! &Pq q cimpyorm/res/LICENSEThe content of the "datasets" and "schemata" folder is owned and distributed by
ENTSO-E and is included for the users' convenience. At present, the files are
available for download from ENTSO-E's CGMES website.
It should be noted, that the test cases should always be considered as such and
come with the limitations described in the documentation available online.PK ! :* = cimpyorm/res/datasets/FullGrid/20171002T0930Z_1D_BE_SSH_4.xml
2015-12-15T10:10:10
2017-10-02T09:30:00Z
4
CGMES Conformity Assessment: FullGridTestConfiguration (Node Breaker MAS BE with Short Circuit). The model is owned by ENTSO-E and is provided by ENTSO-E “as it is”. To the fullest extent permitted by law, ENTSO-E shall not be liable for any damages of any kind arising out of the use of the model (including any of its subsequent modifications). ENTSO-E neither warrants, nor represents that the use of the model will not infringe the rights of third parties. Any use of the model shall include a reference to ENTSO-E. ENTSO-E web site is the only official source of information related to the model.
http://elia.be/CAS2.0/FullGridTestConfiguration
http://entsoe.eu/CIM/SteadyStateHypothesis/1/1
true
true
true
true
true
true
true
true
0.99
0.99
false
false
false
false
false
false
false
false
false
false
false
false
false
false
false
false
false
false
false
false
false
false
0.01
0.01
99.99
9.99
0e+000
0e+000
0e+000
150.000000
0e+000
0e+000
0e+000
0e+000
0e+000
0e+000
0e+000
15.000000
0e+000
500.000000
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
false
200.000000
90.000000
200.000000
50.000000
1.000000
0e+000
9.99
0.99
-46.816625
79.193778
false
0.0
-43.687227
84.876604
false
0.0
-90.037005
148.603743
false
0.0
-27.365225
0.425626
false
0.0
-26.805006
1.489867
false
0.0
99.99
99.99
0
true
0e+000
0e+000
0e+000
true
0e+000
1
false
1
false
1
false
1
false
false
9.99
0.99
1
true
0e+000
10
true
6
false
true
6
6
false
7
false
7
false
10
false
14
true
17
false
0
true
-2
false
false
1.00000
true
225.500000
false
0.500000
true
115.500000
false
0.500000
true
21.987000
true
0.500000
false
110.000000
true
0.500000
false
380.000000
true
0.500000
false
0e+000
true
0.500000
false
0e+000
0e+000
9.99
true
0.0
0.0
false
-90.000000
-100.256000
0
true
-118.000000
-18.720301
0
true
-118.000000
-18.720301
0
true
-118.000000
-18.720301
0
true
-118.000000
-18.720301
0
true
-118.000000
-18.720301
0
true
-118.000000
-18.720301
0
true
true
0.500000
false
0e+000
true
0.500000
false
0e+000
true
35.00000
true
-65.000000
true
35.00000
true
-65.000000
true
35.00000
true
-65.000000
true
35.00000
true
-65.000000
true
0.500000
false
0e+000
true
0.500000
true
10.815000
true
0.500000
false
0e+000
true
0.500000
true
123.900000
true
0.500000
false
123.900000
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
true
0e+000
0e+000
0
0e+000
150.000000
0e+000
0e+000
1.000000
-40.000000
0e+000
150.000000
0
150.000000
0e+000
0e+000
0e+000
1.000000
40.000000
0e+000
0e+000
PK ! C] ] <