"""Utility functions for the Hi-C database admin. Functions: get_exclude_file: Get the exclude file for a species as a DataFrame get_network_path: Get the networks.hdf5 path for a Run get_aggregate_path: Get the aggregates.hdf5 path for a Project list_resolutions: Pretty print the resolutions from an HDF5 file list_chrom_counts: Pretty print the contact counts from one chrom_comb from an HDF5 file. """ import sys sys.path.append('/grid/gillis/data/nfox/hi_c_data_processing/software') import access import os import pandas as pd DATABASE_PATH = '/grid/gillis/data/nfox/hi_c_data_processing' def get_exclude_file(species='human'): """Get the exclude file for a species.""" global DATABASE_PATH if species not in {'human', 'mouse', 'drosophila'}: raise ValueError('species must be "human", "mouse", or "drosophila".') exclude = pd.read_csv(f'{DATABASE_PATH}/metadata/exclude_{species}.csv', header=0, index_col=None) return exclude def get_network_path(run, species='human'): """Get the path to a networks.hdf5 file for a Run.""" global DATABASE_PATH if species not in {'human', 'mouse', 'drosophila'}: raise ValueError('species must be "human", "mouse", or "drosophila".') exclude = get_exclude_file(species) exclude = exclude.query('run == @run') if exclude.shape != (1, 4): return '' runpath = '/'.join(exclude.iloc[0, :3].values) network_path = f'{DATABASE_PATH}/data_{species}/{runpath}/networks.hdf5' if not os.path.exists(network_path): return '' return network_path def get_aggregate_path(project, species='human'): """Get the path to an aggregate.hdf5 file for a Project.""" global DATABASE_PATH if species not in {'human', 'mouse', 'drosophila'}: raise ValueError('species must be "human", "mouse", or "drosophila".') aggregate_path = f'{DATABASE_PATH}/data_{species}/{project}/aggregate.hdf5' if not os.path.exists(aggregate_path): return '' return aggregate_path def list_resolutions(db_id, species='human', filetype='network'): """Pretty print the resolutions.""" if filetype not in {'network', 'aggregate'}: raise ValueError('filetype must be "network" or "aggregate".') if filetype == 'network': filepath = get_network_path(db_id, species) elif filetype == 'aggregate': filepath = get_aggregate_path(db_id, species) else: raise AssertionError(f'invalid filetype {filetype} got past ' 'validation.') if not filepath: return '' res = access.list_networks(filepath) if filetype == 'network': res_strings = list(map(lambda x: f'{x:>12s}', res)) elif filetype == 'aggregate': res_strings = list(map(lambda x: f'{x:>10s}', res)) else: raise AssertionError(f'invalid filetype {filetype} got past ' 'validation.') print(f'{db_id:11s} : {" ".join(res_strings)}') def list_chrom_counts(db_id, species='human', chrom='chr1_vs_chr1', filetype='network'): """Pretty print the counts for one matrix for all resolutions.""" if filetype not in {'network', 'aggregate'}: raise ValueError('filetype must be "network" or "aggregate".') if filetype == 'network': filepath = get_network_path(db_id, species) elif filetype == 'aggregate': filepath = get_aggregate_path(db_id, species) else: raise AssertionError(f'invalid filetype {filetype} got past ' 'validation.') res = access.list_networks(filepath) list_resolutions(db_id, species, filetype) indent = ' ' * 14 contacts = [access.get_chrom_contacts(filepath, r, chrom).sum() for r in res] if filetype == 'network': count_strings = ' '.join(map(lambda x: f'{float(x):12.0f}', contacts)) elif filetype == 'aggregate': count_strings = ' '.join(map(lambda x: f'{float(x):10.0f}', contacts)) else: raise AssertionError(f'invalid filetype {filetype} got past ' 'validation.') print(f'{indent}{count_strings}')