Source code for wbuild.scanFiles

import os
import sys
import pathlib
import re
from snakemake.logging import logger
from wbuild.utils import parseWBInfosFromRFiles, parseWBInfosFromRFile, parseMDFiles, getYamlParam, \
    pathsepsToUnderscore, \
    Config, wbuildVersionIsCurrent, bcolors
import tempfile
import wbuild

# Separator string kept at module level.
# NOTE(review): not referenced anywhere in the visible code - confirm it is still needed.
pathsep = "/"
# Put the ".wBuild" folder of the current working directory first on the import path.
sys.path.insert(0, os.getcwd() + "/.wBuild")

# wbuildPath = pathlib.Path(wbuild.__file__).parent
# Config().conf_dict.setdefault('wBuildPath',str(wbuildPath))
# print(Config().conf_dict.items())

# SNAKEMAKE  = ["input", "output", "threads"]

# List of the rule fields that are written to a generated rule. dumpSMRule()
# iterates over it, so the order here is the order of the fields in the output.
SNAKEMAKE_FIELDS = ["input",
                    "output",
                    "params",
                    "threads",
                    "resources",
                    "priority",
                    "version",
                    "log",
                    "message",
                    "run",
                    "shell",
                    "script"]


def writeDependencyFile():
    """
    Entry point for generating the dependency file of the whole project.

    Reads ``htmlOutputPath``, ``scriptsPath`` and ``readmePath`` from the
    configuration, collects the rule information via parseWBInfosFromRFiles()
    and parseMDFiles() and writes header, rules and index rule to a temporary
    file that is kept on disk.

    :return: path of the generated dependency file
    """
    logger.info("Structuring dependencies...")

    settings = Config()
    htmlDir = settings.get("htmlOutputPath")
    logger.debug("Loaded config.\n html output path (key htmlOutputPath): " + htmlDir + "\n")
    scriptsDir = settings.get("scriptsPath")
    readme = settings.get("readmePath")

    rScriptInfos = parseWBInfosFromRFiles(script_dir=scriptsDir, htmlPath=htmlDir)
    markdownInfos = parseMDFiles(script_dir=scriptsDir, htmlPath=htmlDir, readmePath=readme)

    headerLines = (
        '######\n',
        '#This is a autogenerated snakemake file by wBuild\n',
        '#wBuild by Leonhard Wachutka\n',
        '######\n',
    )

    # delete=False: the file has to outlive this function, its path is returned
    with tempfile.NamedTemporaryFile('w', delete=False) as handle:
        # start off with the header
        for headerLine in headerLines:
            handle.write(headerLine)
        # all rules, followed by the index rule
        writeIndexRule(rScriptInfos, markdownInfos, handle)
        generatedPath = handle.name

    logger.info("Dependencies file generated at: {}\n".format(generatedPath))
    return generatedPath
def writeWBParseDependencyFile(filename):
    """
    Entry point for writing .wBuild.depend for the wbParse function in R.

    Parses a single R script and writes its rule(s) plus the index rule, in
    dump mode, to ``.wBuild.depend`` in the current working directory.

    :param filename: path of the R script to parse
    """
    conf = Config()
    htmlOutputPath = conf.get("htmlOutputPath")
    wbData = parseWBInfosFromRFile(filename=filename, htmlPath=htmlOutputPath)

    with open('.wBuild.depend', 'w') as f:
        # start off with the header
        f.write('######\n')
        f.write('#This is a autogenerated snakemake file by wBuild\n')
        f.write('#wBuild by Leonhard Wachutka\n')
        f.write('######\n')

        # writeIndexRule() itself calls writeRule() for every entry of wbData.
        # Writing the rules here as well would emit each rule twice, and the
        # second pass would work on header dicts that writeRule() has already
        # rewritten in place.
        writeIndexRule(wbData, list(), file=f, ignoreMD=True, dump=True)

    logger.info("Dependencies file generated.\n")
def joinEmpty(string_list):
    """
    Join the non-blank entries of a list.

    :param string_list: strings to join
    :return: the entries that are neither empty nor whitespace-only, separated by ", "
    """
    kept = []
    for entry in string_list:
        if entry.strip() == '':
            # blank entry, leave it out
            continue
        kept.append(entry)
    return ", ".join(kept)
def escapeSMString(item):
    """
    Convert item to the appropriate string representation.

    A string wrapped as ```sm ...``` is returned without the wrapper and
    without quotes; any other string is wrapped in single quotes. A dict is
    rendered as "key = 'value'" pairs separated by ", " (the values are
    converted to strings first); an empty dict yields ''.

    :param item: string or dict
    :return: "key = 'value'" (dict), "'value'" (string) or '' (other type)
    """
    if isinstance(item, dict):
        # Render every pair, not only the first one, so that no entry is
        # silently dropped. An empty dict gives '' instead of an IndexError.
        pairs = [str(key) + ' = ' + escapeSMString(str(value)) for key, value in item.items()]
        return ", ".join(pairs)
    if isinstance(item, str):
        if item.startswith("`sm ") and item.endswith("`"):
            # strip `sm ... ` and hand the content through unquoted
            return item[4:-1]
        return "'" + item + "'"  # return item as quoted string
    return ''
def ensureString(elem):
    """
    Convert the value of a header tag to the text written into the rule.

    - None -> ''
    - string -> quoted with single quotes, unless it contains a comma, in
      which case it is returned unchanged
    - list -> every item converted with escapeSMString(), blank results
      dropped, the rest joined with ", "
    - number -> its string representation
    - dict -> "key = value" pairs joined with ", "; numeric values are written
      unquoted, all other values go through escapeSMString()

    :param elem: None, string, list, number or dict
    :return: string representation of elem
    :raises TypeError: if elem has any other type
    """
    if elem is None:
        return ''
    if isinstance(elem, str):
        return elem if "," in elem else "'" + elem + "'"
    if isinstance(elem, list):
        # make sure each element is a string and drop the blank ones
        rendered = [escapeSMString(item) for item in elem]
        return ", ".join(x for x in rendered if x.strip() != '')
    if isinstance(elem, (int, float)):
        return str(elem)
    if isinstance(elem, dict):
        pairs = []
        for key, value in elem.items():
            if isinstance(value, (int, float)):
                # escapeSMString() returns '' for numbers, which would leave
                # "key = " with nothing on the right-hand side
                text = str(value)
            else:
                text = escapeSMString(value)
            pairs.append(str(key) + " = " + text)
        return ", ".join(pairs)
    raise TypeError("Can't parse type " + str(type(elem)) +
                    " as a valid workflow information under a wBuild YAML tag (input or output).")
def dumpSMRule(ruleInfos, outputFile, inputFile):
    """
    Write the rule to the file.

    If ruleInfos has a 'py' entry (a string or a list of lines), it is written
    in front of the rule after its placeholders have been replaced. Then the
    ``rule <name>:`` line follows, and one line for every field of
    SNAKEMAKE_FIELDS that is present in ruleInfos.

    :param ruleInfos: dictionary containing all the rule's data; must contain 'rule'
    :param outputFile: file to print the rule to
    :param inputFile: the object of the rule; used to infer the placeholder values
    """
    code = ruleInfos.get('py')
    if isinstance(code, str):
        text = insertPlaceholders(code, inputFile)
        if text and not text.endswith('\n'):
            # keep the following "rule ...:" line from being glued to the code
            text += '\n'
        outputFile.write(text)
    elif isinstance(code, list):
        for line in code:
            outputFile.write(insertPlaceholders(line, inputFile) + '\n')

    outputFile.write('rule ' + ruleInfos['rule'] + ':\n')
    for field in SNAKEMAKE_FIELDS:
        if field in ruleInfos:
            outputFile.write(' ' + field + ': ' + str(ruleInfos[field]) + '\n')
def insertPlaceholders(dest, source):
    """
    Infer placeholders' substitutions.

    Supported placeholders, for a source path ``.../<P>/<PP>/<file>``:

    - ``{wbPD}``: the configured ``processedDataPath``
    - ``{wbPP}``: ``<PP>``, the directory that contains the file
    - ``{wbP}``: ``<P>``, the parent of that directory
    - ``{wbPD_P}``: ``processedDataPath/<P>``
    - ``{wbPD_PP}``: ``processedDataPath/<P>/<PP>``

    Placeholders that need a directory level the path does not have are left
    untouched and a hint is printed.

    :param dest: string to replace placeholders in
    :param source: file; from its path we infer the placeholders values
    :return: dest with replaced placeholders
    """
    parts = pathlib.Path(source).parts
    PD = pathlib.Path(Config().get("processedDataPath"))

    dest = dest.replace("{wbPD}", str(PD))

    if len(parts) >= 2:
        # only available if the file lives inside a directory
        PP = parts[-2]
        dest = dest.replace("{wbPP}", str(PP))

    if len(parts) > 2:
        P = parts[-3]
        dest = dest.replace("{wbPD_P}", str(PD / P))
        dest = dest.replace("{wbPD_PP}", str(PD / P / PP))
        dest = dest.replace("{wbP}", str(P))
    elif re.search(r"\{wbP(P|D_PP?)?\}", dest):
        # look at dest (not at the script path): that is where an unresolved
        # placeholder would still be found
        print("If using placeholders please make sure you have the right",
              " directory structure.")

    return dest
def writeRule(r, file, dump=False):
    """
    Write Snakemake rule from the parsed WB header informations.

    Entries whose ``r["param"]["wb"]`` is None are skipped. Note that the
    header dict ``r["param"]["wb"]`` is modified in place: 'input', 'output',
    'script', 'rule' and the remaining Snakemake fields are replaced by their
    string form.

    :param r: parsed WB data dictionary entry; reads r['param']['wb'],
        r['file'] and (for rules rendered to HTML) r['outputFile']
    :param file: to write the rule to
    :param dump: if set, wBSMDump.R is used as the rule's script
    """
    wbInfos = r["param"]["wb"]
    if wbInfos is None:
        return

    inputFile = r['file']  # input R script for Snakemake
    wbuildPath = pathlib.Path(wbuild.__file__).parent
    ruleType = wbInfos.get("type")

    # determine input, output and script
    wbInfos["input"] = insertPlaceholders(
        joinEmpty([ensureString(wbInfos.get("input")), "RScript = '" + inputFile + "'"]), inputFile)

    output = ensureString(wbInfos.get("output"))
    if ruleType == 'script':
        # the R script itself is run
        script = os.getcwd() + '/' + inputFile
    else:
        script = str(wbuildPath / 'R' / 'wBRender.R')
        if ruleType != 'noindex':
            # rendered pages additionally produce their HTML file
            output = joinEmpty([output, "wBhtml = '" + r['outputFile'] + "'"])
    if dump:
        script = str(wbuildPath / 'R' / 'wBSMDump.R')

    wbInfos["output"] = insertPlaceholders(output, inputFile)
    wbInfos["script"] = "'" + script + "'"

    # the remaining fields only need their string form
    for field in SNAKEMAKE_FIELDS:
        if field in wbInfos and field not in ('output', 'script', 'input', 'run', 'shell'):
            wbInfos[field] = ensureString(wbInfos.get(field))

    # remove "Scripts" prefix for prettier rule names
    filename = r['file'].replace("Scripts/", "")
    wbInfos['rule'] = pathsepsToUnderscore(filename, True)  # convert filepath to the unique id of the rule

    # write to file
    file.write('\n')
    dumpSMRule(wbInfos, file, inputFile)
    file.write('\n')
def writeMdRule(ruleInfos, file):
    """
    Write the rule that converts one markdown file to HTML with pandoc.

    :param ruleInfos: dict with the entries 'file' (markdown input) and
        'outputFile' (HTML output)
    :param file: file to write the rule to
    """
    # remove "Scripts" prefix for prettier rule names
    shortName = ruleInfos['file'].replace("Scripts/", "")

    emit = file.write
    emit('\n')
    emit("".join(('rule ', pathsepsToUnderscore(shortName, True), ':\n')))
    emit("".join((' input: "', ruleInfos['file'], '"\n')))
    emit("".join((' output: "', ruleInfos['outputFile'], '"\n')))
    emit(
        ' shell: "pandoc --from markdown --to html --css {config[wBuildPath]}/html/lib/github.css --toc --self-contained -s -o {output} {input}"\n')
    emit('\n')
def writeIndexRule(wbData, mdData, file, ignoreMD=False, dump=False):
    """
    Write all rules and the rule of mapping the R and md wbData to the index.html.

    First one rule per entry of wbData is written (writeRule) and, unless
    ignoreMD is set, one rule per entry of mdData (writeMdRule). Then the
    ``Index`` rule follows; its inputs, index output and graph output are
    taken from wbuild.createIndex.createIndexRule().

    :param wbData: info dicts parsed from R wB files
    :param mdData: info dicts parsed from MD wB files
    :param file: file to print the rules to
    :param ignoreMD: if True, no rules are written for mdData
    :param dump: passed on to writeRule()
    """
    for r in wbData:
        writeRule(r, file, dump)

    if not ignoreMD:
        for r in mdData:
            writeMdRule(r, file)

    indexInputs, indexOutput, graphPath, _ = wbuild.createIndex.createIndexRule(wbData=wbData, mdData=mdData)

    # write rule
    # The entries below a directive and the body of "run:" have to be indented
    # deeper than the directive itself, otherwise the rule cannot be parsed.
    file.write('\n')
    file.write('rule Index:\n')
    file.write('    input: \n        "' + '",\n        "'.join(indexInputs) + '"\n')
    file.write('    output: \n')
    file.write('        index = "' + indexOutput + '", \n')
    file.write('        graph = "' + graphPath + '" \n')
    file.write('    run:\n')
    file.write('        import wbuild.createIndex\n')
    file.write('        wbuild.createIndex.ci()\n')
    file.write('        shell("snakemake --rulegraph | dot -Tsvg -Grankdir=RL > {output.graph}")\n')
    file.write('\n')
if __name__ == "__main__":
    # executed as a script: generate the dependency file
    writeDependencyFile()