import os
import importlib
import chevron
from datetime import datetime
from time import time
from cimpy.cgmes_v2_4_15.Base import Profile
import logging
import sys
from cimpy.cgmes_v2_4_15.Base import Base
cgmesProfile = Base.cgmesProfile
from pathlib import Path
import copy
logger = logging.getLogger(__name__)
# This function gets all attributes of an object and resolves references to other objects
def _get_class_attributes_with_references(import_result, version):
class_attributes_list = []
# extract topology and urls
topology = import_result['topology']
urls = import_result['meta_info']['urls']
for key in topology.keys():
class_dict = dict(name=topology[key].__class__.__name__)
class_dict['mRID'] = key
# array containing all attributes, attribute references to objects
attributes_dict = _get_attributes(topology[key])
# change attribute references to mRID of the object, res needed because classes like SvPowerFlow does not have
# mRID as an attribute. Therefore the corresponding class has to be searched in the res dictionary
class_dict['attributes'] = _get_reference_uuid(attributes_dict, version, topology, key, urls)
class_attributes_list.append(class_dict)
del class_dict
return class_attributes_list
# This function resolves references to objects
def _get_reference_uuid(attr_dict, version, topology, mRID, urls):
reference_list = []
base_class_name = 'cimpy.' + version + '.Base'
base_module = importlib.import_module(base_class_name)
base_class = getattr(base_module, 'Base')
for key in attr_dict:
if key in ['serializationProfile', 'possibleProfileList']:
reference_list.append({key: attr_dict[key]})
continue
attributes = {}
if isinstance(attr_dict[key], list): # many
array = []
for elem in attr_dict[key]:
if issubclass(type(elem), base_class):
# classes like SvVoltage does not have an attribute called mRID, the mRID is only stored as a key
# for this object in the res dictionary
# The % added before the mRID is used in the lambda _set_attribute_or_reference
if not hasattr(elem, 'mRID'):
# search for the object in the res dictionary and return the mRID
UUID = '%' + _search_mRID(elem, topology)
if UUID == '%':
logger.warning('Object of type {} not found as reference for object with UUID {}.'.format(
elem.__class__.__name__, mRID))
else:
UUID = '%' + elem.mRID
array.append(UUID)
else:
logger.warning('Reference object not subclass of Base class for object with UUID {}.'.format(mRID))
if len(array) == 1:
attributes['value'] = array[0]
else:
attributes['value'] = array
elif issubclass(type(attr_dict[key]), base_class): # 0..1, 1..1
# resource = key + ' rdf:resource='
if not hasattr(attr_dict[key], 'mRID'):
# search for object in res dict and return mRID
# The % added before the mRID is used in the lambda _set_attribute_or_reference
UUID = '%' + _search_mRID(attr_dict[key], topology)
if UUID == '%':
logger.warning('Object of type {} not found as reference for object with UUID {}.'.format(
attr_dict[key].__class__.__name__, mRID))
else:
UUID = '%' + attr_dict[key].mRID
attributes['value'] = UUID
elif attr_dict[key] == "" or attr_dict[key] is None:
pass
else:
# attribute in urls dict?
if key.split('.')[1] in urls.keys():
# value in urls dict? should always be true
if attr_dict[key] in urls[key.split('.')[1]].keys():
attributes['value'] = '%URL%' + urls[key.split('.')[1]][attr_dict[key]]
else:
logger.warning('URL reference for attribute {} and value {} not found!'.format(
key.split('.')[1], attr_dict[key]))
else:
attributes['value'] = attr_dict[key]
attributes['attr_name'] = key
if 'value' in attributes.keys():
if isinstance(attributes['value'], list):
for reference_item in attributes['value']:
# ignore default values
if reference_item not in ['', None, 0.0, 0]:
reference_list.append({'value': reference_item, 'attr_name': key})
# ignore default values
elif attributes['value'] not in ['', None, 0.0, 0, 'list']:
reference_list.append(attributes)
return reference_list
# This function searches a class_object in the res dictionary and returns the corresponding key (the mRID). Necessary
# for classes without mRID as attribute like SvVoltage
def _search_mRID(class_object, topology):
for mRID, class_obj in topology.items():
if class_object == class_obj:
return mRID
return ""
# Lambda function for chevron renderer to decide whether the current element is a reference or an attribute
def _set_attribute_or_reference(text, render):
result = render(text)
result = result.split('@')
value = result[0]
attr_name = result[1]
if '%URL%' in value:
reference = value.split('%')[2]
return ' rdf:resource="' + reference + '"/>'
elif '%' in value:
reference = value.split('%')[1]
return ' rdf:resource="#' + reference + '"/>'
else:
return '>' + value + '</cim:' + attr_name + '>'
# Lambda function for chevron renderer to set an attribute or a reference in the model description.
def _set_attribute_or_reference_model(text, render):
result = render(text)
result = result.split('@')
value = result[0]
attr_name = result[1]
if '%' in value:
reference = value.split('%')[1]
return ' rdf:resource="' + reference + '"/>'
else:
return '>' + value + '</md:Model.' + attr_name + '>'
# Restructures the namespaces dict into a list. The template engine writes each entry in the RDF header
def _create_namespaces_list(namespaces_dict):
namespaces_list = []
for key in namespaces_dict:
namespace = dict(key=key, url=namespaces_dict[key])
namespaces_list.append(namespace)
return namespaces_list
# This function sorts the classes and their attributes to the corresponding profiles. Either the classes/attributes are
# imported or they are set afterwards. In the first case the serializationProfile is used to determine from which
# profile this class/attribute was read. If an entry exists the class/attribute is added to this profile. In the
# possibleProfileList dictionary the possible origins of the class/attributes is stored. All profiles have a different
# priority which is stored in the enum cgmesProfile. As default the smallest entry in the dictionary is used to
# determine the profile for the class/attributes.
def _sort_classes_to_profile(class_attributes_list, activeProfileList):
export_dict = {}
export_about_dict = {}
# iterate over classes
for klass in class_attributes_list:
same_package_list = []
about_dict = {}
# store serializationProfile and possibleProfileList
# serializationProfile class attribute, same for multiple instances of same class, only last origin of variable stored
serializationProfile = copy.deepcopy(klass['attributes'][0]['serializationProfile'])
possibleProfileList = copy.deepcopy(klass['attributes'][1]['possibleProfileList'])
class_serializationProfile = ''
if 'class' in serializationProfile.keys():
# class was imported
if Profile[serializationProfile['class']] in activeProfileList:
# else: class origin profile not active for export, get active profile from possibleProfileList
if Profile[serializationProfile['class']].value in possibleProfileList[klass['name']]['class']:
# profile active and in possibleProfileList
# else: class should not have been imported from this profile, get allowed profile
# from possibleProfileList
class_serializationProfile = serializationProfile['class']
else:
logger.warning('Class {} was read from profile {} but this profile is not possible for this class'
.format(klass['name'], serializationProfile['class']))
else:
logger.info('Class {} was read from profile {} but this profile is not active for the export. Use'
'default profile from possibleProfileList.'.format(klass['name'], serializationProfile['class']))
if class_serializationProfile == '':
# class was created
if klass['name'] in possibleProfileList.keys():
if 'class' in possibleProfileList[klass['name']].keys():
possibleProfileList[klass['name']]['class'].sort()
for klass_profile in possibleProfileList[klass['name']]['class']:
if Profile(klass_profile).name in activeProfileList:
# active profile for class export found
class_serializationProfile = Profile(klass_profile).name
break
if class_serializationProfile == '':
# no profile in possibleProfileList active
logger.warning('All possible export profiles for class {} not active. Skip class for export.'
.format(klass['name']))
continue
else:
logger.warning('Class {} has no profile to export to.'.format(klass['name']))
else:
logger.warning('Class {} has no profile to export to.'.format(klass['name']))
# iterate over attributes
for attribute in klass['attributes']:
if 'attr_name' in attribute.keys():
attribute_class = attribute['attr_name'].split('.')[0]
attribute_name = attribute['attr_name'].split('.')[1]
# IdentifiedObject.mRID is not exported as an attribute
if attribute_name == 'mRID':
continue
attribute_serializationProfile = ''
if attribute_name in serializationProfile.keys():
# attribute was imported
if Profile[serializationProfile[attribute_name]] in activeProfileList:
attr_value = Profile[serializationProfile[attribute_name]].value
if attr_value in possibleProfileList[attribute_class][attribute_name]:
attribute_serializationProfile = serializationProfile[attribute_name]
if attribute_serializationProfile == '':
# attribute was added
if attribute_class in possibleProfileList.keys():
if attribute_name in possibleProfileList[attribute_class].keys():
possibleProfileList[attribute_class][attribute_name].sort()
for attr_profile in possibleProfileList[attribute_class][attribute_name]:
if Profile(attr_profile) in activeProfileList:
# active profile for class export found
attribute_serializationProfile = Profile(attr_profile).name
break
if attribute_serializationProfile == '':
# no profile in possibleProfileList active, skip attribute
logger.warning('All possible export profiles for attribute {}.{} of class {} '
'not active. Skip attribute for export.'
.format(attribute_class, attribute_name, klass['name']))
continue
else:
logger.warning('Attribute {}.{} of class {} has no profile to export to.'.
format(attribute_class, attribute_name, klass['name']))
else:
logger.warning('The class {} for attribute {} is not in the possibleProfileList'.format(
attribute_class, attribute_name))
if attribute_serializationProfile == class_serializationProfile:
# class and current attribute belong to same profile
same_package_list.append(attribute)
else:
# class and current attribute does not belong to same profile -> rdf:about in
# attribute origin profile
if attribute_serializationProfile in about_dict.keys():
about_dict[attribute_serializationProfile].append(attribute)
else:
about_dict[attribute_serializationProfile] = [attribute]
# add class with all attributes in the same profile to the export dict sorted by the profile
if class_serializationProfile in export_dict.keys():
export_class = dict(name=klass['name'], mRID=klass['mRID'], attributes=same_package_list)
export_dict[class_serializationProfile]['classes'].append(export_class)
del export_class
else:
export_class = dict(name=klass['name'], mRID=klass['mRID'], attributes=same_package_list)
export_dict[class_serializationProfile] = {'classes': [export_class]}
# add class with all attributes defined in another profile to the about_key sorted by the profile
for about_key in about_dict.keys():
if about_key in export_about_dict.keys():
export_about_class = dict(name=klass['name'], mRID=klass['mRID'], attributes=about_dict[about_key])
export_about_dict[about_key]['classes'].append(export_about_class)
else:
export_about_class = dict(name=klass['name'], mRID=klass['mRID'], attributes=about_dict[about_key])
export_about_dict[about_key] = {'classes': [export_about_class]}
return export_dict, export_about_dict
[docs]def cim_export(import_result, file_name, version, activeProfileList):
"""Function for serialization of cgmes classes
This function serializes cgmes classes with the template engine chevron. The classes are separated by their profile
and one xml file for each profile is created. The package name is added after the file name. The
set_attributes_or_reference function is a lamda function for chevron to decide whether the value of an attribute is
a reference to another class object or not.
:param import_result: a dictionary containing the topology and meta information. The topology can be extracted via \
:func:`~cimpy.cimimport.cim_import()`. The topology dictionary contains all objects accessible via their mRID. The meta \
information can be extracted via import_result['meta_info']. The meta_info dictionary contains a new dictionary with \
the keys: 'author', 'namespaces' and 'urls'. The last two are also dictionaries. 'urls' contains a mapping \
between references to URLs and the extracted value of the URL, e.g. 'absoluteValue': \
'http://iec.ch/TC57/2012/CIM-schema-cim16#OperationalLimitDirectionKind.absoluteValue' These mappings are accessible \
via the name of the attribute, e.g. import_result['meta_info']['urls'}[attr_name] = {mapping like example above}. \
'namespaces' is a dictionary containing all RDF namespaces used in the imported xml files.
:param file_name: a string with the name of the xml files which will be created
:param version: cgmes version, e.g. version = "cgmes_v2_4_15"
:param activeProfileList: a list containing the strings of all short names of the profiles used for serialization
"""
t0 = time()
logger.info('Start export procedure.')
profile_list = list(map(lambda a: Profile[a], activeProfileList))
# iterate over all profiles
for profile in profile_list:
# File name
full_file_name = file_name + '_' + profile.long_name() + '.xml'
if not os.path.exists(full_file_name):
output = generate_xml(import_result, version, file_name, profile, profile_list)
with open(full_file_name, 'w') as file:
logger.info('Write file \"%s\"', full_file_name)
file.write(output)
else:
logger.error('File {} already exists. Delete file or change file name to serialize CGMES '
'classes.'.format(full_file_name))
print('[ERROR:] File {} already exists. Delete file or change file name to serialize CGMES '
'classes.'.format(full_file_name), file=sys.stderr)
exit(-1)
logger.info('End export procedure. Elapsed time: {}'.format(time() - t0))
[docs]def generate_xml(cim_data, version, model_name, profile, available_profiles):
"""Function for serialization of cgmes classes
This function serializes cgmes classes with the template engine chevron and returns them as a string.
:param cim_data: a dictionary containing the topology and meta information. It can be created via :func:`~cimpy.cimimport.cim_import()`
:param version: cgmes version, e.g. ``version="cgmes_v2_4_15"``
:param profile: The :class:`~cimpy.cgmes_v2_4_15.Base.Profile` for which the serialization should be generated.
:param model_name: a string with the name of the model.
:param available_profiles: a list of all :class:`~cimpy.cgmes_v2_4_15.Base.Profile`s in `cim_data`
"""
# returns all classes with their attributes and resolved references
class_attributes_list = _get_class_attributes_with_references(
cim_data, version)
# determine class and attribute export profiles. The export dict contains all classes and their attributes where
# the class definition and the attribute definitions are in the same profile. Every entry in about_dict generates
# a rdf:about in another profile
export_dict, about_dict = _sort_classes_to_profile(
class_attributes_list, available_profiles)
namespaces_list = _create_namespaces_list(
cim_data['meta_info']['namespaces'])
if profile.name not in export_dict.keys() and profile.name not in about_dict.keys():
raise RuntimeError("Profile " + profile.name + " not available for export, export_dict=" + str(export_dict.keys()) + ' and about_dict='+ str(about_dict.keys()) + '.')
# extract class lists from export_dict and about_dict
if profile.name in export_dict.keys():
classes = export_dict[profile.name]['classes']
else:
classes = False
if profile.name in about_dict.keys():
about = about_dict[profile.name]['classes']
else:
about = False
#Model header
model_description = {
'mRID': model_name,
'description': [
{'attr_name': 'created', 'value': datetime.now().strftime(
"%d/%m/%Y %H:%M:%S")},
{'attr_name': 'modelingAuthoritySet',
'value': 'www.sogno.energy'},
{'attr_name': 'profile',
'value': profile.long_name()}
]
}
template_path = Path(os.path.join(os.path.dirname(__file__), 'export_template.mustache')).resolve()
with open(template_path) as f:
output = chevron.render(f, {"classes": classes,
"about": about,
"set_attributes_or_reference": _set_attribute_or_reference,
"set_attributes_or_reference_model": _set_attribute_or_reference_model,
"namespaces": namespaces_list,
"model": [model_description]})
del model_description
return output
# This function extracts all attributes from class_object in the form of Class_Name.Attribute_Name
def _get_attributes(class_object):
inheritance_list = [class_object]
class_type = type(class_object)
parent = class_object
# get parent classes
while 'Base.Base' not in str(class_type):
parent = parent.__class__.__bases__[0]()
# insert parent class at beginning of list, classes inherit from top to bottom
inheritance_list.insert(0, parent)
class_type = type(parent)
# dictionary containing all attributes with key: 'Class_Name.Attribute_Name'
attributes_dict = dict(serializationProfile=class_object.serializationProfile, possibleProfileList={})
# __dict__ of a subclass returns also the attributes of the parent classes
# to avoid multiple attributes create list with all attributes already processed
attributes_list = []
# iterate over parent classes from top to bottom
for parent_class in inheritance_list:
# get all attributes of the current parent class
parent_attributes_dict = parent_class.__dict__
class_name = parent_class.__class__.__name__
# check if new attribute or old attribute
for key in parent_attributes_dict.keys():
if key not in attributes_list:
attributes_list.append(key)
attributes_name = class_name + '.' + key
attributes_dict[attributes_name] = getattr(class_object, key)
else:
continue
# get all possibleProfileLists from all parent classes except the Base class (no attributes)
# the serializationProfile from parent classes is not needed because entries in the serializationProfile
# are only generated for the inherited class
if class_name != 'Base':
attributes_dict['possibleProfileList'][class_name] = parent_class.possibleProfileList
return attributes_dict