معرفی شرکت ها


cpg-pipes-0.3.9


Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر

توضیحات

Hail Batch bioinformatics pipelines
ویژگی مقدار
سیستم عامل -
نام فایل cpg-pipes-0.3.9
نام cpg-pipes
نسخه کتابخانه 0.3.9
نگهدارنده []
ایمیل نگهدارنده []
نویسنده -
ایمیل نویسنده -
آدرس صفحه اصلی https://github.com/populationgenomics/production-pipelines
آدرس اینترنتی https://pypi.org/project/cpg-pipes/
مجوز MIT
# Production pipelines ## cpg_pipes python package The `cpg_pipes` package provides Python functions that help to design a pipeline powered by [Hail Batch](https://hail.is/docs/batch/service.html). ### Installation Requires Python 3.10 ```bash pip install cpg_pipes ``` ### Motivation A pipeline can be represented as a set of stages with dependency interrelationships (i.e. a directed acyclic graph of stages). Hail Batch has a concept of jobs that can depend on each other; however, a Hail Batch job corresponds to only one bash script that is run on a cloud VM, which doesn't always solve a higher-level task in the application domain. In contrast, a pipeline stage called "genotype" can include multiple jobs that generate intervals, partition input, run a genotyping tool on each partition in parallel, gather outputs, and perform some post-processing. A user might want to treat it as one "stage" that sits between an "alignment" stage and a "joint calling" stage. At the CPG, we also want to permanently record outputs of some stages (e.g. individual CRAM and GVCF files, fingerprints, a joint-called matrix table). Specifically, we store those results on buckets according to the storage policy and add entries into the sample metadata database. When rerunning a pipeline, we want to explicitly control whether to reuse existing results. We also want to be able to start the pipeline from a specific stage, making the pipeline assume that all previous stages have finished successfully; or run the pipeline only up to a specific stage. Motivated by that, we designed the `cpg_pipes` package. It implementes a concept of `Stage` that can act on a `Target`: e.g. a `Sample`, a `Project`, or a `Cohort`. For example, a stage that performs read alignment to produce a CRAM file would act on a sample, and a stage that performs joint-calling would act on an entire cohort. Each stage declares paths to the outputs it would produce by implementing the abstract `expected_outputs()` method; and it also defines how jobs are added into Hail Batch using the `queue_jobs()` method. Overall classes and object relationships look as follows: ![uml](docs/classes.png) ### Building pipelines To declare a stage, derive a class from `SampleStage`, `ProjectStage`, or `CohortStage`, implement the abstract methods, and wrap the class with a `@stage` decorator: ```python from cpg_pipes.pipeline import stage, StageInput, StageOutput, SampleStage from cpg_pipes.targets import Sample from cpg_pipes import Path, to_path @stage class Align(SampleStage): """Stage that runs read alignment""" def expected_outputs(self, sample: Sample) -> Path: """Returning a path to a CRAM file that will be generated""" return to_path('gs://cpg-thousand-genomes-test/cram/NA12878.cram') def queue_jobs(self, sample: Sample, inputs: StageInput) -> StageOutput | None: """Defines how jobs are added into a batch object""" cram_path = self.expected_outputs(sample) # This Job just writes the sample name to a file: j = self.pipe.b.new_job(f'Align {sample.id}') # Just writing a sample name for a demonstration: j.command(f'echo {sample.id} > {j.output}') self.pipe.b.write_output(j.output, cram_path) # Construct StageOutput object, where we pass a path to the results, # and all added jobs: return self.make_outputs(sample, data=cram_path, jobs=[j]) ``` The `queue_jobs` method is expected to return an output of type `StageOutput`: you can call `self.make_outputs()` to construct that object. Stages can depend on each other. Use the `required_stages` parameter to `@stage` to set dependencies, and use the `inputs` parameter in `queue_jobs` to get the output of a previous stage: ```python from cpg_pipes.pipeline import stage, StageInput, StageOutput, SampleStage from cpg_pipes.targets import Sample @stage(required_stages=[Align]) class ReadCramFile(SampleStage): """Stage that reads the CRAM file genereated by a Align stage""" def queue_jobs(self, sample: Sample, inputs: StageInput) -> StageOutput | None: # Read the `Align` stage outputs: cram_path = inputs.as_path(sample, stage=Align) return ... ``` Stage of differnet levels can depend on each other, and `cpg_pipes` will resolve that correctly. E.g. joint-calling taking GVCF outputs to produce a cohort-level VCF: ```python from cpg_pipes.pipeline import stage, StageInput, StageOutput, SampleStage, CohortStage from cpg_pipes.targets import Sample, Cohort @stage class HaplotypeCaller(SampleStage): """Stage that runs gatk Haplotype caller to generate a GVCF for a sample""" def queue_jobs(self, sample: Sample, inputs: StageInput) -> StageOutput | None: jobs = ... return self.make_outputs(sample, data=self.expected_outputs(sample), jobs=jobs) @stage(required_stages=HaplotypeCaller) class JointCalling(CohortStage): """Stage that runs joint calling on GVCFs""" def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None: # Get outputs from previous stage. Because the HaplotypeCaller stage # acts on a sample, we use a method `as_path_by_target` that returns # a dictionary index by sample ID: gvcf_by_sample_id = inputs.as_path_by_target(stage=HaplotypeCaller) job = ... return self.make_outputs(cohort, data=self.expected_outputs(cohort), jobs=[job]) ... ``` To submit the constructed pipeline to Hail Batch, create a pipeline with `create_pipeline`, and call `submit_batch()`. You need to pass a name, a description, a version of a run, the namespace according to the storage policies (`test` / `main` / `tmp`), and the `analysis_dataset` name that would be used to communicate with the sample metadata DB. ```python from cpg_pipes.pipeline import create_pipeline from cpg_pipes import Namespace pipeline = create_pipeline( name='my_pipeline', description='My pipeline', namespace=Namespace.TEST, analysis_dataset='fewgenomes', ) pipeline.run() ``` ### Input sources Class `cpg_pipes.providers.InputProvider` provides an interface for an input data provider, with two implementations: `CsvInputProvider` and `SmdbInputProvider`. The former one allows you to specify inputs in a CSV file. E.g.: ``` $ cat inputs.tsv dataset,sample,external_id,fqs_r1,fqs_r2,cram,sex hgdp,NA12878,NA12878,NA12878_R1.fq.gz|NA12878_R2.fq.gz,,F hgdp,NA12879,NA12879,NA12879_R1.fq.gz|NA12879_R2.fq.gz,,M ``` ```python from cpg_pipes.providers import InputProviderType from cpg_pipes.pipeline import create_pipeline pipeline = create_pipeline( ..., input_provider_type=InputProviderType.CSV, input_csv='inputs.csv', ) ``` `SmdbInputProvider` allows to pull input data from the CPG [sample metadata database](https://github.com/populationgenomics/sample-metadata): ```python from cpg_pipes.providers import InputProviderType from cpg_pipes.pipeline import create_pipeline pipeline = create_pipeline( ..., datasets=['hgdp'], input_provider_type=InputProviderType.CPG, ) ``` The defailt input provider type is `InputProviderType.NONE`, which means that no samples and datasets are loaded, but you can add them manually into the automatically created `cohort` object: ``` >>> pipeline.cohort.get_datasets() ['hgdp'] >>> for dataset in pipeline.cohort.get_datasets(): >>> samples = dataset.get_samples() >>> print(samples[0]) Sample(id='CPG68197', external_id='HGDP00001', dataset=Dataset("hgdp"), meta={}, alignment_input=CramPath('gs://cpg-nagim-test/cram/HGDP00001.cram'), pedigree=None), ... ``` If a `Participant` entry is available, `sample.participant_id` will be populated. If a corresponding `Sequence` is available, `reads` metadata will be parsed and populated as `sample.alignment_input`, along with `sample.sequencing_type`. If corresponding `Analysis` entries exist, they will be populated as `sample.sanalysis_by_type`. If `Family` data is available, it will be parsed and populated as `sample.pedigree`. ### Storage policies Outputs are written according to provided storage policy. Class `cpg_pipes.providers.StoragePolicy` provides an interface for abstract policy, with one implementation: `CpgStoragePolicy`, which implements the [CPG storage policy](https://github.com/populationgenomics/team-docs/tree/main/storage_policies). The default cloud storage provider is Google Cloud Storage, which can be overridden with `cloud` parameter to `create_pipeline` (or `--cloud <val>` in the command line). ### Bioinformatics jobs The `cpg_pipes.jobs` module defines functions that create Hail Batch Jobs for different bioinformatics purposes: alignment, fastqc, deduplication, variant calling, VQSR, etc. E.g. to implement the joint calling stage above, you can use: ```python from cpg_pipes.pipeline import stage, SampleStage, CohortStage, StageInput, StageOutput from cpg_pipes.targets import Sample, Cohort from cpg_pipes.jobs import haplotype_caller, joint_genotyping from cpg_pipes.filetypes import CramPath, GvcfPath @stage class HaplotypeCaller(SampleStage): """Stage that runs gatk HaplotypeCaller to produde a GVCF""" def queue_jobs(self, sample: Sample, inputs: StageInput) -> StageOutput | None: """Call the function from the jobs module""" cram_path = inputs.as_path(target=sample, stage=Align) expected_path = self.expected_outputs(sample) jobs = haplotype_caller.produce_gvcf( b=self.pipe.b, sample_name=sample.id, cram_path=CramPath(cram_path), output_path=expected_path, scatter_count=50, tmp_prefix=sample.dataset.tmp_prefix(), refs=self.refs, sequencing_type=sample.sequencing_type, ) return self.make_outputs(sample, data=expected_path, jobs=jobs) @stage(required_stages=HaplotypeCaller) class JointCalling(CohortStage): """Stage that runs joint calling on GVCFs""" def queue_jobs(self, cohort: Cohort, inputs: StageInput) -> StageOutput | None: """Call the function from the jobs module""" gvcf_by_sid = { sample.id: GvcfPath(inputs.as_path(target=sample, stage=HaplotypeCaller)) for sample in cohort.get_samples() } jobs = joint_genotyping.make_joint_genotyping_jobs( b=self.pipe.b, out_vcf_path=self.expected_outputs(cohort)['vcf'], out_siteonly_vcf_path=self.expected_outputs(cohort)['siteonly_vcf'], gvcf_by_sid=gvcf_by_sid, refs=self.refs, sequencing_type=cohort.get_sequencing_type(), tmp_bucket=cohort.analysis_dataset.tmp_prefix(), ) return self.make_outputs(cohort, data=self.expected_outputs(cohort), jobs=jobs) ``` Available jobs include alignment: ```python from cpg_pipes.jobs import align sample = ... j = align.align( b=..., alignment_input=sample.alignment_input, output_path=..., realignment_shards_num=10 ) ``` Getting intervals for sharding variant calling: ```python from cpg_pipes.jobs import split_intervals j, intervals = split_intervals.get_intervals(b=..., scatter_count=20) ``` Generate somalier pedigree fingerprints: ```python from cpg_pipes.jobs import somalier fingerprint_job, fingerprint_path = somalier.extact_job( b=..., gvcf_or_cram_or_bam_path=..., refs=..., ) ``` Infer pedigree relashionships and sex of samples in a dataset, and check with a probided PED file: ```python from cpg_pipes.jobs import somalier dataset = ... j, somalier_samples_path, somalier_pairs_path = somalier.ancestry( b=..., dataset=..., refs=..., input_path_by_sid=..., out_tsv_path=..., out_html_path=..., out_html_url=..., ) ``` VQSR (from a site-only VCF or a Hail matrix table): ```python from cpg_pipes.jobs import vqsr cohort = ... j = vqsr.make_vqsr_jobs( b=..., input_vcf_or_mt_path=..., gvcf_count=len(cohort.get_samples()), out_path=..., refs=..., sequencing_type=cohort.get_sequencing_type(), tmp_prefix=cohort.tmp_prefix(), use_as_annotations=True, ) ``` There other jobs available - refer to the code examples. ### Pipelines There are full pipelines available in the `pipelines` folder, specifically: - `pipelines/seqr_loader.py` takes CRAM/FASTQ, aligns and joint-genotypes, then annotates using Hail Query and creates an Elasticsearch index for Seqr. - `pipelines/somalier.py` uses Somalier and an input PED file to check pedigree and sex of CRAMs or GVCFs, - `pipelines/qc.py` takes CRAMs and runs CRAM QC and summarises it in a MultiQC report, - `pipelines/cram_gvcf.py` takes CRAM/FASTQ, and runs alignment and genotyping for each sample, - `pipelines/gatk_sv` orchestrates workflows of [GATK-SV](https://github.com/populationgenomics/gatk-sv). ### Click options The `cpg_pipes/pipeline/cli_opts.py` module provides CLI options for Click that handles default parameters that can be used to customise a pipeline. The basic usage is as follows: ```python import click from cpg_pipes.pipeline import create_pipeline, pipeline_entry_point @click.command() @pipeline_entry_point def main(**kwargs): pipeline = create_pipeline( name='my_pipeline', description='My pipeline', **kwargs, ) ``` When calling such a pipelne script from the command-line, the options defined in `@pipeline_click_options` will be available and passed to the `create_pipeline` factory. You can add more custom options like this: ```python import click from cpg_pipes.pipeline import pipeline_entry_point, create_pipeline @click.command() @pipeline_entry_point @click.option('--custom-option') def main(**kwargs): custom_option = kwargs.pop('custom_option') pipeline = create_pipeline( name='my_pipeline', description='My pipeline', **kwargs, ) ``` ### Batch helpers The `cpg_pipes.hb.batch` module provides a helper function `setup_batch` to set up Hail Batch in the CPG context: ```python from cpg_pipes.hb.batch import setup_batch b = setup_batch('My batch') ``` It will create an instance of Batch that extends the standard Hail Batch, that records stats of added jobs and prints statistics before submission, highlighting labelled jobs, e.g.: ``` Will submit 186 jobs: BWA: 3 for 3 samples Somalier extract (CRAMs): 3 for 3 samples HaplotypeCaller: 3 for 3 samples ReblockGVCF: 3 for 3 samples Somalier extract (GVCFs): 3 for 3 samples Other jobs: 171 ``` The Batch instance also constructs the job name if the names of a sample and a dataset are provided as attributes, e.g.: ```bash >>> j = b.new_job('My job', dict(sample='CPG196535', dataset='fewgenomes')) >>> print(j.name) fewgenomes/CPG196535: My job ``` `cpg_pipes.hb.command` provides a helper to set up a command that can be used to add monitoring of disk space, or authenticate with GCP to make `gsutil` work: ```python from cpg_pipes.hb.command import wrap_command b = ... j = b.new_job('My job') j.command(wrap_command( 'sleep 600', monitor_space=True, setup_gcp=True, )) ``` This will wrap the command as follows: ``` set -o pipefail set -ex export GOOGLE_APPLICATION_CREDENTIALS=/gsa-key/key.json gcloud -q auth activate-service-account --key-file=$GOOGLE_APPLICATION_CREDENTIALS (while true; do df -h; du -sh /io; du -sh /io/batch; sleep 600; done) & sleep 600 df -h; du -sh /io; du -sh /io/batch ``` ### Reusing existing results By default, if expected result exist, empty jobs will be submitted suffixed with ` [reuse]`. You can disable that check with `--no-check-expected-outputs`. You can also start from a specific stage with `--first-stage`, or finish on specific one with `--last-stage`. You can use `@skip` decorator to force skipping a stage: ```python from cpg_pipes.pipeline import stage, skip, SampleStage @skip(reason='Stage is not needed') @stage class MyStage1(SampleStage): ... ``` `assume_outputs_exist=True` would also tell the code that the expected results of that stage exist, and there is no need to check bucket objects for existence: ```python from cpg_pipes.pipeline import stage, skip, SampleStage @skip @stage(assume_outputs_exist=True) class MyStage2(SampleStage): ... ``` You can also force the pipeline to skip certain samples with `--skip-samples/-S`, or take only certain samples with `--only-samples/-s`, or force processing certain samples with `--force-samples`. ### Running pipelines To start the `seqr_loader` pipeline on 2 datasets `acute-care` and `perth-neuro` in the test namespace, using `seqr` as analysis dataset, and write the Elasticsearch index for `perth-neuro`: ```sh python pipelines/seqr_loader.py \ -n main \ --analysis-dataset seqr \ --dataset acute-care \ --dataset perth-neuro \ --skip-sample CPG11783 \ --skip-sample CPG13326 \ --keep-scratch ``` Alternatively, you can put the command line options into a config YAML file: ```sh python pipelines/seqr_loader.py --config pipelines/configs/seqr.yml ``` Another useful pipeline is `pipelines/somalier.py` to verify inferred sample relatedness and sex against a provided PED file(s): ```sh python pipelines/somalier.py --config pipelines/configs/seqr.yml ```


نحوه نصب


نصب پکیج whl cpg-pipes-0.3.9:

    pip install cpg-pipes-0.3.9.whl


نصب پکیج tar.gz cpg-pipes-0.3.9:

    pip install cpg-pipes-0.3.9.tar.gz