Scalare da qualche decina di job a centinaia è una sfida per diverse ragioni, una delle quali è l'osservabilità. L'osservabilità è la capacità di comprendere il sistema analizzando componenti come log, metriche e tracce. Questo è altrettanto rilevante per i team di dati più piccoli con solo poche pipeline da monitorare, e motori di calcolo distribuiti come Spark possono essere difficili da monitorare, eseguire il debug e creare procedure di escalation mature in modo affidabile.
La registrazione è probabilmente la più semplice e di maggiore impatto di questi componenti di osservabilità. Fare clic e scorrere i log, un'esecuzione di job alla volta, non è scalabile. Può richiedere molto tempo, essere difficile da analizzare e spesso richiede competenze specifiche sul flusso di lavoro. Senza costruire standard di registrazione maturi nelle tue pipeline di dati, la risoluzione di errori o fallimenti di job richiede molto più tempo, portando a interruzioni costose, livelli di escalation inefficaci e affaticamento da alert.
In questo blog, ti guideremo attraverso:
Le seguenti considerazioni sono importanti da tenere a mente per adattare queste raccomandazioni di registrazione alla tua organizzazione:
La standardizzazione è fondamentale per l'osservabilità dei log di livello production.. Idealmente, la soluzione dovrebbe accogliere centinaia o addirittura migliaia di job/pipeline/cluster.
Per l'implementazione completa di questa soluzione, visita questo repository qui: https://github.com/databricks-industry-solutions/watchtower
Innanzitutto, possiamo creare un Volume di Unity Catalog per essere il nostro archivio file centrale per i log. Non raccomandiamo DBFS poiché non fornisce lo stesso livello di governance dei dati. Raccomandiamo di separare i log per ogni ambiente (ad es. dev, stage, prod) in directory o volumi diversi in modo che l'accesso possa essere controllato in modo più granulare.
Puoi crearlo nell'interfaccia utente, all'interno di un Databricks Asset Bundle (AWS | Azure | GCP), o nel nostro caso, con Terraform:
Assicurati di avere i permessi READ VOLUME e WRITE VOLUME sul volume (AWS | Azure | GCP).
Ora che abbiamo un posto centrale dove mettere i nostri log, dobbiamo configurare i cluster per consegnare i loro log in questa destinazione. Per fare ciò, configura la consegna dei log del calcolo (AWS | Azure | GCP) sul cluster.
Ancora una volta, usa l'interfaccia utente, Terraform o un altro metodo preferito; useremo Databricks Asset Bundles (YAML):
Dopo aver eseguito il cluster o il job, entro pochi minuti, possiamo navigare nel Volume nel Catalog Explorer e vedere i file arrivare. Vedrai una cartella con l'ID del cluster (ad es. 0614-174319-rbzrs7rq), quindi cartelle per ogni gruppo di log:


Gli amministratori del workspace dovrebbero applicare configurazioni standard quando possibile. Ciò significa limitare l'accesso alla creazione di cluster e fornire agli utenti una Cluster Policy (AWS | Azure | GCP) con la configurazione dei log del cluster impostata su valori fissi come mostrato di seguito:
Impostare questi attributi su un valore "fisso" configura automaticamente la destinazione corretta del Volume e impedisce agli utenti di dimenticare o modificare la proprietà.
Ora, invece di configurare esplicitamente il cluster_log_conf nel tuo file YAML del bundle di asset, possiamo semplicemente specificare l'ID della policy del cluster da utilizzare:
Sebbene le istruzioni print() possano essere utili per il debug rapido durante lo sviluppo, risultano inadeguate in ambienti di produzione per diversi motivi:
Framework di logging appropriati, come Log4j per Scala/Java (JVM) o il modulo logging integrato per Python, risolvono tutti questi problemi e sono preferiti in produzione. Questi framework ci consentono di definire livelli di log o verbosità, produrre output in formati leggibili dalle macchine come JSON e impostare destinazioni flessibili.
Si noti inoltre la differenza tra stdout, stderr e log4j nei log del driver Spark:
print() e l'output generale.In Python, ciò comporta l'importazione del modulo di logging standard, la definizione di un formato JSON e l'impostazione del livello di log.
A partire da Spark 4, o Databricks Runtime 17.0+, un logger strutturato semplificato è integrato in PySpark: https://spark.apache.org/docs/latest/api/python/development/logger.html. L'esempio seguente può essere adattato a PySpark 4 scambiando l'istanza del logger con un'istanza di pyspark.logger.PySparkLogger.
Gran parte di questo codice serve solo a formattare i nostri messaggi di log Python come JSON. JSON è semi-strutturato e facile da leggere sia per gli esseri umani che per le macchine, cosa che apprezzeremo quando ingeriremo e interrogheremo questi log più avanti in questo blog. Se saltassimo questo passaggio, potresti ritrovarti a fare affidamento su espressioni regolari complesse e inefficienti per indovinare quale parte del messaggio sia il livello di log rispetto a un timestamp rispetto al messaggio, ecc.
Naturalmente, questo è piuttosto verboso da includere in ogni notebook o pacchetto Python. Per evitare duplicazioni, questo codice boilerplate può essere impacchettato come codice di utilità e caricato nei tuoi job in alcuni modi:
Gli stessi principi si applicano a Scala, ma useremo Log4j invece, o più specificamente, l'astrazione SLF4j:
Quando visualizziamo i Log del Driver nell'interfaccia utente, troviamo i nostri messaggi di log INFO e WARN sotto Log4j. Questo perché il livello di log predefinito è INFO, quindi i messaggi DEBUG e TRACE non vengono scritti.

I log Log4j non sono in formato JSON, però! Vedremo come risolvere questo problema la prossima volta.
Per acquisire informazioni utili per i job di streaming, come metriche della sorgente e della destinazione di streaming e progressi della query, possiamo anche implementare StreamingQueryListener di Spark.
Quindi registra il listener di query con la tua sessione Spark:
Eseguendo una query di streaming strutturato Spark, vedrai ora qualcosa di simile a quanto segue nei log log4j (nota: in questo caso utilizziamo una sorgente e una destinazione Delta; le metriche dettagliate possono variare a seconda della sorgente/destinazione):

Fino ad ora, abbiamo influenzato solo il logging del nostro codice. Tuttavia, guardando i Log del Driver del cluster, possiamo vedere molti più log - la maggior parte, infatti - provengono dall'interno di Spark. Quando creiamo logger Python o Scala nel nostro codice, questo non influenza i log interni di Spark.
Ora esamineremo come configurare i log di Spark per il nodo Driver in modo che utilizzino un formato JSON standard che possiamo facilmente analizzare.
Log4j utilizza un file di configurazione locale per controllare la formattazione e i livelli di log, e possiamo modificare questa configurazione utilizzando uno Script di Inizializzazione del Cluster (AWS | Azure | GCP). Si noti che prima di DBR 11.0, veniva utilizzato Log4j v1.x, che utilizza un file Java Properties (log4j.properties). DBR 11.0+ utilizza Log4j v2.x che utilizza invece un file XML (log4j2.xml).
Il file log4j2.xml predefinito sui nodi driver Databricks utilizza un PatternLayout per un formato di log di base:
Cambieremo questo in JsonTemplateLayout usando il seguente script di inizializzazione:
Questo script di inizializzazione sostituisce semplicemente PatternLayout con JsonTemplateLayout. Nota che gli script di inizializzazione vengono eseguiti su tutti i nodi del cluster, inclusi i nodi worker; in questo esempio, stiamo configurando solo i log del driver per motivi di verbosità e perché ingeriremo solo i log del driver in seguito. Tuttavia, il file di configurazione pu ò essere trovato anche sui nodi worker in /home/ubuntu/databricks/spark/dbconf/log4j/executor/log4j.properties.
Puoi aggiungere a questo script secondo necessità, o cat $LOG4J2_PATH per visualizzare il contenuto completo del file originale per modifiche più semplici.
Successivamente, caricheremo questo script di inizializzazione nel volume di Unity Catalog. Per organizzazione, creeremo un volume separato invece di riutilizzare il nostro volume di log grezzi di prima, e questo può essere realizzato in Terraform in questo modo:
Questo creerà il Volume e caricherà automaticamente lo script di inizializzazione al suo interno.
Ma dobbiamo ancora configurare il nostro cluster per utilizzare questo script di inizializzazione. In precedenza, abbiamo utilizzato una Politica Cluster per applicare la destinazione di consegna dei log, e possiamo fare lo stesso tipo di applicazione per questo script di inizializzazione per garantire che i nostri log Spark abbiano sempre la formattazione JSON strutturata. Modificheremo il JSON della politica precedente aggiungendo quanto segue:
Ancora una volta, l'uso di un valore fisso qui garantisce che lo script di inizializzazione sarà sempre impostato sul cluster.
Ora, se rieseguiamo il nostro codice Spark precedente, possiamo vedere tutti i Log del Driver nella sezione Log4j formattati correttamente come JSON!
A questo punto, abbiamo abbandonato le semplici istruzioni di stampa per il logging strutturato, lo abbiamo unificato con i log di Spark e abbiamo indirizzato i nostri log a un volume centrale. Questo è già utile per sfogliare e scaricare i file di log utilizzando Catalog Explorer o Databricks CLI: databricks fs cp dbfs:/Volumes/watchtower/default/cluster_logs/cluster-logs/$CLUSTER_ID . --recursive.
Tuttavia, il vero valore di questo hub di logging si vede quando ingeriamo i log in una tabella Unity Catalog. Questo chiude il cerchio e ci fornisce una tabella su cui possiamo scrivere query espressive, eseguire aggregazioni e persino rilevare problemi di performance comuni. Tutto questo lo vedremo tra poco!
L'ingestione dei log è semplice grazie a Lakeflow Declarative Pipelines, e impiegheremo un'architettura a medaglione con Auto Loader per caricare i dati in modo incrementale.

La prima tabella è semplicemente una tabella bronze per caricare i dati grezzi dei log del driver, aggiungendo alcune colonne aggiuntive come il nome del file, la dimensione, il percorso e l'ora dell'ultima modifica.
Utilizzando le aspettative di Lakeflow Declarative Pipeline (AWS | Azure | GCP), otteniamo anche il monitoraggio nativo della qualità dei dati. Vedremo altri controlli di qualità dei dati sulle altre tabelle.
La tabella successiva (silver) è più critica; vorremmo analizzare ogni riga di testo dai log, estraendo informazioni come il livello del log, il timestamp del log, l'ID del cluster e la sorgente del log (stdout/stderr/log4j).
Nota: sebbene abbiamo configurato il logging JSON il più possibile, avremo sempre un certo grado di testo grezzo in forma non strutturata da altri strumenti avviati all'avvio. La maggior parte di questi sarà in stdout, e la nostra trasformazione silver dimostra un modo per mantenere l'analisi flessibile, tentando di analizzare il messaggio come JSON e ricorrendo al regex solo quando necessario.
L'ultima tabella nella nostra pipeline è una vista materializzata creata su Databricks System Tables. Memorizzerà gli ID di calcolo utilizzati da ogni esecuzione di job e semplificherà le future join quando vorremo recuperare l'ID del job che ha prodotto determinati log. Nota che un singolo job può avere più cluster, così come task SQL che vengono eseguiti su un warehouse anziché su un cluster di job, da qui l'utilità di pre-calcolare questo riferimento.
La pipeline può essere distribuita tramite l'interfaccia utente, Terraform o all'interno del nostro asset bundle. Utilizzeremo l'asset bundle e forniremo il seguente YAML delle risorse:
Infine, possiamo interrogare i dati dei log tra job, esecuzioni di job, cluster e workspace. Grazie alle ottimizzazioni delle tabelle gestite da Unity Catalog, queste query saranno anche veloci e scalabili. Vediamo un paio di esempi.
Questa query trova gli errori più comuni riscontrati, aiutando a dare priorità e migliorare la gestione degli errori. Può anche essere un indicatore utile per scrivere runbook che coprano i problemi più comuni.
Questa query classifica i job in base al numero di errori osservati, aiutando a trovare i job più problematici.
Se inseriamo queste query in una dashboard AI/BI di Databricks, avremo ora un'interfaccia centrale per cercare e filtrare tutti i log, rilevare problemi comuni e risolvere i problemi.


Questa dashboard AI/BI di esempio è disponibile insieme a tutto il resto del codice per questa soluzione su GitHub.
Come abbiamo dimostrato nella dashboard di riferimento, ci sono molti casi d'uso pratici che una soluzione di logging come questa supporta, ad esempio:
In uno scenario realistico, i professionisti saltano manualmente da un'esecuzione di job all'altra per dare un senso agli errori e non sanno come dare priorità agli avvisi. Stabilendo non solo log robusti ma anche una tabella standard per memorizzarli, i professionisti possono semplicemente interrogare i log per l'errore più comune a cui dare priorità. Supponiamo che ci sia 1 esecuzione di job fallita a causa di un errore OutOfMemory, mentre ci sono 10 job falliti a causa di un improvviso errore di permessi quando SELECT è stato involontariamente revocato al service principal; il tuo team di reperibilità è normalmente affaticato dal picco di avvisi, ma ora è in grado di rendersi conto rapidamente che l'errore di permessi è una priorità più alta e inizia a lavorare per risolvere il problema al fine di ripristinare i 10 job.
Allo stesso modo, i professionisti hanno spesso bisogno di controllare i log di più esecuzioni dello stesso job per fare confronti. Un esempio reale è correlare i timestamp di un messaggio di log specifico da ogni esecuzione batch del job, con un'altra metrica o grafico (ad esempio, quando è stato registrato "batch completed" rispetto a un grafico del throughput delle richieste su un'API che hai chiamato). L'ingestione dei log semplifica questo, in modo che possiamo interrogare la tabella e filtrare per l'ID del job, e opzionalmente una lista di ID di esecuzione del job, senza dover fare clic su ogni esecuzione una alla volta.
cloudFiles.cleanSource per eliminare i file dopo un periodo di conservazione specificato, definito anche come cloudFiles.cleanSource.retentionDuration. È anche possibile utilizzare le regole del ciclo di vita dello storage cloud.I clienti potrebbero anche voler integrare i loro log con strumenti di logging popolari come Loki, Logstash o AWS CloudWatch. Sebbene ognuno abbia i propri requisiti di autenticazione, configurazione e connettività, questi seguirebbero tutti uno schema molto simile utilizzando lo script di inizializzazione del cluster per configurare e spesso eseguire un agente di inoltro dei log.
Per riassumere, le lezioni chiave sono:
Inizia a mettere in produzione i tuoi log oggi stesso consultando il repository GitHub per questa soluzione completa qui: https://github.com/databricks-industry-solutions/watchtower!
I Delivery Solutions Architects (DSA) di Databricks accelerano le iniziative di Dati e AI nelle organizzazioni. Forniscono leadership architetturale, ottimizzano le piattaforme per costi e prestazioni, migliorano l'esperienza degli sviluppatori e guidano l'esecuzione di progetti di successo. I DSA colmano il divario tra l'implementazione iniziale e le soluzioni di livello di produzione, lavorando a stretto contatto con vari team, tra cui data engineering, lead tecnici, dirigenti e altri stakeholder per garantire soluzioni su misura e un valore più rapido. Per beneficiare di un piano di esecuzione personalizzato, guida strategica e supporto durante il tuo percorso di dati e AI da parte di un DSA, contatta il tuo Databricks Account Team.
(Questo post sul blog è stato tradotto utilizzando strumenti basati sull'intelligenza artificiale) Post originale
