#!/grid/gillis/home/nfox/software/miniconda3/bin/python """Access functions to Hi-C database HDF5 files. A collection of functions to access individual network and project-level aggregate HDF5 files without having to know the internal structure of the files. Functions: list_networks: Get the available networks get_chrom_data: Get chromosome names and sizes get_exclude: Get list of incorporated Runs. get_blacklist: Get list of failed Runs. get_chrom_contacts: Get single contacts matrix. get_all_chrom_contacts: Get all contacts matrices. """ import h5py import scipy.sparse as ss import os import sys import itertools def layout_error_msg(msg): """Format an error message for invalid file layout.""" return f'Invalid HDF5 Layout: {msg}' def get_network_type(hfile, network=None): """Get the type of networks in the file. There are two types of HDF5 files: aggregates and networks. Ascertains which and returns the type. Optionally, can also verify if a specific network exists. Args: hfile: str. Path to the HDF5 file. network: str. Name of the network to check for existence. Returns: The net group: "aggregates" or "networks". If network is not None, then a tuple of (net group, network) is returned. Raises: ValueError: If the layout is incorrect, or if the network does not exist. """ if not os.path.exists(hfile): raise ValueError('hfile must be a path pointing to a ' 'readable HDF5 file.') with h5py.File(hfile, 'r') as hf: if 'aggregates' in hf.keys() and 'networks' in hf.keys(): raise ValueError(layout_error_msg('contains both "aggregates" ' 'and "networks" as top level groups.')) elif 'aggregates' in hf.keys(): net_group = 'aggregates' elif 'networks' in hf.keys(): net_group = 'networks' else: raise ValueError(layout_error_msg('contains both "aggregates" ' 'and "networks" as top level groups.')) if network is not None: if (network not in hf[f'/{net_group}'].keys() or type(hf[f'/{net_group}/{network}']) is not h5py._hl.group.Group): raise ValueError(layout_error_msg('network must be an HDF5 ' f'Group under \'/{net_group}/\'')) return net_group, network else: return net_group def list_networks(hfile): """List the networks in the file. Args: hfile: str. Path to the HDF5 file. Returns: List of network names in the file. """ net_group = get_network_type(hfile) with h5py.File(hfile, 'r') as hfile: return [k for k in hfile[f'/{net_group}'].keys()] def get_chrom_data(hfile, network): """Get the chromosome names and sizes for a network. Args: hfile: str. Path to the HDF5 file. network: str. Name of the network from which to retrieve data. Returns: dict of chromosome data. keys are names and values are sizes in base pairs. Raises: AssertionError: If get_network_type() returns an unexpected network type. ValueError: If the names and sizes arrays are different shapes. The file is incorrectly formatted. """ net_group, network = get_network_type(hfile, network) if net_group == 'aggregates': path_prefix = f'/{net_group}/{network}/metadata' elif net_group == 'networks': path_prefix = f'/{net_group}/{network}' else: raise AssertionError(f'Received invalid net_group "{net_group}" ' 'from get_network_type()') with h5py.File(hfile, 'r') as hfile: if (hfile[f'{path_prefix}/chrom_names'].shape[0] != hfile[f'{path_prefix}/chrom_sizes'].shape[0]): raise ValueError('Invalid HDF5 Layout: chrom_names and ' 'chrom_sizes are different sizes.') chrom_names = [k.decode('utf-8') for k in hfile[f'{path_prefix}/chrom_names'][:]] chrom_sizes = list(hfile[f'{path_prefix}/chrom_sizes'][:]) return {n: s for n, s in zip(chrom_names, chrom_sizes)} def get_chrom_combs(chrom_data): """Get the chromosome combination labels. Args: chrom_data: dict. Chromosome sizes in base pairs keyed by chromosome names. Returned from get_chrom_data(). Returns: list of str. All combinations of the given chromosomes that are used to generate contact matrices. Raises: ValueError: If chrom_data is not a valid format. """ chroms = chrom_data.keys() chroms = sorted(chroms, key=lambda x: chrom_data[x], reverse=True) chrom_combs = [f'{c}_vs_{c}' for c in chroms] for c1, c2 in itertools.combinations(chroms, 2): if c1 > c2: temp = c1 c1 = c2 c2 = temp chrom_combs.append(f'{c1}_vs_{c2}') return(chrom_combs) def get_exclude(hfile, network): """Retrieve the list of incorporated Runs, marked for exclude. Args: hfile: str. Path to the HDF5 file. network: str. Name of the network from which to retrieve data. Returns: dict of 4 lists. Lists are of identical length and matched. They are the project, experiment, and run (SRA data), as well as the network that the entry is for. Keys are those names. Raises: ValueError: If anything about the format is invalid. """ net_group, network = get_network_type(hfile, network) if net_group != 'aggregates': raise ValueError(f'{hfile} is not an aggregate file and so, ' 'does not have an exclude file.') exclude = {} exclude_path = f'/{net_group}/{network}/metadata/exclude' valid_cols = {'project', 'experiment', 'run', 'network'} with h5py.File(hfile, 'r') as hfile: for col in hfile[exclude_path].keys(): if col not in valid_cols: raise ValueError(layout_error_msg('Invalid Dataset names in ' f'exclude for {net_group}/{network}')) exclude[col] = [c.decode('utf-8') for c in hfile[f'{exclude_path}/{col}'][:]] col_length = 0 for cname, cdata in exclude.items(): if col_length == 0: col_length = len(cdata) else: if len(cdata) != col_length: raise ValueError(layout_error_msg('All Datasets in ' 'exclude are not the same length')) return exclude def get_blacklist(hfile, network): """Retrieve the list of failed Runs, marked for blacklist. Args: hfile: str. Path to the HDF5 file. network: str. Name of the network from which to retrieve data. Returns: dict of 4 lists. Lists are of identical length and matched. They are the project, experiment, and run (SRA data), as well as the network that the entry is for. Keys are those names. Raises: ValueError: If anything about the format is invalid. """ net_group, network = get_network_type(hfile, network) if net_group != 'aggregates': raise ValueError(f'{hfile} is not an aggregate file and so, ' 'does not have an blacklist file.') blacklist = {} blacklist_path = f'/{net_group}/{network}/metadata/blacklist' valid_cols = {'project', 'experiment', 'run', 'network'} with h5py.File(hfile, 'r') as hfile: for col in hfile[blacklist_path].keys(): if col not in valid_cols: raise ValueError(layout_error_msg('Invalid Dataset names in ' f'blacklist for {net_group}/{network}')) blacklist[col] = [c.decode('utf-8') for c in hfile[f'{blacklist_path}/{col}'][:]] col_length = 0 for cname, cdata in blacklist.items(): if col_length == 0: col_length = len(cdata) else: if len(cdata) != col_length: raise ValueError(layout_error_msg('All Datasets in ' 'blacklist are not the same length')) return blacklist def get_chrom_contacts(hfile, network, chrom, fmt='coo'): """Return the contact matrix for the specified chromosome comparison. Args: hfile: str. Path to the HDF5 file. network: str. Name of the network from which to retrieve data. chrom: str. The name of the chromosome comparison for which to retrieve the contacts matrix. Format is "CHROM1_vs_CHROM2", where CHROM1 is always > CHROM2 in alphabetic comparison. If `chrom` is valid only when CHROM1 and CHROM2 are reversed, a warning will be issued and the function will proceed. fmt: str. Type of scipy.sparse matrix to return the results as. Valid Formats = {'bsr', 'coo', 'csc', 'csr', 'dia', 'dok', 'lil'} Returns: The specified contacts matrix in the specified format from the indicated network in the indicated file. Raises: ValueError: The file is not in a valid format. """ valid_fmts = {'bsr', 'coo', 'csc', 'csr', 'dia', 'dok', 'lil'} if fmt not in valid_fmts: raise ValueError(f'{fmt} is not a valid format.') net_group, network = get_network_type(hfile, network) with h5py.File(hfile, 'r') as hfile: if chrom not in hfile[f'/{net_group}/{network}'].keys(): chroms = chrom.split('_vs_') if len(chroms) == 2: if chroms[0] == chroms[1]: raise ValueError( layout_error_msg( f'\'{chrom}\' does not exist in ' f'\'/{net_group}/{network}/\'' ) ) else: new_chrom = f'{chroms[1]}_vs_{chroms[0]}' if new_chrom in hfile['/{net_group}/{network}'].keys(): print(f'WARNING: {chrom} was reversed to {new_chrom}') else: raise ValueError( layout_error_msg( f'Neither \'{chrom}\' nor \'{new_chrom}\' ' f'exists in \'/{net_group}/{network}/\'' ) ) else: raise ValueError( layout_error_msg( f'\'{chrom}\' does not exist in ' f'\'/{net_group}/{network}/\'' ) ) chrom_path = f'/{net_group}/{network}/{chrom}' matrix = ss.coo_matrix((hfile[f'{chrom_path}/data'][:], (hfile[f'{chrom_path}/row'][:], hfile[f'{chrom_path}/col'][:])), hfile[f'{chrom_path}/shape'][:]) if fmt == 'coo': return matrix else: return matrix.__getattribute__(f'to{fmt}')() def get_all_chrom_contacts(hfile, network, fmt='coo'): """Return the contact matrix for the specified chromosome comparison. Args: hfile: str. Path to the HDF5 file. network: str. Name of the network from which to retrieve data. fmt: str. Type of scipy.sparse matrix to return the results as. Valid Formats = {'bsr', 'coo', 'csc', 'csr', 'dia', 'dok', 'lil'} Returns: dict of contacts matrices, each in the specified format from the indicated network in the indicated file. Keyed by the chromosome comparison label. Raises: ValueError: The file is not in a valid format. """ valid_fmts = {'bsr', 'coo', 'csc', 'csr', 'dia', 'dok', 'lil'} if fmt not in valid_fmts: raise ValueError(f'{fmt} is not a valid format.') net_group, network = get_network_type(hfile, network) matrices = {} with h5py.File(hfile, 'r') as hfile: for chrom in hfile[f'/{net_group}/{network}'].keys(): if not chrom.startswith('chr'): continue chrom_path = f'/{net_group}/{network}/{chrom}' try: matrix = ss.coo_matrix((hfile[f'{chrom_path}/data'][:], (hfile[f'{chrom_path}/row'][:], hfile[f'{chrom_path}/col'][:])), hfile[f'{chrom_path}/shape'][:]) except Exception as e: print(layout_error_msg(f'Failed to read \'{chrom_path}/'), file=sys.stderr) raise e if fmt == 'coo': matrices[chrom] = matrix else: matrices[chrom] = matrix.__getattribute__(f'to{fmt}')() return matrices