Source code for cimpy.cimimport

from lxml import etree
from time import time
import importlib
import logging
from cimpy.cgmes_v2_4_15.CGMESProfile import short_profile_name

logger = logging.getLogger(__name__)


def cim_import(xml_files, cgmes_version, start_dict=None):
    """Read CGMES RDF/XML files and instantiate the corresponding CIM classes.

    The files are parsed in two passes. In the first pass every class found in the files is instantiated with
    its default values; in the second pass the attributes and references contained in the files are set. The
    origin (profile) of every class and attribute is stored in the ``serializationProfile`` attribute of the
    created objects.

    :param xml_files: sequence of CIM RDF/XML sources (paths or seekable file-like objects). The namespace
        declarations are read from the first entry only.
    :param cgmes_version: name of the cimpy sub-package holding the classes, e.g. ``"cgmes_v2_4_15"``.
    :param start_dict: optional dictionary used as the initial import result. It is updated in place and
        returned; its ``'meta_info'`` entry is replaced and a missing ``'topology'`` entry is created.
        If ``start_dict`` is ``None`` a fresh dictionary is used.

    :return: import_result: a dictionary containing the topology and meta information. The topology can be
        extracted via ``import_result['topology']``; it contains all objects, accessible via their mRID. The
        meta information can be extracted via ``import_result['meta_info']``, a dictionary with the keys
        ``'namespaces'`` and ``'urls'`` plus ``'author'`` when the files provide one. ``'urls'`` maps an
        attribute name to a dictionary from the short value to the full URL it was read from, e.g.
        ``import_result['meta_info']['urls'][attr_name] == {'absoluteValue':
        'http://iec.ch/TC57/2012/CIM-schema-cim16#OperationalLimitDirectionKind.absoluteValue'}``.
        ``'namespaces'`` contains all RDF namespaces declared in the first xml file, keyed by prefix.

    :raises IndexError: if ``xml_files`` is empty.
    :raises KeyError: if the first file does not declare a ``cim`` namespace prefix.
    """
    # Package that holds the generated classes of the requested CGMES version.
    cgmes_version_path = "cimpy." + cgmes_version

    # Start the clock.
    t0 = time()

    # Messages are grouped and counted so that each distinct message is logged only once.
    logger_grouped = dict(errors={}, info={})

    # Dictionary which will contain the meta information and the topology.
    import_result = start_dict if start_dict is not None else dict(meta_info={}, topology={})
    if "topology" not in import_result:
        # A caller-supplied dictionary may lack the key; both passes index it unconditionally.
        import_result["topology"] = {}

    namespaces = _get_namespaces(xml_files[0])
    import_result["meta_info"] = dict(namespaces=namespaces, urls={})
    namespace_rdf = _get_rdf_namespace(namespaces)

    # CIM element tag base (e.g. {http://iec.ch/TC57/2012/CIM-schema-cim16#} )
    base = "{" + namespaces["cim"] + "}"

    # First pass: create the objects.
    import_result, logger_grouped = _instantiate_classes(
        import_result,
        xml_files,
        cgmes_version_path,
        namespace_rdf,
        base,
        logger_grouped,
    )

    # Second pass: fill in attributes and references.
    import_result, logger_grouped = _set_attributes(import_result, xml_files, namespace_rdf, base, logger_grouped)

    for error, count in logger_grouped["errors"].items():
        logger.warning("%s: %d times", error, count)

    for info, count in logger_grouped["info"].items():
        logger.info("%s: %d times", info, count)

    elapsed_time = time() - t0
    logger.info("Created totally %s CIM objects in %.2f s\n\n", len(import_result["topology"]), elapsed_time)

    return import_result
def _count_message(logger_grouped, group, message):
    """Increment the occurrence counter of ``message`` in ``logger_grouped[group]``.

    :param logger_grouped: dictionary with the keys ``'errors'`` and ``'info'``, each mapping message -> count.
    :param group: ``'errors'`` or ``'info'``.
    :param message: the message text used as the grouping key.
    """
    counters = logger_grouped[group]
    counters[message] = counters.get(message, 0) + 1


def _detect_package(profile_text, current):
    """Return the first key of ``short_profile_name`` contained in ``profile_text``.

    ``current`` is returned unchanged when the text is empty/``None`` or matches no known profile.
    """
    if profile_text:
        for package_key in short_profile_name:
            if package_key in profile_text:
                return package_key
    return current


def _instantiate_classes(import_result, xml_files, cgmes_version_path, namespace_rdf, base, logger_grouped):
    """Instantiate, with default values, every class defined in the RDF files (first pass).

    Only the mRID is set here (for classes that have this attribute). All other attributes are set by
    ``_set_attributes`` because an attribute may be stored in a different profile file than its class; once this
    function has run, every referenced object exists. The profile from which a class was read is stored under
    ``serializationProfile["class"]``, and the model author is stored in ``import_result["meta_info"]``.

    :param import_result: result dictionary with the keys ``'topology'`` and ``'meta_info'``; updated in place.
    :param xml_files: iterable of RDF/XML sources (paths or seekable file-like objects).
    :param cgmes_version_path: dotted package path containing one module per CIM class.
    :param namespace_rdf: RDF namespace URI used to look up ``rdf:ID``.
    :param base: CIM tag prefix in Clark notation, e.g. ``"{http://iec.ch/TC57/2012/CIM-schema-cim16#}"``.
    :param logger_grouped: grouped message counters; updated in place.
    :return: the tuple ``(import_result, logger_grouped)``.
    """
    topology = import_result["topology"]
    meta_info = import_result["meta_info"]

    # Length of element tag base
    m = len(base)
    id_key = "{%s}ID" % namespace_rdf

    # First step: create the dict topology[uuid] = instance_of_the_cim_class
    for xml_file in xml_files:
        logger.info('START of parsing file "%s"', xml_file)

        # Reset stream
        if hasattr(xml_file, "seek"):
            xml_file.seek(0)

        context = iter(etree.iterparse(xml_file, ("start", "end")))

        # Get the root element ({http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF).
        _, root = next(context)

        package = ""

        for event, elem in context:
            # Process 'end' elements in the CGMES namespace.
            if event == "end" and elem.tag[:m] == base:
                # An element carrying "rdf:ID" is a CGMES class definition.
                uuid = elem.get(id_key)
                if uuid is not None:
                    # Element tag without namespace (e.g. VoltageLevel).
                    tag = elem.tag[m:]
                    try:
                        # Import the module for the CGMES object.
                        module_name = cgmes_version_path + "." + tag
                        module = importlib.import_module(module_name)
                    except ModuleNotFoundError:
                        _count_message(logger_grouped, "errors", "Module {} not implemented".format(tag))
                        root.clear()
                        continue

                    # Get the CGMES class from the module, instantiate it and map it to the uuid.
                    klass = getattr(module, tag)
                    instance = klass()
                    topology[uuid] = instance
                    _count_message(
                        logger_grouped, "info", "CIM object {} created".format(module_name.split(".")[-1])
                    )

                    # Set the mRID to the read-in UUID if the class has this attribute; otherwise the UUID is
                    # only stored as the key of the topology dictionary.
                    if hasattr(instance, "mRID"):
                        instance.mRID = uuid

                    if package != "":
                        instance.serializationProfile["class"] = short_profile_name[package]
                    else:
                        # klass is the class itself, so its own __name__ is the CIM class name
                        # (klass.__class__.__name__ would be the metaclass name).
                        _count_message(
                            logger_grouped,
                            "errors",
                            "Package information not found for class {}".format(klass.__name__),
                        )

            elif event == "end":
                if "Model.profile" in elem.tag:
                    # Check which package is read
                    package = _detect_package(elem.text, package)
                elif "author" not in meta_info and (
                    "Model.createdBy" in elem.tag or "Model.modelingAuthoritySet" in elem.tag
                ):
                    # The author of all imported files should be the same, so only the first one is kept.
                    meta_info["author"] = elem.text

            # Clear children of the root element to minimise memory usage.
            root.clear()

    return import_result, logger_grouped


def _assign_literal(obj, attr, text, logger_grouped):
    """Set ``obj.attr`` from the element text, converted to the type of the attribute's default value."""
    try:
        default = getattr(obj, attr)
        if isinstance(default, bool):
            # bool("false") is True because the string is non-empty, so booleans are compared textually.
            setattr(obj, attr, str.title(text) == "True")
        else:
            setattr(obj, attr, type(default)(text))
        return
    except TypeError:
        # Raised e.g. when the default is None (its type cannot be called with the text): keep the raw text.
        pass
    except ValueError:
        # The text is not a valid literal for the default's type: report it and keep the raw text.
        _count_message(
            logger_grouped,
            "errors",
            "Value of '{}.{}' could not be converted".format(obj.__class__.__name__, attr),
        )
    try:
        setattr(obj, attr, text)
    except TypeError:
        # Best effort: an attribute that rejects the raw text is left at its current value.
        pass


def _link_reference(owner, owner_id, attr, target, logger_grouped):
    """Store ``target`` in ``owner.attr`` according to the multiplicity implied by the current value.

    ``None`` means a single-valued reference, the string ``"list"`` or a list means a multi-valued one. A
    single-valued reference that already points to a different object is reported as a multiplicity error.
    """
    current = getattr(owner, attr)
    if current is None:  # 1..1 or 0..1
        # Rely on properties to set any bi-directional references.
        setattr(owner, attr, target)
    elif current == "list":  # Many, not initialised yet
        setattr(owner, attr, [target])
    elif isinstance(current, list):  # Many
        if target not in current:
            current.append(target)
            setattr(owner, attr, current)
    elif current == target:
        # Attribute reference already resolved
        pass
    else:
        _count_message(
            logger_grouped,
            "errors",
            "Multiplicity Error for class {} [{}], attribute {}. ".format(owner.__class__.__name__, owner_id, attr)
            + "Multiplicity should be 1..1 or 0..1",
        )


def _set_attributes(import_result, xml_files, namespace_rdf, base, logger_grouped):
    """Set all attributes and references after the classes were instantiated (second pass).

    Runs after ``_instantiate_classes``. Cyclic references like PowerTransformerEnd <-> PowerTransformer are
    set in both directions. The profile from which an attribute was read is stored in the object's
    ``serializationProfile`` dictionary, and enum values given as URLs are recorded in
    ``import_result["meta_info"]["urls"]``.

    :param import_result: result dictionary with the keys ``'topology'`` and ``'meta_info'``; updated in place.
    :param xml_files: iterable of RDF/XML sources (paths or seekable file-like objects).
    :param namespace_rdf: RDF namespace URI used to look up ``rdf:ID``, ``rdf:about`` and ``rdf:resource``.
    :param base: CIM tag prefix in Clark notation.
    :param logger_grouped: grouped message counters; updated in place.
    :return: the tuple ``(import_result, logger_grouped)``.
    """
    topology = import_result["topology"]
    urls = import_result["meta_info"]["urls"]

    m = len(base)
    id_key = "{%s}ID" % namespace_rdf
    about_key = "{%s}about" % namespace_rdf
    resource_key = "{%s}resource" % namespace_rdf

    # Second step pass sets attributes and references.
    for xml_file in xml_files:
        # Reset stream before the parser is created, as in _instantiate_classes.
        if hasattr(xml_file, "seek"):
            xml_file.seek(0)

        context = iter(etree.iterparse(xml_file, ("start", "end")))

        # Get the root element ({http://www.w3.org/1999/02/22-rdf-syntax-ns#}RDF).
        _, root = next(context)

        package = ""

        for event, elem in context:
            # Process 'start' elements in the CGMES namespace.
            if event == "start" and elem.tag[:m] == base:
                uuid = elem.get(id_key)
                if uuid is None:
                    uuid = elem.get(about_key)
                    if uuid is not None:
                        # Drops the first character; assumes rdf:about has the form "#<id>".
                        uuid = uuid[1:]

                if uuid is not None:
                    # Locate the CGMES object using the uuid.
                    try:
                        obj = topology[uuid]
                    except KeyError:
                        _count_message(
                            logger_grouped,
                            "errors",
                            "Missing {} object with uuid: {}".format(elem.tag[m:], uuid),
                        )
                        root.clear()
                        continue

                    # Consume the events of the object's children (attributes/references) from the same
                    # iterator until the closing element of the object itself is reached.
                    for child_event, child in context:
                        # Only end events with elements in the CIM namespace are of interest.
                        if child_event != "end" or child.tag[:m] != base:
                            continue

                        if child.get(id_key) is not None or child.get(about_key) is not None:
                            # Class closing element (e.g. </cim:Terminal>): finished setting attributes.
                            break

                        # Get the attribute/reference name.
                        attr = child.tag[m:].rsplit(".")[-1]

                        if not hasattr(obj, attr):
                            _count_message(
                                logger_grouped,
                                "errors",
                                "'%s' has not attribute '%s'" % (obj.__class__.__name__, attr),
                            )
                            continue

                        # Use the rdf:resource attribute to distinguish between attributes and
                        # references/enums.
                        resource = child.get(resource_key)

                        if resource is None:  # attribute
                            _assign_literal(obj, attr, child.text, logger_grouped)
                        elif resource.startswith("#"):  # reference
                            target_id = resource[1:]  # remove '#' prefix
                            try:
                                val = topology[target_id]
                            except KeyError:
                                _count_message(
                                    logger_grouped,
                                    "errors",
                                    "Referenced {} [{}] object missing.".format(obj.__class__.__name__, target_id),
                                )
                                continue

                            _link_reference(obj, uuid, attr, val, logger_grouped)

                            # Set the opposite direction when the referenced object has an attribute named
                            # after the class of the referencing object.
                            back_attr = obj.__class__.__name__
                            if hasattr(val, back_attr):
                                _link_reference(val, target_id, back_attr, obj, logger_grouped)
                        else:  # Enum
                            # The enum value is the part after the last '.' (the whole string if none).
                            enum_value = resource.rsplit(".", 1)[-1]
                            if "http" in resource:
                                # Reference to a URL: remember the mapping value -> URL per attribute.
                                urls.setdefault(attr, {}).setdefault(enum_value, resource)
                            setattr(obj, attr, enum_value)

                        if package != "":
                            obj.serializationProfile[attr] = short_profile_name[package]
                        else:
                            _count_message(
                                logger_grouped,
                                "errors",
                                "Package information not found for class {}, attribute {}".format(
                                    obj.__class__.__name__, attr
                                ),
                            )

            # Check which package is read
            elif event == "end" and "Model.profile" in elem.tag:
                package = _detect_package(elem.text, package)

            # Clear children of the root element to minimise memory usage.
            root.clear()

        logger.info('END of parsing file "%s"', xml_file)

    return import_result, logger_grouped


def _get_namespaces(source):
    """Return a map of prefix to namespace for the given XML file.

    Parsing stops at the first ``end`` event, so only declarations seen up to that point are collected. A
    seekable ``source`` is rewound afterwards.
    """
    namespaces = {}
    events = ("end", "start-ns", "end-ns")
    for event, elem in etree.iterparse(source, events):
        if event == "start-ns":
            prefix, ns = elem
            namespaces[prefix] = ns
        elif event == "end":
            break

    # Reset stream
    if hasattr(source, "seek"):
        source.seek(0)

    return namespaces


def _get_rdf_namespace(namespaces):
    """Return the RDF namespace from the namespaces dictionary.

    Falls back to the standard RDF syntax namespace (and logs a warning) when no ``rdf`` prefix is declared.
    """
    try:
        return namespaces["rdf"]
    except KeyError:
        ns = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
        logger.warning("No rdf namespace found. Using %s", ns)
        return ns