#!/grid/gillis/home/nfox/software/miniconda3/bin/python """Parent program submitting Hi-C samples to the cluster. The Hi-C database is built by selecting an SRA Run ID and pushing it through a pipeline that takes it from an ID to two final files: A pairs.gz file that includes all contacts in a reasonably compressed format. And an HDF5 file containing all contact networks for that individual sample, e.g. 1bp_raw, 10kbp_raw, 10kbp_normalized. Each sample is run on an UGE compute cluster as a separate job. This program parses metadata files, identifies suitable Runs, constructs the folders and input data, and submits the pipeline job for that sample to the cluster. """ import argparse import logging as log import os import pandas as pd import sys import pickle import shutil def setup_logger(cfg, clean=False): """Initialize and configure a logger.""" if clean: logger = log.getLogger('clean') else: logger = log.getLogger('log') logger.setLevel(log.INFO) handler_file = log.FileHandler(filename=f'{cfg["LOGFILE"]}', mode='a') handler_stdout = log.StreamHandler(stream=sys.stdout) if clean: formatter = log.Formatter(fmt='') else: formatter = log.Formatter(fmt=('%(asctime)s [%(levelname)s] ' '%(message)s'), datefmt='%Y-%m-%d %H:%M:%S') handler_file.setFormatter(formatter) handler_stdout.setFormatter(formatter) logger.addHandler(handler_file) # logger.addHandler(handler_stdout) return(logger) def get_yes_or_no(input_string): """Get a yes or no response from the user. Get a yes or no response from the user in a way that handles invalid responses. If this code doesn't produce a 'y' or a 'n', then the prompt prints an error and loops. Code: INPUT_FROM_USER[0].lower() Args: input_string: String. Prompt fed to input() Returns: Boolean. True if answer was yes. False if no. """ yn_loop = True while yn_loop: user_input = input(input_string) if user_input is None: print('ERROR: Please enter yes or no!') continue if len(user_input) == 0: print('ERROR: Please enter yes or no!') continue user_input = user_input[0].lower() if user_input == 'y': yn_loop = False return(True) elif user_input == 'n': yn_loop = False return(False) else: print('ERROR: Please enter yes or no!') continue def print_timedelta(timedelta): """Convert a datetime.timedelta object to a pretty string. Convert a datetime.timedelta or an integer number of seconds to a human readable string in the format: 0 days, 0 hours, 0 minutes, 0 seconds Leading zero units are stripped. For example: 2 hours, 0 minutes, 10 seconds Stolen from https://gist.github.com/thatalextaylor/7408395 on 2020-09-09. Args: timedelta: Either a datetime.timedelta object or a number of seconds that can be converted to an integer. The time interval to be converted to a string. Returns: String. The human readable version of timedelta in the format listed above. """ import datetime if type(timedelta) is datetime.timedelta: seconds = timedelta.total_seconds() else: seconds = timedelta if seconds < 0: sign_string = '-' else: sign_string = '' seconds = abs(int(seconds)) days, seconds = divmod(seconds, 86400) hours, seconds = divmod(seconds, 3600) minutes, seconds = divmod(seconds, 60) unit = {} unit['d'] = 'day' if days == 1 else 'days' unit['h'] = 'hour' if hours == 1 else 'hours' unit['m'] = 'minute' if minutes == 1 else 'minutes' unit['s'] = 'second' if seconds == 1 else 'seconds' if days > 0: return(f'{sign_string}{days} {unit["d"]}, {hours} {unit["h"]}, ' f'{minutes} {unit["m"]}, {seconds} {unit["s"]}') elif hours > 0: return(f'{sign_string}{hours} {unit["h"]}, {minutes} {unit["m"]}, ' f'{seconds} {unit["s"]}') elif minutes > 0: return(f'{minutes} {unit["m"]}, {seconds} {unit["s"]}') else: return(f'{seconds} {unit["s"]}') def generate_config(): """Generate a blank config file for this program. This program takes a config file. This function writes a blank config file to be filled in. """ script_name = os.path.basename(sys.argv[0]) short_script_name = '.'.join(script_name.split('.')[:-1]) default_config = ( f'# Config for {script_name}', '', '# Aligner options', 'STARPATH=', 'STARINDICES=', 'DIGESTSFILE=', '', '# Cooler generation options (deprecated)', '# CHROMSIZES=', '# BINSIZE=', '', '# Pairs File generation options', 'CHROMSIZES=', '', '# Networks File generation options' 'RESOLUTIONS=', 'HUMANRESOLUTIONS=', '', '# General options', 'SCRIPTNAME=', 'ACCESSIONSFILE=', 'EXCLUDE=', 'EXCLUDELOG=', 'OUTDIR=', 'TMPDIR=', 'THREADS=', 'MEMORY=', 'SPECIES=', 'PROCESS_SRR=', 'OVERWRITE=' ) if os.path.exists(f'{short_script_name}.config'): if not get_yes_or_no(f'WARNING! {short_script_name}.config exists! ' 'Overwrite? (y/n): '): sys.exit(0) with open(f'{short_script_name}.config', 'w') as f: for line in default_config: f.write(f'{line}\n') print('Blank config file generated in current ' f'directory as {short_script_name}.config') def get_config(config): """Read and process a config file. Args: config: str. Path to the config file. Returns: A dict of config values, keyed by variable name. A few extra values are generated that are not in the actual config file. Raises: FileNotFoundError: The given config file does not exist. AssertionError: If the config file is incorrectly formatted. e.g. a config variable that is not recognized, a required variable that is missing, a required variable that is blank. """ if not os.path.isfile(config): raise FileNotFoundError('Config file not found!') cfg = {} with open(config, 'r') as f: for line in f: if line[0] == '#': pass elif '=' in line: var, val = (s.strip() for s in line.split('=')) cfg[var] = val req_cfg = ( 'STARPATH', 'STARINDICES', 'DIGESTSFILE', 'CHROMSIZES', 'RESOLUTIONS', 'HUMANRESOLUTIONS', 'SCRIPTNAME', 'ACCESSIONSFILE', 'EXCLUDE', 'EXCLUDELOG', 'OUTDIR', 'TMPDIR', 'THREADS', 'MEMORY', 'SPECIES', 'PROCESS_SRR', 'OVERWRITE' ) for k, v in cfg.items(): if k not in req_cfg: raise AssertionError(f'{k} is not a valid config variable!') if v == '': raise AssertionError(f'{k} cannot be blank in config file!') passed_keys = set(cfg.keys()) for c in req_cfg: if c not in passed_keys: raise AssertionError(f'{c} is missing from config file!') logfile = os.path.join(cfg['OUTDIR'], f'{cfg["SCRIPTNAME"]}.log') cfg['LOGFILE'] = logfile cfg['EXCLUDEFILE'] = cfg['EXCLUDE'] if os.path.exists(cfg['EXCLUDE']): cfg['EXCLUDE'] = set(pd.read_csv(cfg['EXCLUDE'], header=0)['run']) else: with open(cfg['EXCLUDEFILE'], 'w') as f: f.write('study,experiment,run,notes\n') cfg['EXCLUDE'] = set() if cfg['EXCLUDELOG'][0].lower() in ['t', 'y']: cfg['EXCLUDELOG'] = True else: cfg['EXCLUDELOG'] = False if cfg['TMPDIR'][0].lower() in ['t', 'y']: cfg['TMPDIR'] = os.environ['TMPDIR'] else: cfg['TMPDIR'] = None if cfg['OVERWRITE'][0].lower() in ['t', 'y']: cfg['OVERWRITE'] = True else: cfg['OVERWRITE'] = False resolutions = cfg['RESOLUTIONS'].split(',') human_resolutions = cfg['HUMANRESOLUTIONS'].split(',') if len(resolutions) != len(human_resolutions): raise AssertionError('RESOLUTIONS and HUMANRESOLUTIONS must be ' 'comma separated lists of the same length.') if resolutions == ['']: resolutions = [] human_resolutions = [] for c in ''.join(resolutions): if c not in map(str, range(10)): raise AssertionError('All values in RESOLUTIONS must ' 'be positive integers.') try: new_resolutions = list(map(int, resolutions)) except ValueError: raise AssertionError('All values in RESOLUTIONS ' 'must be positive integers.') cfg['RESOLUTIONS'] = new_resolutions cfg['HUMANRESOLUTIONS'] = human_resolutions return(cfg) def get_accessions(accessions_file, digests_file): """Get the metadata for the SRA Runs. Builds a metadata DataFrame from two files. The first is an accessions metadata file exported from search results in SRA. Typically, all Runs that are labeled Hi-C and the desired species. The second is a custom file indicating a path to the relevant HiCUP digests file for that Run. Args: accessions_file: str. Path to the SRA metadata file. digests_file: str. Path to the digests file. Must be a csv with no header and 2 columns: Run ID and path to HiCUP digest file. Lines commented with "#" are acceptable. Missing digests annotation can be indicated with "NA". Unannotated Runs can have the empty string for the digest column. Returns: pandas DataFrame. Contains the accessions_file with the digest filepaths appended as the last column. """ run_info = pd.read_csv(accessions_file, sep=',', quotechar='"', comment='#') run_info.index = run_info['Run'] digests = pd.read_csv(digests_file, sep=',', header=None, index_col=False, comment='#') digests.columns = ['Run', 'digest_file'] digests.index = digests['Run'] digests = digests.loc[:, ['digest_file']] run_info = pd.concat([run_info, digests.reindex(run_info.index)], axis=1) run_info.loc[run_info['digest_file'].isnull(), 'digest_file'] = '' return(run_info) def process_srp(study, cfg, outdir, run_info, n_jobs, job_n): """Iterate over an SRA Project and process eligible Experiments. SRA Runs are organized into Experiments which are in turn organized into Projects. This function iterates over the runs in a Project, identifies experiments that contain eligible Runs, and submits each Experiment to be similarly processed in turn. Args: study: str. The SRA Project ID. cfg: dict. The config dict returned by get_config() outdir: The directory under which the Project directory exists or should be created. Typically the root directory of the database. run_info: pandas DataFrame. Contains the metadata for all Runs in this Project. Should be a subset of the DataFrame returned from get_accessions(). n_jobs: int. The maximum number of jobs to submit during this program call. job_n: int. The current number of jobs submitted. Returns: The number of jobs submitted at the time of function return. """ logger = log.getLogger('log') if run_info['Run'].isin(cfg['EXCLUDE']).all(): if cfg['EXCLUDELOG']: logger.info(f'<{study}> All Runs found in exclude list. ' 'Skipping Study.') return(job_n) logger.info(f'Processing {study}') if 'hi-c' not in map(lambda x: x.lower(), run_info['LibraryStrategy']): logger.error(f'<{study}> No Hi-C Runs in this Study! Skipping Study.') return(job_n) if cfg['SPECIES'].lower() not in map(lambda x: x.lower(), run_info['ScientificName']): logger.error(f'<{study}> No {cfg["SPECIES"]} data in this Study! ' 'Skipping Study.') return(job_n) if (run_info['digest_file'] == '').all(): logger.error(f'<{study}> No Runs have assigned digest files! ' 'Skipping Study.') return(job_n) srp_dir = os.path.join(outdir, study) if not os.path.isdir(srp_dir): logger.info(f'Generating {srp_dir}/') os.mkdir(srp_dir) logger.info(f'<{study}> Writing SRP metadata file') run_info.to_csv(f'{srp_dir}/{study}_metadata.csv', sep=',', header=True, index=False) sra_exps = run_info['Experiment'].unique() for exp in sra_exps: job_n = process_srx( exp, study, cfg, srp_dir, run_info.loc[run_info['Experiment'].isin([exp]), :], n_jobs, job_n ) if job_n >= n_jobs: return(job_n) return(job_n) def process_srx(exp, study, cfg, srp_dir, run_info, n_jobs, job_n): """Iterate over an SRA Experiment and process eligible Experiments. SRA Runs are organized into Experiments which are in turn organized into Projects. This function iterates over the runs in a Experiment, identifies eligible Runs, and submits each Run to be processed. Args: exp: str. The SRA Experiment ID. study: str. The SRA Project ID for this Experiment. cfg: dict. The config dict returned by get_config() srp_dir: The Project directory under which the Experiment directory exists or should be created. run_info: pandas DataFrame. Contains the metadata for all Runs in this Experiment. Should be a subset of the DataFrame returned from get_accessions(). n_jobs: int. The maximum number of jobs to submit during this program call. job_n: int. The current number of jobs submitted. Returns: The number of jobs submitted at the time of function return. """ logger = log.getLogger('log') if run_info['Run'].isin(cfg['EXCLUDE']).all(): if cfg['EXCLUDELOG']: logger.info(f'<{study}-{exp}> All Runs found in exclude list. ' 'Skipping Experiment.') return(job_n) logger.info(f'Processing {study}-{exp}') if 'hi-c' not in map(lambda x: x.lower(), run_info['LibraryStrategy']): logger.error(f'<{study}-{exp}> No Hi-C Runs in this Experiment! ' 'Skipping Experiment.') return(job_n) if cfg['SPECIES'].lower() not in map(lambda x: x.lower(), run_info['ScientificName']): logger.error(f'<{study}-{exp}> No {cfg["SPECIES"]} data in this ' 'Experiment! Skipping Experiment.') return(job_n) if (run_info['digest_file'] == '').all(): logger.error(f'<{study}-{exp}> No Runs have assigned digest files! ' 'Skipping Experiment.') return(job_n) srx_dir = os.path.join(srp_dir, exp) if not os.path.isdir(srx_dir): logger.info(f'Generating {srx_dir}/') os.mkdir(srx_dir) logger.info(f'<{study}-{exp}> Writing SRX metadata file') run_info.to_csv(f'{srx_dir}/{exp}_metadata.csv', sep=',', header=True, index=False) sra_runs = run_info['Run'] for run in sra_runs: job_n = process_srr(run, exp, study, cfg, srx_dir, run_info.loc[[run]], job_n) if job_n >= n_jobs: return(job_n) return(job_n) def process_srr(run, exp, study, cfg, srx_dir, run_info, job_n): """Build a cluster job to process a Run and submit it. Each Run is processed through the pipeline in an individual cluster job. This function checks to see if the Run is eligible, and if so, lays out the necessary directory structure, input data, constructs the job submission command, and submits it. Args: run: str. The SRA Run ID. exp: str. The SRA Experiment ID. study: str. The SRA Project ID for this Experiment. cfg: dict. The config dict returned by get_config() srx_dir: The Experiment directory under which the Run directory exists or should be created. run_info: pandas DataFrame. Contains the metadata for all Runs in this Experiment. Should be a subset of the DataFrame returned from get_accessions(). job_n: int. The current number of jobs submitted. Returns: The number of jobs submitted at the time of function return. """ logger = log.getLogger('log') if run in cfg['EXCLUDE']: if cfg['EXCLUDELOG']: logger.info(f'<{study}-{exp}-{run}> Found in exclude list. ' 'Skipping Run.') return(job_n) logger.info(f'Processing {study}-{exp}-{run}') if run_info['LibraryStrategy'][0].lower() != 'hi-c': logger.error(f'<{study}-{exp}-{run}> Not a Hi-C run! ' 'Skipping Run.') return(job_n) if run_info['ScientificName'][0].lower() != cfg['SPECIES'].lower(): logger.error(f'<{study}-{exp}-{run}> Not {cfg["SPECIES"]} data! ' 'Skipping Run.') return(job_n) if run_info['digest_file'][0] == '': logger.error(f'<{study}-{exp}-{run}> No assigned digest file! ' 'Skipping Run.') srr_dir = os.path.join(srx_dir, run) log_dir = os.path.join(srr_dir, 'logs') if os.path.exists(srr_dir): if cfg['OVERWRITE']: logger.info(f'Removing existing entry for {run}') shutil.rmtree(srr_dir) else: logger.info(f'Entry already exists for {run}. Skipping.') return(job_n) logger.info(f'Generating {srr_dir}/ and {log_dir}/') if not os.path.isdir(srr_dir): os.mkdir(srr_dir) if not os.path.isdir(log_dir): os.mkdir(log_dir) logger.info(f'<{study}-{exp}-{run}> Writing SRR metadata file') run_info.to_csv(f'{srr_dir}/{run}_metadata.csv', sep=',', header=True, index=False) cfg_pickle = f'{srr_dir}/run_info.pickle' with open(cfg_pickle, 'wb') as f: pickle.dump(cfg, f) if cfg['TMPDIR'] is not None: tmpline = '-l tmp_free=350G ' else: tmpline = '' os.system('qsub ' f'-N {run}_hic ' f'-o {srr_dir}/batch_job.out ' f'-e {srr_dir}/batch_job.err ' f'-wd {log_dir} ' f'-pe threads {cfg["THREADS"]} ' f'-l m_mem_free={cfg["MEMORY"]} ' f'{tmpline}' f'{cfg["PROCESS_SRR"]} ' f'--run {run} ' f'--exp {exp} ' f'--study {study} ' f'--cfg {cfg_pickle} ' f'--srx_dir {srx_dir} ') with open(cfg['EXCLUDEFILE'], 'a') as f: f.write(f'{study},{exp},{run},\n') return(job_n + 1) def main(config, newconfig, njobs): """Get config and metadata, then iterates over Projects. The program takes a config file and metadata. This function parses it, and begins iterating over SRA Projects to look for and submit eligible Runs to the cluster. Also sets up the relevant loggers, and generates an example config file if requested. Args: config: str. Path to the config file. newconfig: bool. If True, a blank config file will be generated before exiting. njobs: The maximum number of jobs to submit. """ if newconfig: generate_config() sys.exit(0) # Have to index config because I specify that there can # only be one argument, and so argparse returns a list. cfg = get_config(config[0]) if not os.path.exists(cfg['OUTDIR']): os.mkdir(cfg['OUTDIR']) logger = setup_logger(cfg) logger.info('Starting') run_info = get_accessions(cfg['ACCESSIONSFILE'], cfg['DIGESTSFILE']) sra_studies = run_info['SRAStudy'].unique() job_n = 0 for study in sra_studies: job_n = process_srp( study, cfg, cfg['OUTDIR'], run_info.loc[run_info['SRAStudy'].isin([study]), :], njobs, job_n ) if job_n >= njobs: break if job_n < njobs: logger.warning(f'njobs = {njobs}, but only {job_n} jobs ' 'available for processing! Ending early.') logger.info('Finished') logger_clean = setup_logger(cfg, clean=True) logger_clean.info('\n' + '*'*100 + '\n') if __name__ == '__main__': script_name = os.path.basename(sys.argv[0]) short_script_name = '.'.join(script_name.split('.')[:-1]) parser = argparse.ArgumentParser() parser.add_argument('-c', '--config', nargs=1, required=False, default=os.path.join(os.getcwd(), f'{short_script_name}.config'), help=('Path to the config file for this script.')) parser.add_argument('--newconfig', required=False, action='store_true', help=('Generate a blank config file in the current' 'directory')) parser.add_argument('-n', '--njobs', type=int, default=1, help=('The number of datasets to process')) if len(sys.argv) == 1: print('Must have a config file! ' 'Use hicup_data_process.py -h for usage message.') sys.exit(1) args = parser.parse_args() main(args.config, args.newconfig, args.njobs)