import fnmatch
import os
import sys
import yaml
import yaml.scanner
import yaml.parser
import yaml.error
import operator
import re
from functools import reduce
from snakemake.logging import logger
from snakemake import get_argument_parser, parse_config, SNAKEFILE_CHOICES
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
[docs]def checkFilename(filename):
"""
:param filename: to check
:return: has appropriate name?
:raises: ValueError if the name is inappropriate
"""
if " " in filename:
raise ValueError("Spaces are not allowed in the filenames. File: {0}",filename)
if "-" in os.path.basename(filename):
raise ValueError("- are not allowed in the filenames. File: {0}", filename)
return True
def findFirstFile(startingPath, pattern="readme", ext=".md", re_flags=0):
pattern = re.compile(pattern, flags=re_flags)
onlyfiles = [f for f in os.listdir(startingPath) if os.path.isfile(os.path.join(startingPath, f))]
for f in onlyfiles:
if pattern.match(f) and f.endswith(ext):
return os.path.join(startingPath, f)
return None
[docs]def findFilesRecursive(startingPath, patterns):
"""
:param startingPath: root path of the search
:param patterns: patterns to search file names for
:return: paths to files matching the patterns
"""
matchedFilepaths = []
for root, dirnames, filenames in os.walk(startingPath):
dirnames[:] = [d for d in dirnames if not d[0] == '_']
dirnames[:] = [d for d in dirnames if not d[0] == '.']
for file in reduce(operator.add, (fnmatch.filter(filenames, p) for p in patterns)):
checkFilename(file)
absFilepath = os.path.join(root, file)
if not absFilepath in matchedFilepaths:
matchedFilepaths.append(absFilepath)
sortedMatchedFilepaths = sorted(matchedFilepaths)
conf = Config()
regex = re.compile(conf.get("fileRegex"))
reFiles = list(filter(regex.search, sortedMatchedFilepaths))
logger.debug("Found files in scope of wBuild: " + str(reFiles) + ".\n")
return reFiles
[docs]def parseWBInfosFromRFiles(script_dir="Scripts", htmlPath="Output/html"):
"""
:param script_dir: Relative path to the Scripts directory
:param htmlPath: Relative path to the html output path
:return: a list of dictionaries with fields:
- file - what is the input R file
- outputFile - there to put the output html file
- param - parsed yaml params
"""
parsedInfos = []
#errorOccured = False
for filename in findFilesRecursive(script_dir, ['*.r', '*.R']):
if not hasYAMLHeader(filename):
# Ignore files without YAML infos
continue
header = parseYAMLHeader(filename)
# run all the synthax checks - will raise an error if it fails
yamlParamsDict = parseYamlParams(header, filename)
if yamlParamsDict == None: #parsing error occured
continue #go on parsing next file
if type(yamlParamsDict) is str: #allow parsing one tag without double points as string; put it in a dict and check later on
yamlParamsDict = {yamlParamsDict: None}
if('wb' in yamlParamsDict):# the header contains wb informations
outFile = htmlPath + "/" + pathsepsToUnderscore(os.path.splitext(filename)[0]) + ".html"
parsedInfos.append({'file': linuxify(filename), 'outputFile': outFile, 'param': yamlParamsDict})
logger.debug("Parsed informations from R files: " + str(parsedInfos))
#if errorOccured:
# raise ValueError("Errors occured in parsing the R files. Please fix them.") TODO really raise a ValueError?
return parsedInfos
[docs]def parseWBInfosFromRFile(filename, htmlPath="Output/html"):
"""
:param filename: Relative path to the Scripts directory
:param htmlPath: Relative path to the html output path
:return: a list of dictionaries with fields:
- filen - what is the input R file
- outputFile - there to put the output html file
- param - parsed yaml params
"""
parsedInfos = []
#errorOccured = False
if not hasYAMLHeader(filename):
# Ignore files without YAML infos
print('Header not valid')
header = parseYAMLHeader(filename)
# run all the synthax checks - will raise an error if it fails
yamlParamsDict = parseYamlParams(header, filename)
if type(yamlParamsDict) is str: #allow parsing one tag without double points as string; put it in a dict and check later on
yamlParamsDict = {yamlParamsDict: None}
if('wb' in yamlParamsDict):# the header contains wb informations
outFile = htmlPath + "/" + pathsepsToUnderscore(os.path.splitext(filename)[0]) + ".html"
parsedInfos.append({'file': linuxify(filename), 'outputFile': outFile, 'param': yamlParamsDict})
logger.debug("Parsed informations from R files: " + str(parsedInfos))
#if errorOccured:
# raise ValueError("Errors occured in parsing the R files. Please fix them.") TODO really raise a ValueError?
return parsedInfos
[docs]def parseMDFiles(script_dir="Scripts", htmlPath="Output/html", readmePath=None):
"""
:param script_dir: Relative path to the Scripts directory
:param htmlPath: Relative path to the html output path
:param readmePath: Relative path to the readme
:return: a list of dictionaries with fields:
- file - what is the input .md file
- outputFile - there to put the output html file
- param - parsed yaml header - always an empty list
"""
logger.debug("Finding .md files:\n")
htmlFiles = []
foundMDFiles = findFilesRecursive(script_dir, ['*.md'])
if readmePath is not None and readmePath not in foundMDFiles:
foundMDFiles.append(readmePath)
for f in foundMDFiles:
outFile = htmlPath + "/" + pathsepsToUnderscore(os.path.splitext(f)[0])+ ".html"
logger.debug("Found " + outFile + ".\n")
htmlFiles.append({'file': linuxify(f), 'outputFile': outFile, 'param': []})
return htmlFiles
def getYamlParam(r, paramName):
if 'wb' in r['param'] and type(r['param']['wb']) is dict and paramName in r['param']['wb']:
foundParam = r['param']['wb'][paramName]
return foundParam
return None
[docs]def parseYamlParams(header, f):
"""
:param header: String form of YAML header
:param f: Filename of a file from where the header was parsed
:return: Parameters dictionary parsed from the header; None if parsing errors occured
"""
try:
param = next(yaml.safe_load_all(header))
except (yaml.scanner.ScannerError, yaml.parser.ParserError, yaml.error.YAMLError, yaml.error.MarkedYAMLError) as e:
if hasattr(e, 'problem_mark'):
if e.context != None:
logger.error('Error while parsing YAML area in the file ' + f + ':\n' + str(e.problem_mark) + '\n ' +
str(e.problem) + ' ' + str(e.context) +
'\nPlease correct the header and retry.')
else:
logger.error('Error while parsing YAML area in the file ' + f + ':\n' + str(e.problem_mark) + '\n ' +
str(e.problem) + '\nPlease correct the header and retry.')
else:
logger.error("YAMLError parsing yaml file.")
return None
except Exception as e:
print(bcolors.FAIL + bcolors.BOLD + 'Could not parse', f,
'. Include valid yaml header. Not showing any further errors. \n',
'Errors {0}'.format(e) + bcolors.ENDC)
return None
logger.debug("Parsed params: " + str(param) + "\n.")
return param
[docs]def pathsepsToUnderscore(systemPath, dotsToUnderscore = False, trimPrefix=True):
"""
Convert all system path separators and dots to underscores. Product is used as a unique ID for rules in scanFiles.py or the output HTML files
:param systemPath: path to convert in
:param dotsToUnderscore: if the dot should be converted as well. Defaults to false
:return: path string with converted separators
"""
if trimPrefix:
conf = Config()
systemPath = removeFilePrefix(systemPath, conf.snakeroot)
if dotsToUnderscore:
return systemPath.replace('.', '_').replace('/', '_').replace('\\', '_')
return systemPath.replace('/', '_').replace('\\', '_')
[docs]def linuxify(winSepStr, doubleBackslash = False):
"""
Convert windows (path) string to the linux format.
:param winSepStr: (path) string with windows-like "\" separators
:param doubleBackslash: if the slashes in the winSepStr are double (happens when you read a macro string raw. Ex.: "C:\\Program Files\\a.txt"
:return: str with substituted "\" -> "/"
"""
if doubleBackslash:
return winSepStr.replace("\\\\", "/")
return winSepStr.replace("\\", "/")
class Config:
sysargs = None
args = None
config = None
config_dict = None
path = "wBuild.yaml"
snakefile = "Snakefile"
snakeroot = ""
instance = None
def __init__(self):
# check if it is already initialized
if self.instance != None:
self.config = self.instance.config
self.conf_dict = self.instance.conf_dict
self.args = self.instance.args
self.path = self.instance.path
self.snakefile = self.instance.snakefile
self.snakeroot = self.instance.snakeroot
return
# we dont need the first argument aka call to snakemake
self.sysargs = sys.argv[1:]
parser = get_argument_parser()
self.args = parser.parse_args(self.sysargs)
self.path = self.args.configfile
self.snakefile = self.args.snakefile
self.config = parse_config(self.args)
if self.path is None:
for p in ["wbuild.yaml", "config.yaml", "wBuild.yaml"]:
if os.path.exists(p):
self.path = p
break
else:
if type(self.path) is list:
if len(self.path) != 1:
raise Exception("Path is a list of more than one element! '" + str(self.path) + "'")
self.path = self.path[0]
self.path=os.path.abspath(self.path)
# this is taken from the snakemake main file
if self.snakefile is None:
for p in SNAKEFILE_CHOICES:
if os.path.exists(p):
self.snakefile = p
break
self.snakeroot = os.path.dirname(self.snakefile)
#load defaults
self.loadDefaultConfiguration()
try:
fh = open(self.path, "r")
except IOError:
raise IOError("Can not read config. Are you sure you have enough "
"rights and config path (wbuild.yaml) is right?")
configDict = next(yaml.safe_load_all(fh))
if configDict == None:
logger.error("Error parsing wbuild.yaml - format is wrong. Working with defaults...")
else:
self.conf_dict = merge_two_dicts(self.conf_dict, configDict)
#fill Singleton
Config.instance = self
# check if readme file exists
readme = self.get("readmePath")
if not readme.endswith(".md"):
raise ValueError("Readme file is '{}' but should end with '.md'".format(readme))
def loadDefaultConfiguration(self):
# Readme
readmePath = "readme.md"
abspathSnakeroot = os.path.abspath(self.snakeroot)
onlyfiles = [f for f in os.listdir(abspathSnakeroot)]
for f in onlyfiles:
f = os.path.join(abspathSnakeroot, f)
if not os.path.isfile(f):
continue
if ("readme" in f) and f.endswith(".md"):
readmePath = f
break
self.conf_dict = {"htmlOutputPath": "Output/html",
"processedDataPath": "Output/ProcessedData",
"scriptsPath": "Scripts",
"projectTitle": "Project",
"readmePath": readmePath,
"htmlIndex": "index.html",
"indexWithFolderName" : False}
def getConfig(self):
return self.conf_dict
def get(self, attrname):
if (attrname in self.conf_dict):
return self.conf_dict[attrname]
else:
raise AttributeError("There is no attribute " + attrname +
" in the configuration file loaded!")
def merge_two_dicts(x, y):
z = x.copy() # start with x's keys and values
z.update(y) # modifies z with y's keys and values & returns None
return z
[docs]def writeWbuildVersion():
"""
Write wBuild version to .wBuild/.version
"""
with open(".wBuild/.version", 'w') as file:
import wbuild
file.write(wbuild.__version__)
file.close()
[docs]def wbuildVersionIsCurrent():
"""
Read wBuild version from .wBuild/.version and compare it to wbuild module version from pckg mngr.
:return: True if wBuild up-to-date, False if not
"""
with open(".wBuild/.version", 'r') as file:
static_v = file.read()
import wbuild
dynamic_v = wbuild.__version__
return dynamic_v in static_v
def removeFilePrefix(f, prefix):
if f.startswith(prefix):
f = f[len(prefix):]
if len(prefix) > 0 and f.startswith("/"):
f = f[1:]
absPrefix = os.path.abspath(prefix)
if f.startswith(absPrefix):
f = f[(len(absPrefix)+1):]
return(f)