<a href="https://www.databricks.com/resources/ebook/delta-lake-running-oreilly?itm_data=simplifyinggenomicspipelinesdelta-blog-oreillydlupandrunning">Ottieni un'anteprima del nuovo e-book di O'Reilly</a> per la guida dettagliata di cui hai bisogno per iniziare a usare Delta Lake.</p><hr><p><a class="dbce_cta" href="https://pages.databricks.com/rs/094-YMS-629/images/Simplifying%20Genomics%20Pipelines%20at%20Scale%20with%20Databricks.html" target="_blank">Prova questo Notebook in Databricks</a><br><em>Questo è il primo blog della nostra serie "Analisi genomica su vasta scala". In questa serie, dimostreremo come la </em><a href="https://www.databricks.com/product/genomics"><em>Piattaforma di analisi unificata per la genomica di Databricks</em></a><em> consenta ai clienti di analizzare dati genomici su scala di popolazione. Partendo dall'output della nostra </em><a href="https://www.databricks.com/blog/2018/09/10/building-the-fastest-dnaseq-pipeline-at-scale.html"><em>pipeline genomica</em></a><em>, questa serie fornirà un tutorial sull'uso di Databricks per eseguire il controllo qualità dei campioni, la genotipizzazione congiunta, il controllo qualità della coorte e analisi avanzate di genetica statistica.</em></p><hr><p>Dal completamento del <a href="https://www.genome.gov/human-genome-project/What">Progetto Genoma Umano</a> nel 2003, si è assistito a un'esplosione di dati alimentata da un drastico calo del costo del sequenziamento del DNA, dai 3 miliardi di dollari<sup>1</sup> per il primo genoma a meno di 1.000 dollari di oggi.</p><blockquote><p>[1] <a href="https://www.genome.gov/human-genome-project/What">Il Progetto Genoma Umano</a> è stato un progetto da 3 miliardi di dollari guidato dal Dipartimento dell'Energia e dai National Institutes of Health, iniziato nel 1990 e completato nel 2003.</p></blockquote><p><a href="https://www.databricks.com/wp-content/uploads/2019/03/costpergenome_2017.jpg"><img class="aligncenter wp-image-39985" style="width:700px;" src="https://www.databricks.com/wp-content/uploads/2019/03/costpergenome_2017.jpg" alt="Costo per genoma" height="525"></a></p><p class="text-align-center">Fonte: <a href="https://www.genome.gov/about-genomics/fact-sheets/DNA-Sequencing-Costs-Data">Costi del sequenziamento del DNA: Dati</a></p><p>Di conseguenza, il campo della genomica è ora maturato fino a un punto in cui le aziende hanno iniziato a eseguire il sequenziamento del DNA su scala di popolazione. Tuttavia, il sequenziamento del codice del DNA è solo il primo passo: i dati grezzi devono poi essere trasformati in un formato adatto all'analisi. Generalmente, questo viene fatto unendo una serie di strumenti di <a href="https://www.databricks.com/glossary/bioinformatics">bioinformatica</a> con script personalizzati ed elaborando i dati su un singolo nodo, un campione alla volta, fino a ottenere una raccolta di varianti genomiche. Gli scienziati di <a href="https://www.databricks.com/glossary/bioinformatics">Bioinformatica</a> oggi trascorrono la maggior parte del loro tempo a creare e manutenere queste pipeline. Dato che i set di dati genomici hanno raggiunto la scala dei petabyte, è diventato difficile rispondere tempestivamente anche alle seguenti semplici domande:</p><ul><li>Quanti campioni abbiamo sequenziato questo mese?</li><li>Qual è il numero totale di varianti uniche rilevate?</li><li>Quante varianti abbiamo osservato nelle diverse classi di variazione?</li></ul><p>A complicare ulteriormente questo problema, i dati di migliaia di individui non possono essere archiviati, tracciati o versionati pur rimanendo accessibili e interrogabili. Di conseguenza, i ricercatori spesso duplicano sottoinsiemi dei loro dati genomici durante le analisi, causando un aumento dell'ingombro di archiviazione e dei costi complessivi. Nel tentativo di alleviare questo problema, oggi i ricercatori impiegano una strategia di "congelamento dei dati" (data freeze), in genere per un periodo da sei mesi a due anni, durante il quale interrompono il lavoro su nuovi dati e si concentrano invece su una copia congelata dei dati esistenti. Non esiste una soluzione per sviluppare in modo incrementale le analisi in tempi più brevi, causando un rallentamento del progresso della ricerca.</p><p>C'è una forte necessità di un software robusto in grado di elaborare dati genomici su scala industriale, mantenendo al contempo la flessibilità per gli scienziati di esplorare i dati, iterare sulle loro pipeline di analisi e ricavare nuove conoscenze.</p><p><a href="https://www.databricks.com/wp-content/uploads/2019/06/genomics-reader-writer_image3.png"><img class="aligncenter wp-image-44884" style="width:698px;" src="https://www.databricks.com/wp-content/uploads/2019/06/genomics-reader-writer_image3.png" alt="Architettura per l'analisi genomica end-to-end con Databricks" height="212"></a></p><p class="text-align-center">Fig. 1. Architettura per l'analisi genomica end-to-end con Databricks</p><p>Con <a href="https://www.youtube.com/watch?v=gg-lEJ4sBaA">Databricks Delta: un sistema di gestione unificato per l'analisi dei Big Data in tempo reale</a>, la piattaforma Databricks ha compiuto un passo importante verso la risoluzione dei problemi di governance dei dati, accesso e analisi dei dati che i ricercatori devono affrontare oggi. Con <a href="https://www.databricks.com/product/delta-lake-on-databricks">Databricks Delta Lake</a>, puoi archiviare tutti i tuoi dati genomici in un unico posto e creare analisi che si aggiornano in tempo reale man mano che nuovi dati vengono inseriti. In combinazione con le ottimizzazioni della nostra <a href="https://www.databricks.com/product/genomics">Piattaforma di analisi unificata per la genomica</a> (UAP4G) per la lettura, la scrittura e l'elaborazione dei formati di file genomici, offriamo una soluzione end-to-end per i flussi di lavoro delle pipeline genomiche. L'architettura UAP4G offre flessibilità, consentendo ai clienti di integrare le proprie pipeline e sviluppare le proprie analitiche terziarie. Ad esempio, abbiamo evidenziato la dashboard seguente che mostra metriche e visualizzazioni di controllo qualità che possono essere calcolate e presentate in modo automatico e personalizzate per soddisfare requisiti specifici.</p><p>https://www.youtube.com/watch?v=73fMhDKXykU</p><p>Nel resto di questo blog, illustreremo i passaggi che abbiamo seguito per creare la dashboard di controllo qualità riportata sopra, che si aggiorna in tempo reale man mano che l'elaborazione dei campioni viene completata. Utilizzando una pipeline basata su Delta per l'elaborazione dei dati genomici, i nostri clienti possono ora gestire le loro pipeline in modo da fornire visibilità in tempo reale, campione per campione. Con i notebook Databricks (e integrazioni come GitHub e MLflow) possono tracciare e versionare le analisi in modo da garantire la riproducibilità dei risultati. I loro bioinformatici possono dedicare meno tempo alla manutenzione delle pipeline e più tempo a fare scoperte. Consideriamo l'UAP4G come il motore che guiderà la trasformazione dalle analisi ad hoc alla genomica di produzione su scala industriale, consentendo di ottenere approfondimenti migliori sul link tra genetica e malattia.</p><h2>Leggi dati di esempio</h2><p>Iniziamo leggendo i dati di variazione da una piccola coorte di campioni; l'istruzione seguente legge i dati per un sampleId specifico e li salva utilizzando il formato Databricks Delta (nella cartella delta_stream_output).</p><pre style="font-size:10pt;">spark.read. \ format("parquet"). \ load("dbfs:/annotations_etl_parquet/sampleId=" + "SRS000030_SRR709972"). \ write. \ format("delta"). \ save(delta_stream_outpath) </pre><blockquote><p>Nota: la cartella annotations_etl_parquet contiene annotazioni generate dal <a href="https://www.internationalgenome.org/">set di dati 1000 Genomes</a> archiviato in formato Parquet. L'ETL e l'elaborazione di queste annotazioni sono stati eseguiti utilizzando <a href="https://www.databricks.com/product/genomics">Databricks’ Piattaforma di analisi unificata per la genomica</a>.</p></blockquote><h2>Start lo streaming della tabella Databricks Delta</h2><p>Nella seguente istruzione, stiamo creando il DataFrame Apache Spark exomes, che legge un stream (tramite readStream) di dati utilizzando il formato Databricks Delta. Si tratta di un DataFrame a esecuzione continua o dinamico, ovvero il DataFrame exomes caricherà nuovi dati man mano che vengono scritti nella cartella delta_stream_output. Per visualizzare il DataFrame exomes, possiamo eseguire una query sul DataFrame per trovare il conteggio delle varianti raggruppate per sampleId.</p><pre style="font-size:10pt;"># Read the stream of data exomes = spark.readStream.format("delta").load(delta_stream_outpath) # Display the data via DataFrame query display(exomes.groupBy("sampleId").count().withColumnRenamed("count", "variants")) </pre><p>Quando si esegue l'istruzione <code>display</code>, il notebook di Databricks fornisce una dashboard di streaming per monitorare i job di streaming. Subito sotto il job di streaming si trovano i risultati dell'istruzione display (ossia il conteggio delle varianti per sample_id).</p><p><a href="https://www.databricks.com/wp-content/uploads/2019/03/single_sample_display_variant_count_by_sampleid.png"><img class="aligncenter wp-image-39992" style="width:700px;" src="https://www.databricks.com/wp-content/uploads/2019/03/single_sample_display_variant_count_by_sampleid.png" alt height="282"></a></p><p>Continuiamo a rispondere alle domande iniziali eseguendo altre query basate sul nostro DataFrame `exomes`.</p><h2>Conteggio delle varianti a singolo nucleotide</h2><p>Per continuare con l'esempio, possiamo calcolare rapidamente il numero di varianti a singolo nucleotide (SNV), come mostrato nel graph seguente.</p><pre style="font-size:10pt;">%sql select referenceAllele, alternateAllele, count(1) as GroupCount from snvs group by referenceAllele, alternateAllele order by GroupCount desc </pre><p><a href="https://www.databricks.com/wp-content/uploads/2019/03/single_nucleotide_variant_count_sorted.png"><img class="aligncenter wp-image-39994" style="width:699px;" src="https://www.databricks.com/wp-content/uploads/2019/03/single_nucleotide_variant_count_sorted.png" alt height="356"></a></p><blockquote><p>Nota: il comando <code>display</code> fa parte del workspace di Databricks che consente di visualizzare il DataFrame utilizzando le visualizzazioni di Databricks (ad es. nessuna codifica richiesta).</p></blockquote><h2>Conteggio delle varianti</h2><p>Poiché abbiamo annotato le nostre varianti con effetti funzionali, possiamo proseguire la nostra analisi esaminando la distribuzione degli effetti delle varianti che osserviamo. La maggior parte delle varianti rilevate fiancheggia le regioni che codificano per le proteine; queste sono note come varianti non codificanti.</p><pre style="font-size:10pt;">display(exomes.groupBy("mutationType").count()) </pre><p><a href="https://www.databricks.com/wp-content/uploads/2019/03/mutation_type_donut.png"><img class="aligncenter wp-image-39996" style="width:650px;" src="https://www.databricks.com/wp-content/uploads/2019/03/mutation_type_donut.png" alt height="534"></a></p><h2>Mappa di calore della sostituzione degli aminoacidi</h2><p>Proseguendo con il nostro DataFrame <code>exomes</code>, calcoliamo i conteggi delle sostituzioni di amminoacidi con il seguente snippet di codice. Analogamente ai DataFrame precedenti, creeremo un altro DataFrame dinamico (<code>aa_counts</code>) in modo tale che, man mano che nuovi dati vengono elaborati dal DataFrame degli esomi, ciò si rifletta di conseguenza anche nei conteggi delle sostituzioni di amminoacidi. Stiamo anche scrivendo i dati in memoria (ad es. <code>.format("memory")</code>) e batch elaborati ogni 60s (ossia <code>trigger(processingTime=’60 seconds’)</code>) in modo che il codice della heatmap di Pandas a valle possa elaborare e visualizzare la heatmap.</p><pre style="font-size:10pt;"># Calculate amino acid substitution counts coding = get_coding_mutations(exomes) aa_substitutions = get_amino_acid_substitutions(coding.select("proteinHgvs"), "proteinHgvs") aa_counts = count_amino_acid_substitution_combinations(aa_substitutions) aa_counts. \ writeStream. \ format("memory"). \ queryName("amino_acid_substitutions"). \ outputMode("complete"). \ trigger(processingTime='60 seconds'). \ start() </pre><p>Il seguente frammento di codice legge la precedente tabella Spark <code>amino_acid_substitutions</code>, determina il conteggio massimo, crea una nuova tabella pivot Pandas dalla tabella Spark, quindi traccia la mappa di calore.</p><pre style="font-size:10pt;"># Use pandas and matplotlib to build heatmap amino_acid_substitutions = spark.read.table("amino_acid_substitutions") max_count = amino_acid_substitutions.agg(fx.max("substitutions")).collect()[0][0] aa_counts_pd = amino_acid_substitutions.toPandas() aa_counts_pd = pd.pivot_table(aa_counts_pd, values='substitutions', index=['reference'], columns=['alternate'], fill_value=0) fig, ax = plt.subplots() with sns.axes_style("white"): ax = sns.heatmap(aa_counts_pd, vmax=max_count*0.4, cbar=False, annot=True, annot_kws={"size": 7}, fmt="d") plt.tight_layout() display(fig) </pre><p><a href="https://www.databricks.com/wp-content/uploads/2019/03/single_sampleid_amino_acid_map.png"><img class="aligncenter wp-image-39998" style="width:701px;" src="https://www.databricks.com/wp-content/uploads/2019/03/single_sampleid_amino_acid_map.png" alt height="463"></a></p><h2>Migrazione a una pipeline continua</h2><p>Fino a questo punto, gli snippet di codice e le visualizzazioni precedenti rappresentano una singola esecuzione per un singolo <code>sampleId</code>. Ma poiché stiamo usando Structured Streaming e Databricks Delta, questo codice può essere usato (senza alcuna modifica) per costruire una pipeline di dati di produzione che calcola continuamente le statistiche di controllo qualità man mano che i campioni passano attraverso la nostra pipeline. Per dimostrarlo, possiamo eseguire il seguente snippet di codice che caricherà il nostro intero set di dati.</p><pre style="font-size:10pt;">import time parquets = "dbfs:/databricks-datasets/genomics/annotations_etl_parquet/" files = dbutils.fs.ls(parquets) counter=0 for sample in files: counter+=1 annotation_path = sample.path sampleId = annotation_path.split("/")[4].split("=")[1] variants = spark.read.format("parquet").load(str(annotation_path)) print("esecuzione di " + sampleId) if(sampleId != "SRS000030_SRR709972"): variants.write.format("delta"). \ mode("append"). \ save(delta_stream_outpath) time.sleep(10) </pre><p>Come descritto nei frammenti di codice precedenti, l'origine del DataFrame <code>exomes</code> sono i file caricati nella cartella <code>delta_stream_output</code>. Inizialmente, avevamo caricato un set di file per un singolo <code>sampleId</code> (ad esempio, <code>sampleId = “SRS000030_SRR709972”</code>). Lo snippet di codice precedente ora acquisisce tutti i campioni parquet generati (ossia <code>parquets</code>) e carica in modo incrementale tali file per <code>sampleId</code> nella stessa cartella <code>delta_stream_output</code>. La seguente GIF animata mostra l'output abbreviato dello snippet di codice precedente.</p><p>https://www.youtube.com/watch?v=JPngSC5Md-Q</p><h2>Visualizzazione della pipeline genomica</h2><p>Quando torni all'inizio del notebook, noterai che il DataFrame <code>exomes</code> sta caricando automaticamente i nuovi <code>sampleIds</code>. Poiché il componente di streaming strutturato della nostra pipeline genomica viene eseguito in modo continuo, elabora i dati non appena i nuovi file vengono caricati nella cartella <code>delta_stream_outputpath</code>. Utilizzando il formato Databricks Delta, possiamo garantire la coerenza transazionale dei dati trasmessi in streaming al DataFrame exomes.</p><p>https://www.youtube.com/watch?v=Q7KdPsc5mbY</p><p>A differenza della creazione iniziale del nostro DataFrame <code>exomes</code>, si può notare che il dashboard di monitoraggio dello streaming strutturato sta caricando i dati (ossia la fluttuazione della "velocità di input rispetto a quella di elaborazione", la fluttuazione della "durata del batch" e un aumento delle chiavi distinte nello "stato delle aggregazioni"). Mentre il DataFrame <code>exomes</code> è in fase di elaborazione, notare le nuove righe di <code>sampleIds</code> (e i conteggi delle varianti). Questa stessa azione si può osservare anche per la query associata <em>raggruppata per tipo di mutazione</em>.</p><p>https://www.youtube.com/watch?v=sT179SCknGM</p><p>Con Databricks Delta, qualsiasi nuovo dato è transazionalmente coerente in ogni singola fase della nostra pipeline genomica. Ciò è importante perché garantisce che la pipeline sia coerente (mantiene la coerenza dei dati, ovvero garantisce che tutti i dati siano “corretti”), affidabile (la transazione riesce o fallisce completamente) e in grado di gestire aggiornamenti in tempo reale (la capacità di gestire molte transazioni contemporaneamente e qualsiasi interruzione o errore non avrà alcun impatto sui dati). Pertanto, anche i dati nella nostra mappa di sostituzione degli amminoacidi downstream (che ha richiesto una serie di passaggi ETL aggiuntivi) vengono aggiornati senza soluzione di continuità.</p><p><a href="https://www.databricks.com/wp-content/uploads/2019/03/amino-acid-map-all-files.png"><img class="aligncenter wp-image-40004" style="width:700px;" src="https://www.databricks.com/wp-content/uploads/2019/03/amino-acid-map-all-files.png" alt height="478"></a></p><p>Come ultimo passaggio della nostra pipeline genomica, stiamo anche monitorando le mutazioni distinte esaminando i file Parquet di Databricks Delta all'interno di DBFS (ovvero l'aumento di mutazioni distinte nel tempo).</p><p><a href="https://www.databricks.com/wp-content/uploads/2019/03/distinct-mutations-multiple-files.png"><img class="aligncenter wp-image-40006" style="width:699px;" src="https://www.databricks.com/wp-content/uploads/2019/03/distinct-mutations-multiple-files.png" alt height="421"></a></p><h2>Riepilogo</h2><p>Utilizzando le fondamenta della Databricks Piattaforma di analisi unificata, con un'attenzione particolare a Databricks Delta, bioinformatici e ricercatori possono applicare le analitiche distribuite con coerenza transazionale usando <a href="https://www.databricks.com/product/genomics">Databricks Piattaforma di analisi unificata per la genomica</a>. Queste astrazioni consentono ai professionisti dei dati di semplificare le pipeline genomiche. Qui abbiamo creato una pipeline di controllo qualità per campioni genomici che elabora continuamente i dati man mano che vengono processati nuovi campioni, senza intervento manuale. Sia che si esegua l'ETL o si eseguano analitiche sofisticate, i dati fluiranno attraverso la pipeline genomica rapidamente e senza interruzioni. Provalo oggi stesso scaricando il <a href="https://pages.databricks.com/rs/094-YMS-629/images/Simplifying%20Genomics%20Pipelines%20at%20Scale%20with%20Databricks.html">Notebook Semplificare le pipeline genomiche su larga scala con Databricks Delta</a>.</p><p>Inizia ad analizzare la genomica su larga scala:</p><ul><li>Leggi la nostra <a href="https://www.databricks.com/product/genomics">guida alla soluzione</a>Unified Analytics per la genomica</li><li>Download the <a href="https://pages.databricks.com/rs/094-YMS-629/images/Simplifying%20Genomics%20Pipelines%20at%20Scale%20with%20Databricks.html">Simplifying genomica pipeline at Scale with Databricks Delta Notebook</a></li><li>Iscriviti a una <a href="https://pages.databricks.com/genomics-preview.html">prova gratuita </a>di Databricks Unified Analytics for Genomics</li></ul><p> </p><h2>Ringraziamenti</h2><p>Grazie a Yongsheng Huang e Michael Ortega per i loro contributi.<br> </p><div style="border-radius:10px;border:medium solid #0CF;margin-left:auto;margin-right:auto;max-width:700px;padding:25px;text-align:left;"><strong>Ti interessa Delta Lake open-source?</strong><br><a href="https://delta.io?utm_source=delta-blog" rel="noopener noreferrer" target="_blank">Visita l'hub online di Delta Lake</a> per saperne di più, scaricare il codice più recente e unirti alla community di Delta Lake.</div><p> </p>
Data Science e ML
October 31, 2023/9 min de leitura