#!/grid/gillis/home/nfox/software/miniconda3/bin/python """ Builds aggregate networks in aggregate.hdf5 HDF5 files for the Hi-C database. Sources existing individual networks on the sample level. i.e. the network resolution must exist in all sample-level networks.hdf5 files before an aggregate can be built. It's entirely possible to build lower-resolution networks from compatible higher-resolution networks. e.g. build a 10kbp network/aggregate from an existing 5kbp network/aggregate, but this program does not support that. """ import h5py import itertools import logging import numpy as np import os import scipy.sparse as ss import traceback def setup_loggers(global_level=logging.DEBUG, logfile=''): """Set up all loggers for use.""" std_format = logging.Formatter(fmt=('%(asctime)s [%(levelname)s] ' '%(message)s'), datefmt='%Y-%m-%d %H:%M:%S') clean_format = logging.Formatter(fmt='') if logfile == '': logfile = os.path.basename(__file__) if logfile.endswith('.py'): logfile = logfile[:-3] logfile = logfile + '.log' elif os.path.isdir(logfile): logdir = logfile logfile = os.path.basename(__file__) if logfile.endswith('.py'): logfile = logfile[:-3] logfile = logfile + '.log' logfile = os.path.join(logdir, logfile) else: # No effect, here for readability logfile = logfile std_file_handler = logging.FileHandler(filename=logfile, mode='a') std_file_handler.setFormatter(std_format) cln_file_handler = logging.FileHandler(filename=logfile, mode='a') cln_file_handler.setFormatter(clean_format) # Assigning configurations to each logger logger = logging.getLogger('standard') logger.addHandler(std_file_handler) logger.setLevel(global_level) logger = logging.getLogger('clean') logger.addHandler(cln_file_handler) logger.setLevel(global_level) return def update_dataset(hfile, path, replacement): """Update an HDF5 Dataset. Horribly hacky. Deletes the old dataset and replaces it. Avoid any type or shape issues with no subtlety. Args: hfile: h5py File or Group object available for writing. path: str. Path to the dataset to be replaced. If relative, must be valid to the location of hfile. replacement: Any valid HDF5 dataset. Stored at path. Returns: None. Just creates the new dataset. """ del hfile[path] hfile.create_dataset(path, data=replacement) return def validate_root(root): """Validate that root is a reasonable SRA Project root. root should be a folder containing SRA Experiment folders, which each contain SRA Run folders, which each contain a "networks.hdf5" file. Does a soft validation of these criteria. Args: root: str. Path to the root folder to be validated. Returns: A tuple with the number of Experiments and the number of Runs in root. """ logger = logging.getLogger('standard') exp_total = 0 run_total = 0 for exp in os.scandir(root): if os.path.isdir(exp.path) and exp.name[1:3] == 'RX': exp_total += 1 for run in os.scandir(exp): if os.path.isdir(run.path) and run.name[1:3] == 'RR': run_total += 1 if exp_total == 0: logger.error(f'There are no Experiments in {root}') return 0, 0 elif run_total == 0: logger.error(f'There are no Runs in {root}') return 0, 0 else: return exp_total, run_total def get_chrom_sizes(chroms): """Reads in a dict of chromosome sizes. Takes a standard chromosome sizes file (no header, one chromosome per line as "chrom_nameTABsize_in_bp") and builds a dict with names as keys and sizes as values. Args: chroms: str. Path to chromosome sizes file. Returns: A dict where chrom_sizes[chrom] gives the size in base pairs of the chromosome "chrom". """ chrom_sizes = {} with open(chroms, 'r') as f: for line in f: values = line.split('\t') chrom_sizes[values[0]] = int(values[1]) return chrom_sizes def get_main_chroms(chroms): """Filters a list of chromosomes for main chromosomes. Many genomes have partial scaffolds not yet integrated into the main chromosomes. They tend to have "_" in the name. This filters for the ones that do not. e.g. chrIX, chrM, chr10. Args: chroms: list of chromosomes to filter. Returns: A list of all entries in chroms that do not contain "_". """ main_chroms = [c for c in chroms if '_' not in c] return main_chroms def get_chrom_combs(chroms): """Create a list of chromosome combinations. Given a list of chromosomes, produces a list of strings in the format "CHROM1_vs_CHROM2" for all chromsome combinations in chroms, including each identical pair. CHROM1 is always < CHROM2 by string comparison. Args: chroms: list of chromosomes to generate combinations for. Returns: A list of combinations. """ main_chroms = get_main_chroms(chroms) chrom_combs = [f'{c}_vs_{c}' for c in main_chroms] for c1, c2 in itertools.combinations(main_chroms, 2): if c1 > c2: temp = c1 c1 = c2 c2 = temp chrom_combs.append(f'{c1}_vs_{c2}') return chrom_combs def get_exclude(aggregate, network_path): """Get the exclude data from an aggregate file. From a standard aggregate file, retrieves the exclude metadata from a given network as a dict of lists: project, experiment, run, network; for each incorporated dataset. Args: aggregate: str. Path to the aggregate file from which to retrieve the exclude data. network_path: str. Path to the correct network inside the aggregate file. Returns: A dict of matched lists. """ logger = logging.getLogger('standard') exclude = {} if not os.path.exists(aggregate): logger.error(f'{network_path} does not exist.') return exclude with h5py.File(aggregate, 'r') as hfile: if network_path not in hfile: logger.error(f'{network_path} does not exist in {aggregate}.') return exclude group = hfile[f'{network_path}/metadata/exclude'] for col in group.keys(): exclude[col] = [val.decode('utf-8') for val in group[col][:]] return exclude def get_blacklist(aggregate, network_path): """Get the blacklist data from an aggregate file. From a standard aggregate file, retrieves the blacklist metadata from a given network as a dict of lists: project, experiment, run, network; for each incorporated dataset. Args: aggregate: str. Path to the aggregate file from which to retrieve the blacklist data. network_path: str. Path to the correct network inside the aggregate file. Returns: A dict of matched lists. """ logger = logging.getLogger('standard') blacklist = {} if not os.path.exists(aggregate): logger.error(f'{network_path} does not exist.') return blacklist with h5py.File(aggregate, 'r') as hfile: if network_path not in hfile: logger.error(f'{network_path} does not exist in {aggregate}.') return blacklist group = hfile[f'{network_path}/metadata/blacklist'] for col in group.keys(): blacklist[col] = [val.decode('utf-8') for val in group[col][:]] return blacklist def get_aggregate_data(aggregate, network_path, chrom_data): """Get the aggregate networks from an aggregate file. From a standard aggregate file, retrieves the data from a given network for the given chromosomes. Args: aggregate: str. Path to the aggregate file from which to retrieve the aggregate data. network_path: str. Path to the correct network inside the aggregate file. chrom_data: dict. Chromosome names as keys to int chromosome sizes. Data will be retrieved for all chromosome keys. Returns: A dict of scipy.sparse.coo_matrix objects, each keyed to a chromosome combination and containing the data for that combination from the aggregate file. """ logger = logging.getLogger('standard') aggregate_data = {} chrom_combs = get_chrom_combs(list(chrom_data.keys())) if not os.path.exists(aggregate): logger.error(f'{network_path} does not exist.') return aggregate_data with h5py.File(aggregate, 'r') as hfile: if network_path not in hfile: logger.error(f'{network_path} does not exist in {aggregate}.') return aggregate_data for c in chrom_combs: if c not in hfile[f'{network_path}'].keys(): logger.error(f'{c} does not exist in the ' f'{network_path} Group in the ' f'{aggregate} HDF5 file.') continue net = ss.coo_matrix( (hfile[f'{network_path}/{c}/data'][:], (hfile[f'{network_path}/{c}/row'][:], hfile[f'{network_path}/{c}/col'][:])), hfile[f'{network_path}/{c}/shape'][:]).tocsr() aggregate_data[c] = net return aggregate_data def get_chrom_data(aggregate, network_path): """Get chromosome names and sizes from aggregate file. From a standard aggregate file, retrieves the chromosome names and matched chromosome sizes that generate the chromosome combination contact matrices in the network. Args: aggregate: str. Path to the aggregate file from which to retrieve the chromosome data. network_path: str. Path to the correct network inside the aggregate file. Returns: A dict of chromosome sizes, keyed by chromosome name. """ logger = logging.getLogger('standard') chrom_data = {} if not os.path.exists(aggregate): logger.error(f'{network_path} does not exist.') return chrom_data with h5py.File(aggregate, 'r') as hfile: if network_path not in hfile: logger.error(f'{network_path} does not exist in {aggregate}.') return chrom_data chrom_names = hfile[f'{network_path}/metadata/chrom_names'][:] chrom_names = [c.decode('utf-8') for c in chrom_names] chrom_sizes = hfile[f'{network_path}/metadata/chrom_sizes'][:] if len(chrom_names) != len(chrom_sizes): logger.error('Chromosome names and sizes are different lengths.') return chrom_data chrom_data = {n: s for n, s in zip(chrom_names, chrom_sizes)} return chrom_data def init_agg(aggregate, networks, chrom_data): """Initialize an empty standard aggregate file. Builds the HDF5 Group and Dataset structure of a standard aggregate file, ready to be filled. Args: aggregate: str. Path to new aggregate file. networks: list of str. Each item will be the name of an initialized empty network in the new file. Duplicates will be ignored. chrom_data: dict. Chromosome sizes keyed by chromosome name. Returns: A bool. False if aggregate exists, True if new one was initialized. """ logger = logging.getLogger('standard') if os.path.exists(aggregate): logger.error(f'{aggregate} exists. Cannot overwrite.') return False with h5py.File(aggregate, 'w') as hfile: hfile.create_group('/aggregates') for network in set(networks): init_network(aggregate, network, chrom_data) return True def init_network(aggregate, network, chrom_data): """Initialize a network HDF5 Group. Args: aggregate: str. Path to the HDF5 aggregates file in which the network will be initialized. network: str. Name of the new network to initialize. chrom_data: dict. int chromosome sizes keyed by str chromosome names. """ chrom_names = list(chrom_data.keys()) chrom_sizes = [chrom_data[k] for k in chrom_names] with h5py.File(aggregate, 'a') as hfile: agg = hfile.create_group(f'/aggregates/{network}') for c in list(get_chrom_combs(chrom_names)): c_net = agg.create_group(c) c_net.create_dataset('data', data=np.array([])) c_net.create_dataset('row', data=np.array([])) c_net.create_dataset('col', data=np.array([])) c_net.create_dataset('shape', data=np.array([0, 0])) meta = agg.create_group('metadata') meta.create_dataset('chrom_names', data=chrom_names) meta.create_dataset('chrom_sizes', data=chrom_sizes) magg = meta.create_group('exclude') magg.create_dataset('project', data=np.array([])) magg.create_dataset('experiment', data=np.array([])) magg.create_dataset('run', data=np.array([])) magg.create_dataset('network', data=np.array([])) mbla = meta.create_group('blacklist') mbla.create_dataset('project', data=np.array([])) mbla.create_dataset('experiment', data=np.array([])) mbla.create_dataset('run', data=np.array([])) mbla.create_dataset('network', data=np.array([])) return def add_dataset(run_path, network, aggregate_data, exclude, blacklist, chrom_data): """Add dataset to in-memory aggregate. Retrieves a dataset from an individual network and adds it to the working aggregate. Network is added to the exclude data on success, blacklist otherwise. If blacklisted, will attempt to remove the Run from the working aggregate. Args: run_path: str. Path to the SRA Run folder holding the "networks.hdf5" file to be retrieved. network: str. The name of the network to retrieve from run_path/networks.hdf5. aggregate_data: dict. scipy.sparse csr_matrix objects for each chromosome combination, keyed by combination name. exclude: dict. Matched lists of project, experiment, run, network for all Runs that have already been aggregated. blacklist: dict. Matched lists of project, experiment, run, network for all Runs that have already failed to aggregate. chrom_data: dict. Chromosome sizes keyed by chromosome name. """ logger = logging.getLogger('standard') logger_c = logging.getLogger('clean') project, experiment, run = run_path.split('/')[-3:] network_file = os.path.join(run_path, 'networks.hdf5') success = True with h5py.File(network_file, 'r') as hfile: chrom_combs = get_chrom_combs(list(chrom_data.keys())) finished_combs = {} for c in chrom_combs: net = ss.coo_matrix( (hfile[f'/networks/{network}/{c}/data'][:], (hfile[f'/networks/{network}/{c}/row'][:], hfile[f'/networks/{network}/{c}/col'][:])), hfile[f'/networks/{network}/{c}/shape'][:]).tocsr() if (aggregate_data[c].shape != (0, 0) and net.shape != aggregate_data[c].shape): logger.error(f'new {c} and in memory agg {c} do not ' 'have the same shape.') continue try: if aggregate_data[c].shape == (0, 0): aggregate_data[c] = net else: aggregate_data[c] = aggregate_data[c] + net except Exception as e: logger.error(f'Adding {run}:{c} to in memory ' 'aggregate failed. Error message:') exception_string = ''.join( traceback.format_exception( etype=type(e), value=e, tb=e.__traceback__ ) ) logger_c.error(exception_string) success = False break finished_combs[c] = net if success: exclude['project'].append(project) exclude['experiment'].append(experiment) exclude['run'].append(run) exclude['network'].append(network) else: for c in finished_combs: if (aggregate_data[c] != net).nnz == 0: aggregate_data[c] = ss.coo_matrix( (np.array([]), (np.array([]), np.array([]))), np.array([0, 0]) ).tocsr() else: aggregate_data[c] = aggregate_data[c] - net blacklist['project'].append(project) blacklist['experiment'].append(experiment) blacklist['run'].append(run) blacklist['network'].append(network) return def write_results(aggregate, h5_network, aggregate_data, exclude, blacklist, chrom_data): """Write in-memory aggregate to disk. Writes the working aggregate in memory to disk, updating the stored aggregate file. Note, this function currently implements a horrible update by deleting the existing Dataset and writing a new one. Args: aggregate: str. Path to the aggregate file to be updated. h5_network: str. Path inside the aggregate to the network to be updated. aggregate_data: dict. scipy.sparse.csr_matrix contact matrices, keyed by chromosome combination. exclude: dict. Matched lists of project, experiment, run, network for all Runs that have already been aggregated. blacklist: dict. Matched lists of project, experiment, run, network for all Runs that have already failed to aggregate. chrom_data: dict. Chromosome sizes keyed by chromosome name. Returns: A integer exit code. 0 for success, 1 for failure. """ logger = logging.getLogger('standard') EXIT_SUCCESS = 0 EXIT_FAILURE = 1 if not os.path.exists(aggregate): logger.error(f'Aggregate `{aggregate}` does not exist and ' 'cannot be written to.') return EXIT_FAILURE with h5py.File(aggregate, 'r+') as hfile: chrom_combs = get_chrom_combs(list(chrom_data.keys())) for c in chrom_combs: temp = aggregate_data[c].tocoo() if c not in hfile[f'{h5_network}'].keys(): logger.error(f'{h5_network}/{c} does not exist.') continue if any(map(lambda k: k not in hfile[f'{h5_network}/{c}'].keys(), ['data', 'row', 'col', 'shape'])): logger.error(f'{h5_network}/{c} does not have the appropriate ' 'data, row, col, shape entries.') continue update_dataset(hfile, f'{h5_network}/{c}/data', temp.data) update_dataset(hfile, f'{h5_network}/{c}/row', temp.row) update_dataset(hfile, f'{h5_network}/{c}/col', temp.col) update_dataset(hfile, f'{h5_network}/{c}/shape', temp.shape) del temp for col in exclude.keys(): if col not in hfile[f'{h5_network}/metadata/exclude/'].keys(): logger.error(f'"{col}" not in {h5_network}/metadata/exclude/') continue update_dataset(hfile, f'{h5_network}/metadata/exclude/{col}', exclude[col]) for col in blacklist.keys(): if col not in hfile[f'{h5_network}/metadata/blacklist/'].keys(): logger.error(f'"{col}" not in ' f'{h5_network}/metadata/blacklist/') continue update_dataset(hfile, f'{h5_network}/metadata/blacklist/{col}', blacklist[col]) return EXIT_SUCCESS def main(root, aggregate, network, write_count, chroms, log): """Aggregate all Runs in an SRA Project. Iterates over all SRA Runs in a root Project folder and adds all contact matrices for a given network to an aggregate file, creating it if necessary. Args: root: str. Path to the root Project folder to be processed. aggregate: str. Path to the aggregate file. Will be created if necessary. network: str. The name of the network in the aggregate file that will be filled with the data in the same network in each Run's data. write_count: int. Frequency (number of Runs) with which the aggregate will be written to disk. A higher number means less time on expensive I/O, but higher risk that a crash will lose data. A 0 value will not write until the very end. chroms: str. Path to the chromosome sizes file to be used. Default is to take the information from the aggregate if existing, or the first network to be aggregated. log: str. Path to the logfile. Returns: None. All print output is logged to file. All data output is written to the aggregate HDF5 file. """ setup_loggers(global_level=logging.INFO, logfile=log) logger = logging.getLogger('standard') logger_c = logging.getLogger('clean') logger.info('Starting.') try: write_count = int(write_count) except ValueError as e: exception_string = ''.join( traceback.format_exception( etype=type(e), value=e, tb=e.__traceback__ ) ) logger.error('write_counts must be an integer. ValueError message:') logger_c.error(exception_string) return logger.info(f'Processing {root}') proj = os.path.basename(root.rstrip('/')) if os.path.isdir(proj) and proj[1:3] == 'RP': logger.info(f'SRA Project : {proj}') network_path = f'/aggregates/{network}' if aggregate == '': aggregate = os.path.join(root, 'aggregate.hdf5') if os.path.exists(aggregate): logger.info(f'Appending to existing aggregate : {aggregate}') if chroms != '': logger.warning(f'chroms={chroms} is ignored due to ' 'existing aggregate.') else: logger.info(f'Creating new aggregate : {aggregate}') if chroms == '': logger.error('If new aggregate, chroms cannot be "".') return logger.info(f'Using chromosome sizes at {chroms}') chrom_data = get_chrom_sizes(chroms) main_chroms = get_main_chroms(list(chrom_data.keys())) main_chroms_print = ', '.join(main_chroms) logger.info(f'Using only main chromosomes: {main_chroms_print}') chrom_data = {k: v for k, v in chrom_data.items() if k in main_chroms} # [network] is a hack because init_agg expects a list # of networks, but the surrounding program is only working # with one. init_agg(aggregate, [network], chrom_data) with h5py.File(aggregate, 'r') as hfile: existing_networks = set(hfile['/aggregates'].keys()) if network not in existing_networks: chrom_data = get_chrom_sizes(chroms) main_chroms = get_main_chroms(list(chrom_data.keys())) main_chroms_print = ', '.join(main_chroms) chrom_data = {k: v for k, v in chrom_data.items() if k in main_chroms} init_network(aggregate, network, chrom_data) exclude = get_exclude(aggregate, network_path) blacklist = get_blacklist(aggregate, network_path) chrom_data = get_chrom_data(aggregate, network_path) aggregate_data = get_aggregate_data(aggregate, network_path, chrom_data) logger.info(f'Building aggregate for {network} network.') logger.info(f'Writing every {write_count} ' f'{"Run" if write_count == 1 else "Runs"}.') run_count = 0 exp_count = 0 exp_total, run_total = validate_root(root) if write_count <= 0: write_count = run_total if (exp_total, run_total) == (0, 0): return for exp in os.scandir(root): if os.path.isdir(exp.path) and exp.name[1:3] == 'RX': exp_count += 1 logger.info(f'Processing {exp.name}. ({exp_count}/{exp_total})') for run in os.scandir(exp.path): if os.path.isdir(run.path) and run.name[1:3] == 'RR': run_count += 1 if run.name in exclude['run']: logger.info(f'{exp.name}-{run.name} found in ' 'exclude list. Skipping.') continue elif run.name in blacklist['run']: logger.info(f'{exp.name}-{run.name} found in ' 'blacklist list. Skipping.') continue else: logger.info(f'Processing {exp.name}-{run.name}. ' f'({run_count}/{run_total})') add_dataset(run.path, network, aggregate_data, exclude, blacklist, chrom_data) if run_count % write_count == 0: logger.info('Writing to disk...') write_results(aggregate, network_path, aggregate_data, exclude, blacklist, chrom_data) else: continue else: continue if run_count % write_count != 0: logger.info('Writing to disk...') write_results(aggregate, network_path, aggregate_data, exclude, blacklist, chrom_data) logger.info('Finished.') return if __name__ == '__main__': import argparse parser = argparse.ArgumentParser() parser.add_argument('root', help='The SRA Project directory holding the ' '`[SED]RX[0-9]+` folders.') parser.add_argument('-a', '--aggregate', default='', help='The aggregate HDF5 file. Will be generated ' 'if it doesn\'t already exist. Default is ' '`root`/aggregate.hdf5') parser.add_argument('-n', '--network', default='1bp_raw', help='The name of the network in each networks ' 'HDF5 file to be aggregated') parser.add_argument('--write-count', dest='write_count', type=int, default=10, help='The interval at which the aggregate is written ' 'to disk. e.g. default is 10, meaning that the ' 'aggregate will be saved to disk every 10 new ' 'datasets. The larger the interval, the more ' 'you risk losing upon a runtime error, but the ' 'less time the program spends on costly I/O ' 'operations. If write-count <= 0, the only' 'disk write will be at the end.') parser.add_argument('-c', '--chroms', default='', help='Path to chromosome sizes file used if ' 'initializing new aggregate file. Ignored ' 'if updating existing file. If not given, ' 'will use the chroms for the first file ' 'aggregated. ') parser.add_argument('-l', '--log', default='', help='Path to file used as the logfile. Will be ' 'appended to. Default is ' '`root`/\'aggregate\'.log') args = parser.parse_args() main(args.root, args.aggregate, args.network, args.write_count, args.chroms, args.log)