#!c:\Program Files\Python39\python.exe
# -*- coding: UTF-8 -*-
'''
$RCSfile: mapping.py,v $
$Revision$
$Author: markus $
$Date$
The BioCASE querytool

CGI page of the configtool that displays the concept tree of a CMF and applies
the mapping actions posted by the form (edit, insert, del, activate, deactivate,
save, revert).
'''
# NOTE(review): this file was reformatted from a copy whose line structure had been
# collapsed. Block nesting was inferred from syntax and the original comments, and the
# HTML markup inside several string literals appears to be missing (see the NOTE(review)
# markers below). Verify both against version control before deploying.

import os

# ***** include the biocase.lib directory in the python sys path for importing *****
_biocaseLib = os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir, os.path.pardir, 'lib', 'biocase'))
for _bootstrap in ('adjustpath.py', 'appinit.py'):
    # executed at module level so that the names they define become module globals
    with open(os.path.join(_biocaseLib, _bootstrap)) as _fh:
        exec(_fh.read())

from biocase.configtool.general import *
from biocase.wrapper.cmf_base import CMFClass
from biocase.wrapper.cmf_element import CMFMappingClass
from biocase.tools.various_functions import unique, flatten, isTrue
from biocase.tools.caching import getPickleFilename
from biocase.wrapper.typeconverter import DBAttributeClass
from biocase.cmfinfo import CMFTemplateListClass
from biocase import __version__


# ------------------------------------------------------------------------------------
def parseForm(form, cmfObj, newConcepts=None):
    '''Parse form, update cmfObj and return list of concepts (xpaths) to be shown additionally.

    form        -- object offering getfirst(name, default) for the posted fields
    cmfObj      -- CMF object that receives the table aliases and the mapping change
    newConcepts -- list that is returned unchanged (a fresh empty list when omitted)

    Only the in-memory cmfObj is modified here; writing or pickling it is left to the caller.
    '''
    if newConcepts is None:
        newConcepts = []

    # form header data
    cmfObj.setRootTableAlias(form.getfirst('root_tablealias', None))
    cmfObj.setStaticTableAliases([form.getfirst('static_tablealias', None)])

    # mapping
    mapAction = form.getfirst('cmf_action', None)
    log.debug("CMF ACTION=%s" % mapAction)
    if mapAction is not None:
        # the form counts mapping positions from 1, the node API is called with pos-1
        pos = int(form.getfirst('map_pos', -1)) - 1
        concept = form.getfirst('concept', None)
        node = cmfObj.getConceptByPath(concept)

        # create mapping object from form: up to five slots, each a literal and/or a db attribute
        items = []
        for i in range(1, 6):
            # literal
            lit = form.getfirst('lit%i' % i, '')
            if len(lit) > 0:
                log.debug("Literal found for mapping: %s" % lit)
                items.append(lit)
            # db attribute, posted as "table.attr.type"
            dbaString = form.getfirst('dba%i' % i, '')
            log.debug("DB attribute string %i: %s" % (i, dbaString))
            if len(dbaString) > 3:
                parts = dbaString.split('.')
                if len(parts) != 3:
                    # skip instead of failing the whole request on the tuple unpacking
                    log.error("Ignoring malformed DB attribute '%s' (expected table.attr.type)" % dbaString)
                    continue
                dbaObj = DBAttributeClass()
                (dbaObj.table, dbaObj.attr, dbaObj.type) = parts
                if len(dbaObj.attr) > 0:
                    log.debug("DBAttribute found for mapping: %s" % str(dbaObj))
                    items.append(dbaObj)

        if len(items) > 0:
            mapObj = CMFMappingClass(items)
        else:
            mapObj = None

        # do things with it
        nodeActions = ('edit', 'del', 'insert', 'activate', 'deactivate')
        if mapAction in nodeActions and node is None:
            # getConceptByPath returned nothing; there is no node to apply the action to
            log.error("Cannot apply cmf_action '%s': concept %s not found" % (mapAction, concept))
        elif mapAction == 'edit' and mapObj is not None:
            node.setMapping(pos, mapObj)
        elif mapAction == 'del':
            log.debug("Removed mapping at pos: %i" % (pos))
            node.delMapping(pos)
        elif mapAction == 'insert' and mapObj is not None:
            log.debug("Insert new mapping")
            node.addMapping(mapObj)
        elif mapAction == 'activate':
            log.debug("Activate supressed rep node")
            node.RepeatDont = 0
        elif mapAction == 'deactivate':
            log.debug("Deactivate active rep node")
            node.RepeatDont = 1

    log.debug("New XPaths: %s" % str(newConcepts))
    return newConcepts


# ------------------------------------------------------------------------------------
def createNewCMF(filename):
    '''Copy the CMF template `filename` into the directory of the current datasource.

    Returns True if the file was copied, False if a file of that name already
    exists there (it is left untouched).
    '''
    import shutil
    origF = os.path.join(cfg.rawCmfTemplateLocator, filename)
    targetF = os.path.join(dsaObj.getDsaDir(), filename)
    if not os.path.isfile(targetF):
        shutil.copyfile(origF, targetF)
        return True
    return False


# ------------------------------------------------------------------------------------
def getSharedPrefix(a, b, startLen=0):
    '''Return the leading path elements (each followed by "/") shared by a and b.

    The first startLen characters of both strings are skipped before comparing.
    Returns '' when nothing is shared.
    '''
    shared = ''
    # iterate over the elements of the shorter string
    if len(a) > len(b):
        a, b = b, a
    try:
        a = a[startLen:]
        b = b[startLen:]
        for part in a.split('/'):
            if b.startswith(shared + part + '/'):
                shared = shared + part + '/'
            else:
                break
    except (AttributeError, TypeError):
        # arguments that are not strings: no shared prefix then
        pass
    return shared


# ------------------------------------------------------------------------------------
def getMinSharedPrefix(lastShared, path, pathList):
    '''Return the shortest non-empty prefix that path shares with any entry of pathList.

    Prefixes are computed by getSharedPrefix after skipping len(lastShared) characters.
    Returns '' if no entry shares anything with path.
    '''
    nextSharedList = [getSharedPrefix(p, path, len(lastShared)) for p in pathList]
    # remove empty strings and make list unique
    nextSharedList = unique([sh for sh in nextSharedList if len(sh) > 0])
    # first entry of minimal length, '' for an empty list
    return min(nextSharedList, key=len, default='')


# ------------------------------------------------------------------------------------
def minimalizeTreePaths(pathTuples):
    '''Identifies the shared path prefix for a list of tuples (path, any obj).
    Returns a new list of tuples (path, obj) with the common shared part of the path acting as just 1 element.
    The element seperators "/" of the shared path are replaced by a simple space.'''
    log.debug("MINIMALIZE TREE PATHS")
    lastPath = '/'

    class Stable:
        '''Mutable (path, value) pair so paths can be rewritten in place.'''
        def __init__(self, path, obj):
            self.path = path
            self.val = obj

    objList = [Stable(p, obj) for p, obj in pathTuples]

    for obj in objList:
        p = obj.path
        # find shared part with last path
        lastShared = getSharedPrefix(lastPath, p)
        # find shared part of rest with others in the group with the same shared prefix
        group = [o for o, garbage in pathTuples if o.startswith(lastShared) and o != p]
        nextShared = getMinSharedPrefix(lastShared, p, group)
        # update all paths
        if len(nextShared) > 1:
            old = lastShared + nextShared
            new = lastShared + nextShared.replace('/', ' ')[:-1] + '/'
            for tmp in objList:
                # temporarily add a slash to the last element, cause leaf nodes wont be found otherwise
                tmp.path += "/"
                if tmp.path.startswith(old):
                    tmp.path = tmp.path.replace(old, new, 1)
                # remove temp slash at the end
                tmp.path = tmp.path[:-1]
        # set last path
        lastPath = obj.path

    log.debug(str([o.path for o in objList]))
    return [(o.path, o.val) for o in objList]


# ------------------------------------------------------------------------------------
def replaceAttributeCharacters(x):
    '''Return x with "@" and "[" replaced by spaces (used as sort key for xpaths).'''
    return x.replace('@', ' ').replace('[', ' ')


def formatNodeList(cmfObjList, path2mappings):
    '''Takes a list of cmf nodes and produces a html tree representation for the configtool.

    cmfObjList    -- cmf node objects (sorted here by their xpath)
    path2mappings -- iterable of xpaths; a path entry is flagged when one of them starts with it

    Returns a list of strings, one per displayed row.
    '''
    # NOTE(review): the HTML markup of the string literals in this function appears to be
    # missing from this copy. They are reproduced as found; several no longer match their
    # argument tuples and will raise TypeError until the markup is restored.
    tree = []
    displayedPaths = {}

    # display tree in order of xpaths. sort objects by their paths
    cmfObjList = sorted(cmfObjList, key=lambda x: replaceAttributeCharacters(x.xpath))
    log.debug("SORTED CMF nodes:")
    for n in cmfObjList:
        log.debug(n.xpath)

    # calculate new paths with the shared path part having no "/" seperating the elements but spaces.
    pathTuple = minimalizeTreePaths([(obj.xpath, obj) for obj in cmfObjList])

    # preTree mixes plain path strings (one per not yet displayed path) with cmf objects
    preTree = []
    for path, cmfObj in pathTuple:
        # get parts of the path, with shared paths covering potentially several elements (seperated by spaces)
        currPath = ''
        for part in path.split('/'):
            if len(part) == 0:
                continue
            currPath += '/' + part
            if currPath not in displayedPaths:
                displayedPaths[currPath] = 1
                # append non-leaf node
                preTree.append(currPath)
        # append leaf node
        preTree.append(cmfObj)

    lastMinimizedPath = "R O O T . . ."  # shouldnt be displayed anywhere, cause list shouldnt start with an cmfObj
    level = 0  # defined even if the list unexpectedly starts with a cmfObj
    log.debug("The preTREE...")
    for p in preTree:
        if isinstance(p, str):
            log.debug("LIT: %s" % p)
        else:
            log.debug("OBJ: %s" % p.xpath)

        if not isinstance(p, str):
            # If it's a node, and not a leaf: The Node has been added before
            # We just need to check whether this node is mandatory (fix for LIDO)
            # if so, change its style to red (but only if it doesn't have any mapped descendants)
            if not p.hasW3CType():
                if p.RepeatMin > 0 and not p.hasMappedDescendants():
                    # NOTE(review): both replace() arguments are empty in this copy, making this a no-op
                    tree.append(tree.pop().replace('', '', 1))

            # CONCEPT (leaf node)
            # remove last entry with only the path if its a leaf or rep node without any filter
            if not p.hasW3CType() and not p.isRepNode() and not p.hasFilter():
                continue
            elif not p.hasW3CType() and not p.isRepNode() and p.hasFilter():
                # if its non of the above but with a filter, leave path entry and only add filter info
                tree.append(getFilterHtml(p, level))
                continue

            # Now we have the whole object for more details. Remove last paths entry
            tree.pop()
            style = ''
            # NOTE(review): format string is empty in this copy; '' % value raises TypeError
            links = '' % p.xpath

            # MANDATORY ELEMENT ?
            if p.RepeatMin > 0 and not p.hasMappings() and p.hasW3CType():
                style = 'color:red'
            elif p.hasMappings():
                style = 'color:green'

            # DEBUG
            if p.isRepNode() is not False:
                log.debug("potential REP NODE FOUND: %s" % p.xpath)

            # active or supressed REP NODE ?
            if p.isRepNode():
                # rep node. is it active?
                if p.isRepNode() == 1:
                    # ACTIVE rep node
                    log.debug("active REP NODE FOUND: %s" % p.xpath)
                    style = 'color:#4C7197'
                    # deactivate link
                    # NOTE(review): link markup missing in this copy
                    links += '''''' % p.xpath
                elif p.isRepNode() == 2:
                    # SUPRESSED rep node
                    log.debug("supressed REP NODE FOUND: %s" % p.xpath)
                    style = 'color:#9DB7D3'
                    # activate link
                    # NOTE(review): link markup missing in this copy
                    links += '''''' % p.xpath
                else:
                    # INACTIVE rep node
                    pass

            # LEAF NODE WITH W3C TYPE SET AND READY FOR MAPPINGS
            if p.hasW3CType():
                # editing links
                # NOTE(review): link markup missing in this copy
                links += '''''' % p.xpath

            # MAIN ENTRY GIVING THE ELEMENT NAME, entire html row
            # trick: rename attributes to start with @ only:
            displayName = lastMinimizedPath
            if p.isAttribute():
                displayName = displayName[displayName.find('@'):-1]
            level += 1
            # NOTE(review): three placeholders for four arguments; markup missing in this copy
            tree.append('\n%s%s %s\n' % (" " * level, style, displayName, links))

            # MAPPINGS
            if p.hasMappings():
                for mapIdx, mapObj in enumerate(p.getMappings(), 1):
                    # editing links
                    # NOTE(review): link markup missing in this copy
                    links = """   """ % (p.xpath, mapIdx, p.xpath, mapIdx)
                    # entire html row
                    tree.append('\n%s%s %s\n' % (" " * (level + 1), "Mapping%i: %s" % (mapIdx, str(mapObj)), links))

            # FILTER EXISTS?
            if p.hasFilter():
                tree.append(getFilterHtml(p, level))
        else:
            # NODE: only the new path is given. object might follow with next entry-> pop()
            # display node. only show last part of path, replacing all spaces with slashes again
            level = p.count('/') - 1
            name = p.split('/')[-1].replace(' ', '/')
            lastMinimizedPath = name
            # check whether it's a node that has a mapping somewhere in the sub-tree (kept as 0/1)
            has_mapping = int(any(xpath.startswith(p) for xpath in path2mappings))
            # always append the name. if its a real concept with cmfObj, then this item is removed
            # from the list via tree.pop() and more detailed data is inserted
            # NOTE(review): two placeholders for three arguments; markup missing in this copy
            tree.append('\n%s- %s\n' % (" " * level, has_mapping, name))
    return tree


def getFilterHtml(p, level):
    '''Return the row describing the filter of node p, indented by level.'''
    # NOTE(review): markup presumably missing in this copy, as in formatNodeList
    return '\n%s%s\n' % (" " * level, "Filter: %s" % str(p.getFilter()))


# ------------------------------------------------------------------------------------
def getNodeList(cmfObj, reqPaths=None, showAll=False):
    '''Returns the list of relevant cmfNode objects.

    cmfObj   -- CMF object to take the nodes from
    reqPaths -- xpaths of additionally requested nodes (e.g. new ones); unknown paths are ignored
    showAll  -- start from all concepts instead of only the mapped ones
    '''
    if reqPaths is None:
        reqPaths = []

    # show all concepts?
    if showAll:
        log.debug("Show all concepts!")
        nodes = cmfObj.listConcepts(returnObjects=True)
    else:
        # all mapped nodes
        nodes = cmfObj.getMappedConcepts()
        # NOTE(review): nesting of this append under the else branch was inferred
        nodes.append(cmfObj.rootCMFElementObj)

    # all requested nodes (e.g. new ones)
    for path in reqPaths:
        obj = cmfObj.getConceptByPath(path)
        if obj is not None:
            nodes.append(obj)

    # get all parental nodes of them too
    nodes += unique(flatten([n.getParents() for n in nodes]))
    # get all mandatory children
    nodes += unique(flatten([n.getMandatoryDescendants() for n in nodes]))

    # Last, if not all concepts are shown, add all descendants for empty mandatory nodes
    # (ease mapping of mandatory nodes when starting a fresh LIDO mapping)
    if not showAll:
        for node in [n for n in nodes if n.RepeatMin > 0 and not n.hasW3CType()]:
            # check if there will be any descendants of that node be shown in the tree
            if not any(n.xpath != node.xpath and node.xpath in n.xpath for n in nodes):
                # if not, just add all descendants
                log.debug("Adding descendants for mandatory node %s." % node.xpath)
                nodes += node.getDescendants()

    # select every node only once
    log.debug("Number of selected nodes %i" % len(nodes))
    nodes = unique(nodes)
    log.debug("Number of unique selected nodes %i" % len(nodes))

    # only select important nodes - removed in preparation for LIDO
    #nodes = [n for n in nodes if n.hasFilter() or n.hasMappings() or n.hasW3CType() or ( not n.isAttribute() and (n.isRepNode() or n.hasMandatoryAttributes() ))]
    log.debug("Number of interesting unique selected nodes %i" % len(nodes))

    # check that all attributes have their parent node selected
    attrs = [n for n in nodes if n.isAttribute()]
    nodes = unique(nodes + [a.parent for a in attrs])
    log.debug("Number of interesting unique selected nodes incl all attribute parents %i" % len(nodes))
    return nodes


# ------------------------------------------------------------------------------------
def getDisplayTree(cmfObj, newConcepts=None, showAll=False):
    '''Return the concept tree of cmfObj as one string (rows joined by newlines).

    On UnicodeDecodeError the module-level state is reset via revert(), the
    encoding error flag is set on the module-level template and None is returned.
    '''
    if newConcepts is None:
        newConcepts = []
    nodes = getNodeList(cmfObj, newConcepts, showAll)
    try:
        # Replace first slash separator to indicate shared path - removed in preparation for LIDO
        # path2mappings = ['/' + p[1:].replace('/', ' ', 1) for p in cmfObj.path2mappings.keys()]
        path2mappings = list(cmfObj.path2mappings.keys())
        return "\n".join(formatNodeList(nodes, path2mappings))
    except UnicodeDecodeError:
        revert()
        tmpl['show_encoding_error'] = "show_encoding_error();"
        # NOTE(review): the caller assigns this None to tmpl['tree'] - confirm that is intended
        return None


def revert():
    '''Reload the module-level cmfObj from the original CMF file and delete its pickles.

    Works on the module globals cmfObj, cmFile, psfObj and cmFilePickle.
    '''
    # NOTE(review): the original also assigned newConcepts / newConceptsXPaths here, but
    # only as function locals, so the module-level lists were never reset by this function.
    readOriginalCmFile(cmfObj, cmFile=cmFile, psfObj=psfObj)
    log.debug("altRootTableAliasSPICE = %s" % str(cmfObj.altRootTableAliasSPICE))

    # try to remove pickled file if it exists ...
    if os.path.isfile(cmFilePickle):
        # delete existing tmp pickle
        os.remove(cmFilePickle)
        log.debug("Removed pickled tmp cmFile %s" % cmFilePickle)

    # remove real pickle
    cmFileOrigPickle = getPickleFilename(cmFile)
    if os.path.isfile(cmFileOrigPickle):
        os.remove(cmFileOrigPickle)
        log.debug("Removed cmFile pickle %s" % cmFileOrigPickle)
    else:
        log.debug("There was no cmFile pickle to be removed %s" % cmFileOrigPickle)


############################################################################################################
#
#   MAIN
#
#===========================================================================================================

# check datasource availablility
if dsaObj is None:
    # no datasource: run the main page instead
    with open(os.path.abspath(os.path.join(os.path.dirname(__file__), 'main.cgi'))) as _fh:
        exec(_fh.read())
    sys.exit()

# check authentication!
authorize(form, form.getfirst("dsa"), form.getfirst("schema"))

#
# create a new CMF for this dsa?
#
# load template
tmpl = PageMacro('Content', PageMacro.DELMODE)

if action == 'create':
    schema = form.getfirst('newschema', None).strip()
    schema_name = schema.replace("cmf_", "")
    s = CMFTemplateListClass().getSchemaByName(schema_name)
    cmf_list = dsaObj.getSchemaListObj()
    # If schema is already defined for this dsa, display error and go back to previous page.
    if any(cmf.NS == s.NS for cmf in cmf_list):
        tmpl.load('Content', os.path.join(templateDir, '_mappingError.html'))
        tmpl['dsa'] = dsaObj.name
        tmpl['schema'] = schema_name
        tmpl['returnUrl'] = "datasource.cgi"
        # NOTE(review): the line break in the message was presumably HTML markup in the original file
        tmpl['message'] = "A schema with the namespace " + s.NS + " is already defined for datasource '" + dsaObj.name + "'.\nYou can map a given namespace only once per datasource; please remove the schema with the conflicting namespace and try again."
        printOverHTTP(tmpl)
        sys.exit()
    createNewCMF(filename=schema)

# get psfObj
psfObj = dsaObj.getPSFObj(tmp=False)

# load template
tmpl.load('Content', os.path.join(templateDir, '_mapping.html'))

#
# READ CMF OBJ
#
cmfObj = CMFClass()
# path to real CMF XML file
cmFile = os.path.join(dsaObj.getDsaDir(), schema)
# read preferrably pickled tmp file
cmFilePickle = readTmpCmFile(cmfObj, cmFile, psfObj)

#
# look for new form values
#
tree = ''
newConceptsXPaths = []
# keep the concepts being added already before
newConcepts = form.getfirst('old_new_concepts', '') + ' ' + form.getfirst('new_concepts', '')
# rebuild string. remove duplicates and empty whitespace (split() drops the empty entries)
newConcepts = unique(newConcepts.split())

# check if buttons were pressed
cmfAction = form.getfirst('cmf_action', None)
if cmfAction is not None and cmfAction:
    # parse form values and process cmf_action
    newConceptsXPaths = parseForm(form, cmfObj, newConcepts)
    # process other action
    if cmfAction == 'insert' or cmfAction == 'save':
        # write to real CMF
        try:
            cmfObj.writeCMF()
            revert()
            log.debug("WRITTEN CMF XML FILE.")
        except UnicodeDecodeError:
            log.error("UnicodeDecodeError while trying to write cmf xml file:")
            revert()
            tmpl['show_encoding_error'] = "show_encoding_error();"
    elif cmfAction == 'del':
        # write to real CMF
        cmfObj.writeCMF()
        revert()
        log.debug("WRITTEN CMF XML FILE.")
    elif cmfAction in ('activate', 'deactivate'):
        # write changed CMF and reload it to analyze new rep nodes
        log.debug("REANALYZE REP NODES: WRITE and RELOAD CMF XML FILE.")
        cmfObj.writeCMF()
        readOriginalCmFile(cmfObj, cmFile=cmFile, psfObj=psfObj)
    elif cmfAction == 'revert':
        # swap bakfile with real cmf
        cmfObj.revertCMF(cmFile)
        log.debug("REVERT to original xml file.")
        # read original xml cmfile and remove the pickles
        revert()
        # forget the additionally displayed concepts as well
        newConcepts = []
        newConceptsXPaths = []
    else:
        # pickle CMF
        # NOTE(review): this else was attached to the action chain by inference (it then
        # runs for actions without a branch above, e.g. 'edit') - confirm
        cmfObj.__pickle__(cmFilePickle)

#
# DISPLAY: fill template
#
# general
tmpl['dsa'] = dsa
tmpl['wrapper_url'] = dsaObj.getBioCASeAccessPoint()
tmpl['ServiceTitle'] = 'BioCASe Provider Software %s' % __version__

# CMF read only data
tmpl['schema'] = schema
tmpl['schema_display'] = os.path.split(cmfObj.absFilename)[1][4:]
tmpl['schema_ns'] = cmfObj.getRootNamespace()
tmpl['generated_when'] = cmfObj.generated_when
tmpl['generated_by'] = cmfObj.generated_by
tmpl['modified_when'] = str(cmfObj.modified_when)
# NOTE(review): the separator was presumably HTML markup in the original file
tmpl['recID'] = '\n'.join(cmfObj.recIdents)

# select prefix of the schema namespace for ratingsweb
tmpl['ratingsweb_schema'] = cmfObj.getRootNamespace()

#
# simple CMF data
#
# old new concepts
tmpl['old_new_concepts'] = ' '.join(newConcepts)

# root table alias
default = cmfObj.getRootTableAlias()
dropDown = getDropDownOptionHtml([ta.alias for ta in psfObj.getTableAliasList()], default)
tmpl['root_tablealias_options'] = dropDown

# static table aliases: preselect the first one, or the empty option if there is none
default = cmfObj.getStaticTableAliases()
if len(default) == 0:
    default = ''
else:
    default = default[0]
dropDown = getDropDownOptionHtml([''] + [ta.alias for ta in psfObj.getTableAliasList()], default)
tmpl['static_tablealias_options'] = dropDown

# CREATE CONCEPT TREE
show_all_concepts = isTrue(form.getfirst('show_all_concepts', False))
tmpl['tree'] = getDisplayTree(cmfObj, newConceptsXPaths, show_all_concepts)
if show_all_concepts:
    tmpl['show_all_concepts'] = " checked"

#
# print HTML !
#
printOverHTTP(tmpl)