#!c:\Program Files\Python39\python.exe # -*- coding: UTF-8 -*- ''' $RCSfile: dbstructure.py,v $ $Revision$ $Author: markus $ $Date$ The BioCASE querytool ''' import os # ***** include the biocase.lib directory in the python sys path for importing ***** exec(open(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir, os.path.pardir, 'lib', 'biocase', 'adjustpath.py'))).read()) exec(open(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir, os.path.pardir, 'lib', 'biocase', 'appinit.py'))).read()) from biocase.configtool.general import * #from biocase.cmfinfo import getCMFTemplateList from biocase.wrapper.graph.tablegraph import tableAliasClass from biocase.wrapper.typeconverter import DBAttributeClass from biocase.tools.various_functions import flatten from biocase import __version__ import graphviz DB_ATTR_DATA_TYPES_HASH = {'text':'text','integer':'int','float':'float','date':'date'} # ------------------------------------------------------------------------------------ def displayMetaForm(tmpl, psfObj): pass # ------------------------------------------------------------------------------------ def displayNoMetaForm(tmpl, psfObj): # enter tablealias form-data into template newTypesList = genOptionList(DB_ATTR_DATA_TYPES_HASH, 'newtype_opt', 'newtype_val', 'int') newAliasList = genOptionList([str(ta.alias) for ta in psfObj.getTableAliasList()], 'newalias_opt', 'newalias_val', '--none--') i = list(range(2000)) i.reverse() ti = list(range(500)) ti.reverse() expandList = [ { 'tablelist.tables': getTableListOps(str(ta.table)), 't_num':str(ti.pop()), 't_alias':str(ta.alias), 't_name':str(ta.table), 'tablelist.newtypelist':newTypesList, 'tablelist.pklist':[ {'t_pk_num':str(i.pop()), 't_pk_attr':str(pk.attr), 'tablelist.pklist.aliastypelist':genOptionList(DB_ATTR_DATA_TYPES_HASH, 't_pk_type_opt', 't_pk_type_val', pk.type), 'tablelist.pklist.columns':getColumnListOps(str(ta.table), str(pk.attr), [_pk.attr for _pk in ta.pk], False) #, 'foo': pk.attr } for pk in ta.pk ], 'tablelist.newPrimaryKey':getNewPrimaryKeyOps(str(ta.table), [_pk.attr for _pk in ta.pk]), 'tablelist.newfklist':[o for o in newAliasList if str(o.get('newalias_opt')) != str(ta.alias) and None == find(lambda a: str(a.target) == str(o.get('newalias_opt')), ta.fks)], 'tablelist.fklist':[ { #'tablelist.fklist.newFkColumn':getPrimaryKeysFromAliasOps(find(lambda tableAlias: tableAlias.alias == fkObj.target, psfObj.getTableAliasList())), 'tablelist.fklist.newFkColumn':getColumnListOps(str(ta.table), "", [fk.attr for fk in fkObj.attrList], True), 't_fks_num':str(i.pop()), 't_fk_target':str(fkObj.target), 'tablelist.fklist.newtypelist':newTypesList, 'tablelist.fklist.fk2list':[ {'t_fk_num':str(i.pop()), 't_fk_attr':str(attrObj.attr), 'tablelist.fklist.fk2list.fktypelist': genOptionList(DB_ATTR_DATA_TYPES_HASH, 't_fk_type_opt', 't_fk_type_val', attrObj.type) } for attrObj in fkObj.attrList ] } for fkObj in ta.fks #if len(fkObj.attrList) > 0 and fkObj.target is not None ] } for ta in psfObj.getTableAliasList() #if len(ta.pk) > 0 ] #log.error(str(expandList)) #log.error(str(type(expandList))) tmpl.expand('Content', 'tablelist', expandList) #tmpl.expand('Content', 'tablelist', foo) # create graph aliases = psfObj.getTableAliasList() #log.error("aliases: " + str(aliases)) try: psfObj.buildGraph(aliases) except: #pass #psfStatus = "Inconsistent." log.error(str(sys.exc_info()[0])) log.error(str(sys.exc_info()[1])) log.error(str(sys.exc_info()[2])) return psfObj # ------------------------------------------------------------------------------------ def parseMetaForm(form, psfObj): pass # ------------------------------------------------------------------------------------ def parseNoMetaForm(form, psfObj): global message, psfStatus #log.error("hepp") f_keys = list(form.keys()) f_keys.sort() aliases = {} entryToDelete = -1 # DBAttributeClass for f in f_keys: val = form[f].value #log.error("f= " + str(f)) #log.error("val= " + str(val)) if f == '__deleteEntry': #log.error("Hepp, deleteEntry") entryToDelete = val if f[:3] == '__t' and (val != '--new--' and val != '--none--' and val != '--select--'): # table form part if f == '__taliasnew': # new alias tao = tableAliasClass() tao.alias = form[f].value aliases['new'] = tao elif f == '__ttablenew': # check if necessary values were supplied, eg __taliasnew is existing: if 'new' in aliases: aliases['new'].table = form[f].value elif f.find('_', 3) > 0: # existing table tblID = f[3:f.find('_', 3)] tblData = f[f.find('_', 3)+1:] if tblData == 'alias': tao = tableAliasClass() tao.alias = val tao._tmpFks = {} aliases[tblID] = tao elif tblData[:3] == 'fks': # FKs if tblData.find('_', 3) > 0: # existing fks fksID = tblData[3:tblData.find('_', 3)] fksData = tblData[tblData.find('_', 3)+1:] if fksData[:2] == 'fk': # existing foreign key if fksData.find('_', 2) > 0: # existing fk attribute fkID = fksData[2:fksData.find('_', 2)] fkData = fksData[fksData.find('_', 2)+1:] if fkData == 'attr': dbo = DBAttributeClass() dbo.attr = val if fksID not in aliases[tblID]._tmpFks: aliases[tblID]._tmpFks[fksID] = [] aliases[tblID]._tmpFks[fksID].append(dbo) elif fkData == 'type': aliases[tblID]._tmpFks[fksID][-1].type = val elif fkData == 'zdel': del aliases[tblID]._tmpFks[fksID][-1] elif fksData == 'fknew': # new fk dbo = DBAttributeClass() dbo.attr = val if fksID not in aliases[tblID]._tmpFks: aliases[tblID]._tmpFks[fksID] = [] aliases[tblID]._tmpFks[fksID].append(dbo) elif fksData == 'fktypenew' and form['__t'+tblID+'_fks'+fksID+'_fknew'].value != '--new--': aliases[tblID]._tmpFks[fksID][-1].type = val elif fksData == 'target': if fksID in aliases[tblID]._tmpFks: _fks = aliases[tblID]._tmpFks[fksID] del aliases[tblID]._tmpFks[fksID] else: _fks = [] aliases[tblID].addFK(val, _fks) elif tblData == 'fksnew': # new fks aliases[tblID].addFK(val,[]) elif tblData[:2] == 'pk': # PK if tblData.find('_', 2) > 0: # existing pk pkID = tblData[2:tblData.find('_', 2)] pkData = tblData[tblData.find('_', 2)+1:] #log.error("pkData = " + str(pkData)); if pkData == 'attr': dbo = DBAttributeClass() dbo.attr = val aliases[tblID].pk.append(dbo) elif pkData == 'type': aliases[tblID].pk[-1].type = val elif pkData == 'zdel': del aliases[tblID].pk[-1] elif tblData == 'pknew': # new pk dbo = DBAttributeClass() dbo.attr = val aliases[tblID].pk.append(dbo) elif tblData == 'pktypenew' and form['__t'+tblID+'_pknew'].value != '--new--': aliases[tblID].pk[-1].type = val elif tblData == 'table': aliases[tblID].table = val # create graph aliases = list(aliases.values()) #log.error("aliases: " + str(aliases)) try: psfObj.buildGraph(aliases) except: #pass #psfStatus = "Inconsistent." log.error(str(sys.exc_info()[0])) log.error(str(sys.exc_info()[1])) log.error(str(sys.exc_info()[2])) return psfObj # ------------------------------------------------------------------------------------ def getGraphPic(psfObj): '''Create a new jpg image for the db structure graph in the PSF and return the filename.''' try: # If there is a path given for dot, add it to the path env variable if cfg.server.graphviz_dot: log.debug('Adding %s to system''s path environment variable' % cfg.server.graphviz_dot) os.environ["PATH"] += os.pathsep + cfg.server.graphviz_dot # Picname based on timestamp t = time.time() picdir = os.path.join( os.path.dirname( __file__ ), 'graphpic' ) picname = 'dbstructure.' + str(t) log.debug("GraphViz version: %s" % '.'.join([str(i) for i in graphviz.version()])) # create dot graph and add nodes and edges g = graphviz.Digraph(name=picname, directory=picdir, engine='dot', format='jpg') for node in psfObj.tablegraph.getGraphNodesList(): aliasObj = psfObj.getTableAliasObj(node) if aliasObj is None: label = node else: FKeys = flatten([[key.toSimpleString() for key in fk.attrList] for fk in aliasObj.fks]) if len(FKeys) > 0: FKeysString = " | {Fk | {%s} }" % ' | '.join(FKeys) else: FKeysString = "" label = "{%s | {Pk | {%s} }%s }" %(node.upper(), " | ".join([pk.toSimpleString() for pk in aliasObj.pk]), FKeysString) g.node(node, label, shape = 'Mrecord', fontsize = '9') g.edges(psfObj.tablegraph.getGraphEdgesList()) # Remove files older than one hour for file in os.listdir(picdir): try: fa = file.split('.') if t - float(fa[1]) > 3600: log.debug("Removing old graph %s" % os.path.join(picdir, file)) os.remove(os.path.join(picdir, file)) except: pass # And render new graph g.render(cleanup = True) return 'graphpic/' + picname + '.gv.jpg' except Exception as e: log.error("Creating the graph with GraphViz failed (%s)." % e) return None # ------------------------------------------------------------------------------------ def getPSFTemplateList(): psflist=[] tmpldir = cfg.dsaTemplateLocator if os.path.isdir(tmpldir): for dir in os.listdir(tmpldir): if os.path.isdir( os.path.join(tmpldir,dir) ) and dir != 'CVS': psflist.append(dir) return psflist def getTables(): if dbmodObj == None: tables = None else: tables = dbmodObj.getTables() if tables == None: # todo: replace dropdown with text field return [] return tables def getTableListOps(selectedTable): list = [] try: tables = dsaObj.getDbmodObj().getTables() except: tables = None if tables is not None: for t in tables: if selectedTable == t: list.append({'table_opt': t, 'table_val': 'value="' + t + '" selected'}); else: list.append({'table_opt': t, 'table_val': 'value="' + t + '"'}); return list def getColumns(table): if dbmodObj == None: columns = None else: columns = dbmodObj.getColumns(table) if columns == None: # todo: replace dropdown with text field return [] return columns def getColumnListOps(table, selectedColumn, excludedColumns, addNewOpt): list = [] cols = getColumns(table) cols_lower_stripped = [c.lower().rstrip().lstrip() for c in cols] for c in cols: if selectedColumn == c: list.append({'column_opt': c, 'column_val': 'value="' + c + '" selected'}) else: if excludedColumns.count(c) == 0: list.append({'column_opt': c, 'column_val': 'value="' + c + '"'}) if addNewOpt: list.insert(0, {'column_opt': "--new--", 'column_val': 'value="--new--"'}) # Last: check if all used PKs/FKs (found in excludedColumns) do still exist. But only if columns could be loaded (i.e., DB connection works) if cols: for c in cols: if c in excludedColumns: excludedColumns.remove(c) for ec in excludedColumns: s = "Warning: Column %s.%s does not exist, please choose another PK/FK!" % (table, ec) # Skip warning if it is only a capitalisation issue or a missing leading/trailing space # Do this to prevent upgraders from warnings that do not cause errors if s not in message and ec.lower() not in cols_lower_stripped: message.append(s) return list def getNewPrimaryKeyOps(table, excludedColumns): r = getColumnListOps(table, "", excludedColumns, False) r.insert(0, {'column_opt': "--new--", 'column_val': 'value="--new--"'}) return r def getPrimaryKeysFromAliasOps(pks): list = [] for c in pks.pk: list.append({'column_opt': c.attr, 'column_val': 'value="' + c.attr + '"'}) list.insert(0, {'column_opt': "--new--", 'column_val': 'value="--new--"'}) return list def find(f, seq): for item in seq: if f(item): return item ############################################################################################################ # # MAIN # #=========================================================================================================== # check authentication! if dsa is None: exec(compile(open( os.path.abspath( os.path.join( os.path.dirname( __file__ ), 'main.cgi' ) ), "rb").read(), os.path.abspath( os.path.join( os.path.dirname( __file__ ), 'main.cgi' ) ), 'exec')) sys.exit() # check authentication! authorize(form, dsa=dsa) #print ('Content-Type: text\n\n') #if "HTTP_COOKIE" in os.environ: # print os.environ["HTTP_COOKIE"] #else: # print "HTTP_COOKIE not set!" # # get psfObj # psfObj = dsaObj.getPSFObj(tmp=False) psfStatus = 'Not implemented yet.' # # CAN WE USE THE DB METADATA RETRIEVAL? defines the template to be used # dbmodObj = dsaObj.getDbmodObj() # load template tmpl = PageMacro('Content', PageMacro.DELMODE) tmpl.load('Content', os.path.join(templateDir, '_dbstructure.html')) # general user message message = [''] # page setup according to if database metadata is available if dbmodObj == None or not dbmodObj.hasMetaInfo(): tmpl["dbMeta"] = "false" else: tmpl["dbMeta"] = "true" for et in dbmodObj.verifyTables([str(ta.table) for ta in psfObj.getTableAliasList()]): message.append("Warning: Table/view " + str(et) + " does not exist in database.") # # look for new form values # if action in ('refresh','save', 'add'): # handle unsaved values if action == 'add': #we have unsaved values tmpl["dirty"] = "true" if action == 'refresh': tmpl["dirty"] = form.getvalue("dirty") if action == 'save': tmpl["dirty"] = "false" # parse form values parseNoMetaForm(form, psfObj) # process other action if action == 'save': # write to real PSF msg = psfObj.cleanup() if msg: message.append(msg) psfObj.writePSFile() else: # write to tmp PSF psfObj.writePSFile( filename=dsaObj.getAbsTempPSFilePath() ) elif(action=='load'): # load a psf template message.append('The loading of PSFiles has not yet been implemented. Sorry.') else: # revert to file or first time in this page. read from original PSF # this is the default anyway (to check connection). so dont do anything... pass # # DISPLAY: fill template # # general tmpl['dsa'] = dsa tmpl['wrapper_url'] = dsaObj.getBioCASeAccessPoint() tmpl['ServiceTitle'] = 'BioCASe Provider Software %s' % __version__ # dbconnection status tmpl['connection'] = dsaObj.getDBConnectionStatus() # PSF status tmpl['status'] = psfStatus # TABLE ALIASES # psf structure data displayNoMetaForm(tmpl, psfObj) # list of available PSF templates tmplList = getPSFTemplateList() dropDown = getDropDownOptionHtml(['--select--']+tmplList, '--select--') tmpl['template_optionlist'] = dropDown _tables = getTables() if _tables == None: _tables = [] tmpl['tables_opt'] = getDropDownOptionHtml(['--select--']+_tables, '--select--') dbGraphPic = getGraphPic(psfObj) if dbGraphPic: tmpl.expand('Content', 'dbGraphpicBlock', [{'dbgraphpic_fn':dbGraphPic}]) # User message tmpl['message'] = '
'.join(message) # # print HTML ! # printOverHTTP( tmpl )