#!/grid/gillis/home/nfox/software/miniconda3/bin/python """Main program running the pipeline for an individual Run. Contains the coordinating function for running each step of the Hi-C data processing pipeline, typically run in a UGE compute cluster job, issued by hicup_data_processing.py. In brief, this pipeline: 1. Downloads SRA file 2. Converts it to FASTQ files 3. Submits the FASTQ files to HiCUP which a. Truncates the reads b. Aligns the reads c. Filters the aligned reads d. Removes PCR duplicates from the aligned reads e. Outputs a final SAM file with Hi-C contacts 4. Converts the HiCUP-output SAM file to a 4DN pairs file. 5. Builds an HDF5 file containing the 1bp contacts matrices and one binned network, using the interval specified by the global variables RESOLUTION and HUMAN_RESOLUTION It generates extensive logs and cleans up most files along the way. It deletes all data files except the indexed and compressed pairs file and the networks HDF5 file. """ import argparse import datetime as dt import logging import os import pandas as pd import pickle import sys class Out_Filter(logging.Filter): """Extension of logging.Filter to get messages for stdout. Filters for messages that are INFO or less severe, which would be expected to go to an *.out file, as opposed to an *.err file. """ def filter(self, rec): """Filter for messages of INFO and less severe.""" if rec.levelno <= logging.INFO: return(True) else: return(False) class Err_Filter(logging.Filter): """Extension of logging.Filter to get messages for stderr. Filters for messages that are WARNING or more severe, which would be expected to go to an *.err file, as opposed to an *.out file. """ def filter(self, rec): """Filter for messages of WARNING and more severe.""" if rec.levelno >= logging.WARNING: return(True) else: return(False) def setup_logger(cfg, log_dir, which='log'): """Set up and configure a logger.""" valid_loggers = {'log', 'clean'} if which not in valid_loggers: raise ValueError(f'which={which} is not a valid logger!') logger = logging.getLogger(which) if which in {'log'}: formatter = logging.Formatter(fmt=('%(asctime)s [%(levelname)s] ' '%(message)s'), datefmt='%Y-%m-%d %H:%M:%S') elif which in {'clean'}: formatter = logging.Formatter(fmt='') else: raise AssertionError(f'\'which\' value "{which}" made it past ' 'input validation!') logger.setLevel(logging.INFO) if which in {'log', 'clean'}: logfile = os.path.join(log_dir, '00_batch_job.log') handler_file = logging.FileHandler(filename=f'{logfile}', mode='a') handler_file.setFormatter(formatter) logger.addHandler(handler_file) else: raise AssertionError(f'\'which\' value "{which}" made it past ' 'input validation!') return(logger) def print_timedelta(timedelta): """Convert a datetime.timedelta object to a pretty string. Convert a datetime.timedelta or an integer number of seconds to a human readable string in the format: 0 days, 0 hours, 0 minutes, 0 seconds Leading zero units are stripped. For example: 2 hours, 0 minutes, 10 seconds Stolen from https://gist.github.com/thatalextaylor/7408395 on 2020-09-09. Args: timedelta: Either a datetime.timedelta object or a number of seconds that can be converted to an integer. The time interval to be converted to a string. Returns: String. The human readable version of timedelta in the format listed above. """ import datetime if type(timedelta) is datetime.timedelta: seconds = timedelta.total_seconds() else: seconds = timedelta if seconds < 0: sign_string = '-' else: sign_string = '' seconds = abs(int(seconds)) days, seconds = divmod(seconds, 86400) hours, seconds = divmod(seconds, 3600) minutes, seconds = divmod(seconds, 60) unit = {} unit['d'] = 'day' if days == 1 else 'days' unit['h'] = 'hour' if hours == 1 else 'hours' unit['m'] = 'minute' if minutes == 1 else 'minutes' unit['s'] = 'second' if seconds == 1 else 'seconds' if days > 0: return(f'{sign_string}{days} {unit["d"]}, {hours} {unit["h"]}, ' f'{minutes} {unit["m"]}, {seconds} {unit["s"]}') elif hours > 0: return(f'{sign_string}{hours} {unit["h"]}, {minutes} {unit["m"]}, ' f'{seconds} {unit["s"]}') elif minutes > 0: return(f'{minutes} {unit["m"]}, {seconds} {unit["s"]}') else: return(f'{seconds} {unit["s"]}') def generate_hicup_config(run, cfg, srr_dir, run_info): """Generate config file to run HiCUP. Args: run: str. The SRA Run ID. cfg: dict. The dict of config values retrieved from disk. srr_dir: str. The path to the Run directory in the database. run_info: pandas DataFrame. Contains the metadata for this Run. """ if cfg['TMPDIR'] is not None: fq_dir = cfg['TMPDIR'] else: fq_dir = srr_dir config_file = ( '# Configuration File for the hicup Perl script', '##############################################', '', '', '# Directory to which output files should be written', '# Default: current working directory', f'Outdir: {srr_dir}/hicup_output', '', '', '# Number of threads to use', f'Threads: {cfg["THREADS"]}', '', '', '# Suppress progress updates (0: off, 1: on)', 'Quiet:0', '', '', '# Retain intermediate pipeline files (0: off, 1: on)', 'Keep:0', '', '', '# Compress output files (0: off, 1: on)', 'Zip:0', '', '', '# Path to the alignment program (Bowtie, Bowtie2, or STAR)', f'STAR: {cfg["STARPATH"]}', '', '', '# Path to the reference genome indices', '# Remember to include the basename of the genome indices', f'Index: {cfg["STARINDICES"]}', '', '', '# Path to the genome digest file produced by hicup_digester', f'Digest: {run_info.iloc[0]["digest_file"]}' '', '', '# FASTQ format (valid formats: "Sanger", "Solexa_Illumina_1.0"', '# "Illumina_1.3", "Illumina_1.5")', '# If not specified, HiCUP will try to determine', '# the format automatically', 'Format:', '', '', '# Maximum di-tag length (optional)', 'Longest:', '', '', '# Minimum di-tag length (optional)', 'Shortest:', '', '', '# FASTQ files to be analysed, placing paired', '# files on adjacent lines', f'{fq_dir}/{run}.sra_1.fastq', f'{fq_dir}/{run}.sra_2.fastq' ) with open(os.path.join(srr_dir, f'{run}_hicup.config'), 'w') as f: for line in config_file: f.write(f'{line}\n') def download_run(run, cfg, srr_dir, log_dir, max_attempts=3): """Download SRA file for a Run. Use the NCBI provided program "prefetch" to download an SRA file for a specific Run. Args: run: str. The SRA Run ID. cfg: dict. The dict of config values retrieved from disk. srr_dir: str. The path to the Run directory in the database. log_dir: str. The path to the directory in which to store log files. Typically `srr_dir`/logs. max_attempts: int. The maximum number of attempts to try running prefetch on a failure result. Returns: success. bool indicating if the SRA file was downloaded successfully or not. """ logger = logging.getLogger('log') pf_out = os.path.join(log_dir, '01_prefetch.out') pf_err = os.path.join(log_dir, '01_prefetch.err') if cfg['TMPDIR'] is not None: out_dir = cfg['TMPDIR'] else: out_dir = srr_dir loop = True attempts = 0 success = False while loop: attempts += 1 os.system( '/grid/gillis/home/nfox/software/' 'sratoolkit.2.10.8-centos_linux64/bin/prefetch ' '--max-size 1000000000 ' f'--output-directory {out_dir} ' '--verify yes ' f'{run}' f'> {pf_out} ' f'2> {pf_err}' ) with open(os.path.join(log_dir, '01_prefetch.out'), 'r') as f: pf_report = f.read() if ('is found locally' in pf_report): success = True loop = False elif not ('was downloaded successfully' in pf_report and 'is valid' in pf_report): if attempts < max_attempts: logger.error(f'{run} failed to download successfully. ' 'Trying again.') loop = True else: logger.error(f'{run} failed to download in maximum ' 'number of attempts.') success = False loop = False else: success = True loop = False return(success) def convert_run_to_fastq(run, cfg, srr_dir, log_dir): """Convert SRA file to FASTQ files. Use the NCBI program fasterq-dump to convert an SRA file to FASTQ files. Automatically splits paired reads. Args: run: str. The SRA Run ID. cfg: dict. The dict of config values retrieved from disk. srr_dir: str. The path to the Run directory in the database. log_dir: str. The path to the directory in which to store log files. Typically `srr_dir`/logs. """ logger = logging.getLogger('log') sra_file = os.path.join(srr_dir, f'{run}.sra') crtf_out = os.path.join(log_dir, '02_fasterq-dump.out') crtf_err = os.path.join(log_dir, '02_fasterq-dump.err') if cfg['TMPDIR'] is not None: out_dir = cfg['TMPDIR'] else: out_dir = srr_dir sra_file = os.path.join(out_dir, f'{run}.sra') if not os.path.exists(sra_file): if os.path.isdir(os.path.join(out_dir, run)): if os.path.exists(os.path.join(out_dir, run, f'{run}.sra')): sra_file = os.path.join(out_dir, run, f'{run}.sra') else: logger.error(f'{run} SRA file not found! fasterq-dump ' 'cannot run!') sys.exit(1) else: logger.error(f'{run} SRA file not found! fasterq-dump ' 'cannot run!') sys.exit(1) os.system( '/grid/gillis/home/nfox/software/' 'sratoolkit.2.10.8-centos_linux64/bin/fasterq-dump ' f'--outdir {out_dir} ' f'--mem 10GB ' f'--temp {out_dir} ' f'--threads {cfg["THREADS"]} ' '--details ' '--force ' '--split-3 ' f'{sra_file} ' f'> {crtf_out} ' f'2> {crtf_err}' ) # Remove the unpaired reads file fq_prefix = os.path.join(out_dir, run) if os.path.exists(f'{fq_prefix}.sra.fastq'): os.remove(f'{fq_prefix}.sra.fastq') def clean_sra_files(run, cfg, srr_dir, log_dir): """Delete all unneeded SRA files. SRA files are not needed once they have been successfully converted to FASTQ files. This function removes them, checking to see if they live in a directory or not. Args: run: str. The SRA Run ID. cfg: dict. The dict of config values retrieved from disk. srr_dir: str. The path to the Run directory in the database. log_dir: str. The path to the directory in which to store log files. Typically `srr_dir`/logs. """ logger = logging.getLogger('log') if cfg['TMPDIR'] is not None: out_dir = cfg['TMPDIR'] else: out_dir = srr_dir sra_file = os.path.join(out_dir, f'{run}.sra') leftover_folder = False if not os.path.exists(sra_file): if os.path.isdir(os.path.join(out_dir, run)): if os.path.exists(os.path.join(out_dir, run, f'{run}.sra')): sra_file = os.path.join(out_dir, run, f'{run}.sra') leftover_folder = True else: logger.error(f'{run} SRA file not found! clean_sra_files() ' 'did not run!') return else: logger.error(f'{run} SRA file not found! clean_sra_files() ' 'did not run!') return csf_out = os.path.join(log_dir, '03_clean_sra.out') csf_err = os.path.join(log_dir, '03_clean_sra.err') os.system( f'rm --verbose {sra_file} ' f'> {csf_out} ' f'2> {csf_err}' ) if leftover_folder: leftover_folder = os.path.join(out_dir, run) os.system( f'rmdir --verbose {leftover_folder} ' f'>> {csf_out} ' f'2>> {csf_err}' ) def run_hicup(run, cfg, run_info, srr_dir, log_dir): """Run the HiCUP Hi-C processing tool. HiCUP from the Babraham lab is the tool that we are using to process Hi-C data from reads to SAM files. Briefly, it truncates reads based on restriction sites, aligns reads, filters output reads, and removes PCR duplicates. We are using a modified version that uses STAR instead of bowtie2, and uses slightly different logging behavior. Args: run: str. The SRA Run ID. cfg: dict. The dict of config values retrieved from disk. run_info: pandas DataFrame. Contains the metadata for this Run. srr_dir: str. The path to the Run directory in the database. log_dir: str. The path to the directory in which to store log files. Typically `srr_dir`/logs. """ generate_hicup_config(run, cfg, srr_dir, run_info) hc_config = os.path.join(srr_dir, f'{run}_hicup.config') if not os.path.isdir(os.path.join(srr_dir, 'hicup_output')): os.mkdir(os.path.join(srr_dir, 'hicup_output')) rh_out = os.path.join(log_dir, '04_hicup.out') rh_err = os.path.join(log_dir, '04_hicup.err') os.system( '/grid/gillis/home/nfox/software/hicup_v0.7.4/hicup ' f'--config {hc_config}' f'> {rh_out} ' f'2> {rh_err}' ) def clean_fastq_files(run, cfg, srr_dir, log_dir): """Delete all unneeded FASTQ files. FASTQ files are not needed once they have been successfully run through the HiCUP pipeline. This function removes them. Args: run: str. The SRA Run ID. cfg: dict. The dict of config values retrieved from disk. srr_dir: str. The path to the Run directory in the database. log_dir: str. The path to the directory in which to store log files. Typically `srr_dir`/logs. """ if cfg['TMPDIR'] is not None: fq_prefix = os.path.join(cfg['TMPDIR'], run) else: fq_prefix = os.path.join(srr_dir, run) cff_out = os.path.join(log_dir, '05_clean_fastq.out') cff_err = os.path.join(log_dir, '05_clean_fastq.err') os.system( 'rm --verbose ' f'{fq_prefix}.sra_1.fastq ' f'{fq_prefix}.sra_2.fastq ' f'> {cff_out} ' f'2> {cff_err}' ) def convert_sam_to_pairs(run, cfg, srr_dir, log_dir): """Convert HiCUP output SAM file to 4DN pairs file format. The 4DN consortium publishes a file format for Hi-C contacts, called a pairs file. It can be indexed for quick access and is essentially a compressed text form of a triplets sparse matrix. This function uses their pairix program to convert the SAM file, compress, and index the reads to a smaller, more efficient format. Args: run: str. The SRA Run ID. cfg: dict. The dict of config values retrieved from disk. srr_dir: str. The path to the Run directory in the database. log_dir: str. The path to the directory in which to store log files. Typically `srr_dir`/logs. """ sam_file = os.path.join(srr_dir, 'hicup_output', f'{run}.sra_1_2.hicup.sam') b2p_prefix = os.path.join(srr_dir, run) cstp_out = os.path.join(log_dir, '06_bam2pairs.out') cstp_err = os.path.join(log_dir, '06_bam2pairs.err') os.system( '/grid/gillis/home/nfox/software/pairix/util/bam2pairs/bam2pairs ' f'--chromsize {cfg["CHROMSIZES"]} ' f'--threads {cfg["THREADS"]} ' f'{sam_file} ' f'{b2p_prefix} ' f'> {cstp_out} ' f'2> {cstp_err}' ) if not os.path.exists(os.path.join(srr_dir, f'{run}.bsorted.pairs.gz')): os.system( '/grid/gillis/home/nfox/software/pairix/bin/bgzip ' '-f ' f'{os.path.join(srr_dir, f"{run}.bsorted.pairs")}' ) os.system( '/grid/gillis/home/nfox/software/pairix/bin/pairix ' '-f ' f'{os.path.join(srr_dir, f"{run}.bsorted.pairs.gz")}' ) elif not os.path.exists(os.path.join(srr_dir, f'{run}.bsorted.pairs.gz.px2')): os.system( '/grid/gillis/home/nfox/software/pairix/bin/pairix ' '-f ' f'{os.path.join(srr_dir, f"{run}.bsorted.pairs.gz")}' ) else: pass def clean_sam_files(run, srr_dir, log_dir): """Delete all unneeded SAM files. The SAM file is not needed once it has been converted to a pairs file. Additionally, HiCUP leaves multiple SAM files of rejected reads that we choose not to keep. This function removes all of these SAM files. Args: run: str. The SRA Run ID. srr_dir: str. The path to the Run directory in the database. log_dir: str. The path to the directory in which to store log files. Typically `srr_dir`/logs. """ logger = logging.getLogger('log') sam_file = os.path.join(srr_dir, 'hicup_output', f'{run}.sra_1_2.hicup.sam') csf_out = os.path.join(log_dir, '07_clean_sam.out') csf_err = os.path.join(log_dir, '07_clean_sam.err') os.system( f'rm --verbose {sam_file} ' f'> {csf_out} ' f'2> {csf_err}' ) reject_folder = [] for f in os.scandir(os.path.join(srr_dir, 'hicup_output')): if f.is_dir() and f.name.startswith('hicup_filter_ditag_rejects'): reject_folder.append(f.name) if len(reject_folder) > 1: logger.warning(f'<{run}> Multiple matching filter reject folders! ' 'Skipping deletion!') elif len(reject_folder) == 1: reject_folder = os.path.join(srr_dir, 'hicup_output', reject_folder[0]) os.system( f'rm -r --verbose {reject_folder} ' f'>> {csf_out} ' f'2>> {csf_err}' ) else: pass # DEBUG >>> # def build_networks_hdf5(srr_dir, log_dir, cfg): def build_networks(srr_dir, log_dir, cfg): # <<< DEBUG """Build the networks.hdf5 file. The main data is held in an HDF5 file where it can be efficiently stored at multiple resolutions. This function builds the initial networks.hdf5 file at 1bp resolution, then optionally builds networks of different binned resolutions. This is just a wrapper function around the build_network.py program. Args: srr_dir: str. The path to the Run directory in the database. log_dir: str. The path to the directory in which to store log files. Typically `srr_dir`/logs. cfg: dict. The dict of config values retrieved from disk. """ logger = logging.getLogger('log') # DEBUG >>> os.system( '/grid/gillis/data/nfox/hi_c_data_processing/' 'software/build_network.py ' '--interval ' '--humanres 1bp ' f'--logfile {os.path.join(log_dir, "10_build_networks.log")} ' '--splitouterr ' '1 ' f'{srr_dir} ' f'{cfg}' ) # <<< DEBUG for res, humanres in zip(cfg['RESOLUTIONS'], cfg['HUMANRESOLUTIONS']): # DEBUG >>> # os.system( # '/grid/gillis/data/nfox/hi_c_data_processing/' # 'software/build_networks.py ' # '--interval ' # f'--humanres {humanres} ' # f'{res} ' # f'{srr_dir} ' # f'{cfg["CHROMSIZES"]} ' # f'> {os.path.join(log_dir, "10_build_networks_hdf5.out")} ' # f'2> {os.path.join(log_dir, "10_build_networks_hdf5.err")}' # ) if res == 1: logger.warning('1bp resolution is passed, will be ignored because ' 'a 1bp resolution network is automatically built.') continue elif res <= 0: logger.error('AssertionError: a resolution {res} <= 0 made it ' 'past input validation to the pipeline. Skipping.') continue os.system( '/grid/gillis/data/nfox/hi_c_data_processing/' 'software/build_network.py ' '--interval ' f'--humanres {humanres} ' f'--logfile {os.path.join(log_dir, "10_build_networks.log")} ' '--splitouterr ' f'{res} ' f'{srr_dir} ' f'{cfg}' ) # <<< DEBUG def concat_logs(log_dir): """Concatenate all individual log files into one file. The main logging messages are written to one file, but all called programs, e.g. prefetch, hicup, rm, pairix, pipe their stdout and stderr to individual files. This function appends all those files in an organized fashion to the main log file. Args: log_dir: str. The path to the directory in which the log files are stored. Typically RUNDIR/logs. """ logs = [ '01_prefetch', '02_fasterq-dump', '03_clean_sra', '04_hicup', '05_clean_fastq', '06_bam2pairs', '07_clean_sam', '08_build_network', '09_bin_network', '10_build_networks_hdf5' ] with open(os.path.join(log_dir, '00_batch_job.log'), 'a') as log_file: log_file.write('\n') log_file.write('LOGS\n') log_file.write('=' * 100) log_file.write('\n') for log in logs: log_file.write(f'{log} STDOUT\n') log_file.write('-' * (len(log) + 7)) log_file.write('\n') out_file = os.path.join(log_dir, f'{log}.out') if os.path.exists(out_file): with open(out_file, 'r') as f: for line in f: line = line.strip() log_file.write(f'{line}\n') log_file.write('\n') os.remove(out_file) log_file.write(f'{log} STDERR\n') log_file.write('-' * (len(log) + 7)) log_file.write('\n') err_file = os.path.join(log_dir, f'{log}.err') if os.path.exists(err_file): with open(err_file, 'r') as f: for line in f: line = line.strip() log_file.write(f'{line}\n') log_file.write('\n') os.remove(err_file) log_file.write('=' * 60) log_file.write('\n\n') def process_srr(run, exp, study, cfg, srx_dir): """Coordinate data pipeline for a single Run. This function coordinates and calls the individual steps of the Hi-C Run processing pipeline. It also generates the main logging messages. It begins by reading the metadata and config data, then takes that metadata and runs the full pipeline. Args: run: str. The SRA Run ID. exp: str. The SRA Experiment ID. study: str. The SRA Project ID. cfg: str. The path to the pickled dict of config values. srx_dir: str. The path to the Experiment directory in the database. """ start = dt.datetime.now() srr_dir = os.path.join(srx_dir, run) log_dir = os.path.join(srr_dir, 'logs') with open(os.path.join(log_dir, 'pipeline_marker.txt'), 'w') as f: f.write('started\n') cfg_file = cfg with open(cfg_file, 'rb') as f: cfg = pickle.load(f) if cfg['TMPDIR']: cfg['TMPDIR'] = os.environ['TMPDIR'] setup_logger(cfg, log_dir) setup_logger(cfg, log_dir, which='clean') logger = logging.getLogger('log') run_info = pd.read_csv(f'{srr_dir}/{run}_metadata.csv', sep=',', header=0, index_col=False) logger.info(f'<{study}-{exp}-{run}> Downloading SRA file') success = download_run(run, cfg, srr_dir, log_dir) if not success: logger.error(f'<{study}-{exp}-{run}> prefetch download failed! ' 'Skipping Run.') return logger.info(f'<{study}-{exp}-{run}> Converting SRA file to FASTQ files') convert_run_to_fastq(run, cfg, srr_dir, log_dir) logger.info(f'<{study}-{exp}-{run}> Removing SRA file') clean_sra_files(run, cfg, srr_dir, log_dir) logger.info(f'<{study}-{exp}-{run}> Running HiCUP') run_hicup(run, cfg, run_info, srr_dir, log_dir) logger.info(f'<{study}-{exp}-{run}> Removing FASTQ files') clean_fastq_files(run, cfg, srr_dir, log_dir) logger.info(f'<{study}-{exp}-{run}> Converting HiCUP SAM file ' 'to pairix-indexed pairs file') convert_sam_to_pairs(run, cfg, srr_dir, log_dir) logger.info(f'<{study}-{exp}-{run}> Removing SAM file') clean_sam_files(run, srr_dir, log_dir) logger.info(f'<{study}-{exp}-{run}> Building 1bp_raw ' 'and 10kbp_raw networks') # DEBUG >>> # build_networks_hdf5(srr_dir, log_dir, cfg) build_networks(srr_dir, log_dir, cfg) # <<< DEBUG end = dt.datetime.now() logger_clean = logging.getLogger('clean') logger_clean.info(f'Runtime: {print_timedelta(end - start)}') logger_clean.info('\n') logger_clean.info('*' * 100) concat_logs(log_dir) with open(os.path.join(log_dir, 'pipeline_marker.txt'), 'w') as f: f.write('finished\n') if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--run', required=True) parser.add_argument('--exp', required=True) parser.add_argument('--study', required=True) parser.add_argument('--cfg', required=True) parser.add_argument('--srx_dir', required=True) args = parser.parse_args() process_srr(args.run, args.exp, args.study, args.cfg, args.srx_dir)