from lxml import etree
from time import time
import importlib
import logging
from cimpy.cgmes_v2_4_15.CGMESProfile import short_profile_name
logger = logging.getLogger(__name__)
[docs]
def cim_import(xml_files, cgmes_version, start_dict=None):
    """Read CGMES RDF/XML files and instantiate the corresponding CIM classes.

    This function parses xml files containing a cgmes topology and instantiates these classes with their
    attributes. The instantiation is done in two steps. In the first step all classes are instantiated with
    default values and in a second step the attributes contained in the xml files are set. The origin of all
    classes and attributes is stored in the attribute serializationProfile of each object.

    :param xml_files: non-empty sequence of CIM RDF/XML files (paths or seekable file-like objects). The
        namespaces are read from the first file only.
    :param cgmes_version: cgmes version, e.g. "cgmes_v2_4_15". The classes are imported from the package
        "cimpy.<cgmes_version>".
    :param start_dict: optional dictionary that is used as the import result and extended in place.

        * Objects already stored in start_dict['topology'] are kept; a missing 'topology' entry is created.
        * start_dict['meta_info'] is rebuilt from the given files.
        * If start_dict=None a new dictionary is created.
    :return: import_result: a dictionary containing the topology and meta information. The topology can be
        extracted via import_result['topology']. The topology dictionary contains all objects accessible via
        their mRID. The meta information can be extracted via import_result['meta_info']. The meta_info
        dictionary contains the keys 'namespaces' and 'urls' and, if one of the files provides it, 'author'.
        'urls' contains a mapping between references to URLs and the extracted value of the URL,
        e.g. 'absoluteValue':
        'http://iec.ch/TC57/2012/CIM-schema-cim16#OperationalLimitDirectionKind.absoluteValue'. These mappings
        are accessible via the name of the attribute,
        e.g. import_result['meta_info']['urls'][attr_name] = {mapping like example above}.
        'namespaces' is a dictionary containing all RDF namespaces found in the first xml file.
    """
    # Package that contains one module per CIM class of the requested version.
    cgmes_version_path = "cimpy." + cgmes_version

    # Start the clock.
    t0 = time()

    # Messages are counted per text so that repeated problems are reported only once.
    logger_grouped = dict(errors={}, info={})

    # Dict which will contain meta information and the topology.
    import_result = start_dict if start_dict is not None else dict(meta_info={}, topology={})
    # A caller-supplied dict may lack the topology entry both passes rely on.
    import_result.setdefault("topology", {})
    import_result["meta_info"] = dict(namespaces=_get_namespaces(xml_files[0]), urls={})

    namespace_rdf = _get_rdf_namespace(import_result["meta_info"]["namespaces"])

    # CIM element tag base (e.g. {http://iec.ch/TC57/2012/CIM-schema-cim16#} )
    base = "{" + import_result["meta_info"]["namespaces"]["cim"] + "}"

    # First pass: create the objects. Second pass: fill attributes and references.
    import_result, logger_grouped = _instantiate_classes(
        import_result,
        xml_files,
        cgmes_version_path,
        namespace_rdf,
        base,
        logger_grouped,
    )
    import_result, logger_grouped = _set_attributes(import_result, xml_files, namespace_rdf, base, logger_grouped)

    for error, count in logger_grouped["errors"].items():
        logger.warning("%s: %d times", error, count)

    for info, count in logger_grouped["info"].items():
        logger.info("%s: %d times", info, count)

    elapsed_time = time() - t0
    logger.info("Created totally %s CIM objects in %.2f s\n\n", len(import_result["topology"]), elapsed_time)

    return import_result
# This function instantiates the classes defined in all RDF files. All attributes are set to default values.
# The only exception is the mRID which is set for all classes that have this attribute. The attributes of a class
# are set in the _set_attributes function because some attributes might be stored in one package and the class in
# another. Since after this function all classes are instantiated, there should be no problem in setting the attributes.
# Also the information from which package file a class was read is stored in the serializationProfile dictionary.
def _instantiate_classes(import_result, xml_files, cgmes_version_path, namespace_rdf, base, logger_grouped):
    """Create one default-initialised object for every CIM class element found in the files.

    :param import_result: dictionary with the keys 'topology' and 'meta_info'; both are updated in place.
    :param xml_files: iterable of RDF/XML files (paths or seekable file-like objects).
    :param cgmes_version_path: dotted package path containing one module per CIM class.
    :param namespace_rdf: URI of the RDF namespace, used to read the rdf:ID attribute.
    :param base: CIM tag prefix in Clark notation, e.g. "{http://iec.ch/TC57/2012/CIM-schema-cim16#}".
    :param logger_grouped: dictionary with the counters 'errors' and 'info' (message -> occurrences).
    :return: tuple (import_result, logger_grouped)
    """
    topology = import_result["topology"]
    meta_info = import_result["meta_info"]
    errors = logger_grouped["errors"]
    infos = logger_grouped["info"]

    # Length of element tag base
    m = len(base)
    id_key = "{%s}ID" % namespace_rdf

    for xml_file in xml_files:
        logger.info('START of parsing file "%s"', xml_file)

        # Reset stream
        if hasattr(xml_file, "seek"):
            xml_file.seek(0)

        context = iter(etree.iterparse(xml_file, ("start", "end")))

        # The first event delivers the root element ({http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF).
        _, root = next(context)

        # Key of short_profile_name describing the profile of the current file ("" = not seen yet).
        package = ""

        for event, elem in context:
            # Only completed elements are of interest in this pass.
            if event != "end":
                continue

            if elem.tag[:m] == base:
                # Elements in the CGMES namespace carrying "rdf:ID" are CGMES classes.
                uuid = elem.get(id_key)
                if uuid is None:
                    continue

                # Element tag without namespace (e.g. VoltageLevel).
                tag = elem.tag[m:]
                module_name = cgmes_version_path + "." + tag
                try:
                    # Import the module for the CGMES object.
                    module = importlib.import_module(module_name)
                except ModuleNotFoundError:
                    error_msg = "Module {} not implemented".format(tag)
                    errors[error_msg] = errors.get(error_msg, 0) + 1
                    root.clear()
                    continue

                # Get the CGMES class from the module, instantiate it and map it to the uuid.
                klass = getattr(module, tag)
                obj = klass()
                topology[uuid] = obj

                info_msg = "CIM object {} created".format(module_name.split(".")[-1])
                infos[info_msg] = infos.get(info_msg, 0) + 1

                # Set the mRID to the read in UUID if the class has this attribute. Otherwise the UUID
                # is only stored as key of the topology dictionary.
                if hasattr(obj, "mRID"):
                    obj.mRID = uuid

                if package != "":
                    obj.serializationProfile["class"] = short_profile_name[package]
                else:
                    # klass is the class itself, so its own name is reported (not the metaclass name).
                    error_msg = "Package information not found for class {}".format(klass.__name__)
                    errors[error_msg] = errors.get(error_msg, 0) + 1

            # Check which package is read
            elif "Model.profile" in elem.tag:
                # An empty profile element has no text; treat it as matching no package.
                profile_text = elem.text or ""
                for package_key in short_profile_name:
                    if package_key in profile_text:
                        package = package_key
                        break

            # The author of all imported files should be the same, so only the first one found is stored.
            elif "author" not in meta_info and (
                "Model.createdBy" in elem.tag or "Model.modelingAuthoritySet" in elem.tag
            ):
                meta_info["author"] = elem.text

        # Clear children of the root element to minimise memory usage.
        root.clear()

    return import_result, logger_grouped
# This function sets all attributes after the classes are instantiated by _instantiate_classes. Cyclic attributes like
# PowerTransformerEnd <-> PowerTransformer are set. This function also stores the information from which package file
# the attributes are read in the serializationProfile dictionary.
def _set_attributes(import_result, xml_files, namespace_rdf, base, logger_grouped):
    """Set the attributes and references of the objects stored in import_result['topology'].

    :param import_result: dictionary with the keys 'topology' (uuid -> object) and 'meta_info'; the objects
        and import_result['meta_info']['urls'] are updated in place.
    :param xml_files: iterable of RDF/XML files (paths or seekable file-like objects).
    :param namespace_rdf: URI of the RDF namespace, used to read rdf:ID, rdf:about and rdf:resource.
    :param base: CIM tag prefix in Clark notation, e.g. "{http://iec.ch/TC57/2012/CIM-schema-cim16#}".
    :param logger_grouped: dictionary with the counters 'errors' and 'info' (message -> occurrences).
    :return: tuple (import_result, logger_grouped)
    """
    topology = import_result["topology"]
    urls = import_result["meta_info"]["urls"]
    errors = logger_grouped["errors"]

    m = len(base)
    id_key = "{%s}ID" % namespace_rdf
    about_key = "{%s}about" % namespace_rdf
    resource_key = "{%s}resource" % namespace_rdf

    # Second step pass sets attributes and references.
    for xml_file in xml_files:
        # Reset stream before the parser is created, as done in the first pass.
        if hasattr(xml_file, "seek"):
            xml_file.seek(0)

        context = iter(etree.iterparse(xml_file, ("start", "end")))

        # The first event delivers the root element ({http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF).
        _, root = next(context)

        # Key of short_profile_name describing the profile of the current file ("" = not seen yet).
        package = ""

        for event, elem in context:
            # Process 'start' elements in the CGMES namespace.
            if event == "start" and elem.tag[:m] == base:
                uuid = elem.get(id_key)
                if uuid is None:
                    uuid = elem.get(about_key)
                    if uuid is not None:
                        # rdf:about holds a reference; drop its first character (expected to be '#').
                        uuid = uuid[1:]
                if uuid is None:
                    continue

                # Locate the CGMES object using the uuid.
                try:
                    obj = topology[uuid]
                except KeyError:
                    error_msg = "Missing {} object with uuid: {}".format(elem.tag[m:], uuid)
                    errors[error_msg] = errors.get(error_msg, 0) + 1
                    root.clear()
                    continue

                class_name = obj.__class__.__name__

                # Iterate over attributes/references. The same iterator is consumed on purpose, so the
                # outer loop resumes behind the closing tag of the class element.
                for sub_event, sub_elem in context:
                    # Process end events with elements in the CIM namespace.
                    if sub_event != "end" or sub_elem.tag[:m] != base:
                        continue

                    # Class closing element (e.g. </cim:Terminal>): finished setting object attributes.
                    if sub_elem.get(id_key) is not None or sub_elem.get(about_key) is not None:
                        break

                    # Get the attribute/reference name.
                    attr = sub_elem.tag[m:].rsplit(".")[-1]

                    if not hasattr(obj, attr):
                        error_msg = "'%s' has not attribute '%s'" % (class_name, attr)
                        errors[error_msg] = errors.get(error_msg, 0) + 1
                        continue

                    # Use the rdf:resource attribute to distinguish between attributes and references/enums.
                    uuid2 = sub_elem.get(resource_key)

                    if uuid2 is None:  # attribute
                        text = sub_elem.text
                        # Convert value type using the default value.
                        try:
                            default = getattr(obj, attr)
                            if isinstance(default, bool):
                                # bool("false") is True because the string is non-empty, so the text is
                                # compared explicitly instead of being passed to bool().
                                setattr(obj, attr, str.title(text) == "True")
                            else:
                                setattr(obj, attr, type(default)(text))
                        except TypeError:
                            # E.g. the default is None or the element has no text: store the raw text.
                            try:
                                setattr(obj, attr, text)
                            except TypeError:
                                pass
                        except ValueError:
                            # Malformed literal (e.g. float("abc")): report it and keep the raw text
                            # instead of aborting the whole import.
                            error_msg = "Value of '%s.%s' could not be converted to %s" % (
                                class_name,
                                attr,
                                type(default).__name__,
                            )
                            errors[error_msg] = errors.get(error_msg, 0) + 1
                            setattr(obj, attr, text)

                    elif uuid2.startswith("#"):  # reference
                        target_id = uuid2[1:]  # remove '#' prefix
                        try:
                            val = topology[target_id]
                        except KeyError:
                            error_msg = "Referenced {} [{}] object missing.".format(class_name, target_id)
                            errors[error_msg] = errors.get(error_msg, 0) + 1
                            continue

                        _assign_reference(obj, uuid, attr, val, errors)

                        # Set the reverse direction if the referenced object has an attribute named
                        # like the class of the current object.
                        if hasattr(val, class_name):
                            _assign_reference(val, target_id, class_name, obj, errors)

                    else:  # Enum
                        # The part behind the last '.' is the name of the enum value.
                        val = uuid2.rsplit(".", 1)[-1]
                        # If the resource is a URL, remember the mapping value -> URL per attribute.
                        if "http" in uuid2:
                            urls.setdefault(attr, {}).setdefault(val, uuid2)
                        setattr(obj, attr, val)

                    if package != "":
                        obj.serializationProfile[attr] = short_profile_name[package]
                    else:
                        error_msg = "Package information not found for class {}, attribute {}".format(
                            class_name, attr
                        )
                        errors[error_msg] = errors.get(error_msg, 0) + 1

            # Check which package is read
            elif event == "end" and "Model.profile" in elem.tag:
                # An empty profile element has no text; treat it as matching no package.
                profile_text = elem.text or ""
                for package_key in short_profile_name:
                    if package_key in profile_text:
                        package = package_key
                        break

        # Clear children of the root element to minimise memory usage.
        root.clear()

        logger.info('END of parsing file "%s"', xml_file)

    return import_result, logger_grouped


# Stores target in the attribute attr of owner. The current value of the attribute decides how:
# None is replaced by target, the string "list" is replaced by a new list holding target and an existing
# list is extended. Any other value that differs from target is counted as multiplicity error in errors.
def _assign_reference(owner, owner_id, attr, target, errors):
    current = getattr(owner, attr)
    if current is None:  # 1..1 or 0..1
        # Rely on properties to set any bi-directional references.
        setattr(owner, attr, target)
    elif current == "list":  # Many, nothing stored yet
        setattr(owner, attr, [target])
    elif isinstance(current, list):  # Many
        if target not in current:
            current.append(target)
            setattr(owner, attr, current)
    elif current == target:
        # Attribute reference already resolved
        pass
    else:
        error_msg = (
            "Multiplicity Error for class {} [{}], attribute {}. ".format(owner.__class__.__name__, owner_id, attr)
            + "Multiplicity should be 1..1 or 0..1"
        )
        errors[error_msg] = errors.get(error_msg, 0) + 1
# Returns a map of prefix to namespace for the given XML file.
def _get_namespaces(source):
    """Collect the namespace declarations found up to the first closed element of source.

    :param source: XML file (path or file-like object). A file-like object offering seek() is rewound
        afterwards.
    :return: dictionary mapping each namespace prefix to its URI
    """
    prefix_to_uri = {}

    for kind, payload in etree.iterparse(source, ("end", "start-ns", "end-ns")):
        # Stop as soon as the first element has been closed.
        if kind == "end":
            break
        # For "start-ns" events the payload is the (prefix, uri) pair; "end-ns" events are ignored.
        if kind == "start-ns":
            prefix, uri = payload
            prefix_to_uri[prefix] = uri

    # Reset stream
    if hasattr(source, "seek"):
        source.seek(0)

    return prefix_to_uri
# Returns the RDF Namespace from the namespaces dictionary
def _get_rdf_namespace(namespaces):
    """Return the URI registered for the prefix "rdf".

    :param namespaces: dictionary mapping namespace prefixes to URIs
    :return: namespaces["rdf"] if present, otherwise the standard RDF syntax namespace URI (a warning
        is logged in that case)
    """
    try:
        return namespaces["rdf"]
    except KeyError:
        # Fall back to the standard RDF namespace instead of failing on an unbound local variable.
        ns = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
        logger.warning("No rdf namespace found. Using %s", ns)
        return ns