import logging import h5py import numpy as np from hicmatrix import HiCMatrix as hm from pyranges import read_gtf def setup_logger(): logger = logging.getLogger('standard') formatter = logging.Formatter( fmt='%(asctime)s [%(levelname)s] %messages', datefmt='%Y-%m-%d %H:%M:%S' ) handler = logging.FileHandler(filename='make_gene_vectors.log', mode='a') handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) def main(chrom_list, aggregate_file, gene_annot_file, out_file): """Build gene x bins vectors from a HiCMatrix. Args: aggregate_file: str. Path to the HiCMatrix file. gene_annot_file: str. Path to the GTF gene annotations file. out_file: str. Path to the output HDF5 file. """ logger = logging.getLogger('standard') logger.info('Starting') logger.info(f'Reading Hi-C data from {aggregate_file}') hic = hm.hiCMatrix(aggregate_file) hic.keepOnlyTheseChr(chrom_list) hic.matrix.setdiag(0, k=0) logger.info(f'Reading gene annotations from {gene_annot_file}') gtf = read_gtf(gene_annot_file) genes = gtf.df.loc[gtf.df['Feature'] == 'gene'].copy() genes['gene_id'] = [x.split('.')[0] for x in genes['gene_id']] genes['gene_id'].drop_duplicates(inplace=True) if not genes['Chromosome'].str.startswith('chr').any(): genes['Chromosome'] = [f'chr{x}' for x in genes['Chromosome']] genes = genes.loc[genes['Chromosome'].isin(hic.getChrNames())] genes.sort_values(['Chromosome', 'Start'], inplace=True) logger.info(f'Writing gene vectors to {out_file}') with h5py.File(out_file, 'a') as hf: total = genes.shape[0] progress = [0, 0] for i, gene in enumerate(genes.iterrows()): gene = gene[1] progress[1] = int(i / total * 100) if progress[1] % 5 == 0 and progress[1] > progress[0]: logger.info(f'{progress[1]:2d}% of gene vectors built.') progress[0] = progress[1] start_bin, end_bin = hic.getRegionBinRange( gene['Chromosome'], min(gene['Start'], gene['End']), max(gene['Start'], gene['End']) ) idx = start_bin if gene['Strand'] == '+' else end_bin gene_vector = hic.matrix[idx, :].toarray().flatten() if gene['gene_id'] in hf: del hf[gene['gene_id']] hf.create_dataset(gene['gene_id'], data=gene_vector, dtype=np.float32) logger.info('Finished') if __name__ == '__main__': from argparse import ArgumentParser parser = ArgumentParser() parser.add_argument('aggregate', help='The HiCMatrix containing the binned aggregate ' 'from which the gene vectors will be built.') parser.add_argument('chrom_list', help='The HiCMatrix containing the binned aggregate ' 'from which the gene vectors will be built.') parser.add_argument('genes', help='The GTF file containing gene annotation ' 'information. Must be built for hg38, mm10, or ' 'dm6. Must contain the columns ["Start", ' '"End", "Chromosome", "Strand", "gene_id", ' '"Feature"].') parser.add_argument('outfile', help='The HDF5 file to which the gene vectors ' 'will be written at the root as 1D datasets ' 'named as the "gene_id" column in the GTF file. ' 'Existing gene vectors will be overwritten, but ' 'the file as a whole will not be overwritten.') args = parser.parse_args() chrom_list = args.chrom_list.split(',') main(chrom_list, args.aggregate, args.genes, args.outfile)