Clinical Suspecting at Scale Using PySpark

One of the key components of patient healthcare is reducing the number of misdiagnosed or missed diseases. While the doctor may mention a condition, the overall list of diseases may not have been documented properly during the risk adjustment process. We deep dive into the architectural, framework and data decisions taken from exploratory analysis through to algorithm deployment. We extract datasets from previous medical records, past medical history, drugs, laboratory results, usage of medical equipment, procedures performed and provider specialities.

While hunting for evidence of missing disease conditions, we have to make complex decisions around the following questions:

  1. Which framework to use?
  2. How to perform feature engineering at scale?
  3. Tuning PySpark configurations
  4. Ingesting insights back into the pipeline
  5. Hyper-parameters tuning at scale
  6. Parallel processing and handling OOM errors

We had to devise a hybrid framework, using both clinical rules and ML algorithms. In the end, we identify patients with the highest possibility of having incomplete diagnosis codes. In this talk, we take a deep dive into the above questions, talking about the roadblocks (with examples) we faced while building this platform. We also discuss key insights that any data scientist or ML engineer may find handy while dealing with similar data or problem statements.

Video Transcript

– Hi, hello there. Good afternoon, good morning, wherever you are logging into this virtual Spark + AI Summit from this year. To start off with, I want to acknowledge everyone’s presence here. It’s a very crazy world out there, and I realize things have not been as normal as you would have liked. But then again, this is the world we’re living in. So thank you all for joining in today. For this session, we will be talking about my work and my team’s work around Clinical Suspecting and some of the issues that we had to resolve along the way while working with PySpark. That is what I will try to give away in this session, rather than talking too much about PySpark as is. I’m pretty sure most of the participants would have worked in some capacity or the other with Python, PySpark or a Spark environment. So let’s start off and try to understand what machine learning use case we were trying to solve, what problems we faced, and how we solved them. So, a brief introduction about myself, for those of you who are joining and aren’t entirely aware of me or the company. I am currently heading machine learning at Episource, as a vice president there. Episource is a healthcare company based in the US. We work extensively with insurance companies and hospitals to allow for risk adjustment and provide services around healthcare in the US. My team majorly works on NLP and machine learning platforms, and we focus on building machine learning platforms at large scale. For example, one of the engines and platforms that we made processed more than 100 million pages in the last financial year. Personally, by academic training I belong to data sciences and maths, and I’m a self-taught programmer. I’ve picked up a few programming languages along the way. I’m also a certified AWS Solutions Architect.
And in my previous roles and companies, I’ve worked on deploying machine learning, analytics and artificial intelligence based solutions in multiple domains. Overall, I love Python, I love numbers, and I really love building anything that I can get my hands on. So for today, let’s try to walk through a few of the key elements of the machine learning use case that we were trying to build out, along with trying to understand what the issues with PySpark were and how we went about resolving them. If it had been in person, I would have taken a very different approach to this presentation. But I realized that there will be a lot of follow-up questions in a virtual format. Hence, I have tried to keep it as straightforward as possible and to condense as much information as possible within the provided time. So we’ll broadly focus on four key areas. What was the broad use case that we were trying to solve, the problem statement itself? What was the data profile of the machine learning problem statement that we’re calling Clinical Suspecting, and what does Clinical Suspecting even mean? We’ll try to understand that at the very outset. We’ll also try to understand why we went about using Spark. And what were the issues around it, and how did we resolve them? So let’s get started with that. When we’re talking about Clinical Suspecting, if we have to put it in one sentence, it is essentially looking at some key variables to predict whether or not a person will have a disease. For example, I have to look at a patient’s profile: the medicines that he has had, his parents’ medical history, the lab tests that he has had, the procedures if he has had any, any comorbidities that he has, and any visit history with his previous doctors.
And take all of that into account and say, “Okay, I have all of this information; does this person suffer from a certain disease?” Now of course, as the clinical or healthcare domain goes, there’s not only one disease; there are literally hundreds of diseases out there. But for our problem statement, we chose around 80-odd diseases. So what that meant was, it was a classification problem, of course, where we have lots of independent variables, broadly fitting into the five categories shown here. But it also means that it is a multi-class, multi-label problem, which means the same person could have one or more diseases. This person could be suffering from diabetes and hypertension. This person could be suffering from diabetes, hypertension and a chronic kidney problem. Which meant that the prediction, the feature engineering, as well as the learning logic become very complicated.

So with that, let’s also try to understand why we chose Spark. Now, we had data for patients for over four years, and we profiled it for over 1.2 million patients, of course over four years. This data was spread across the five broad categories that I just mentioned earlier: the patient’s visit history, the lab tests, the procedures that have been performed on them. And there are over 25,000 features. This is without any feature engineering, which meant that there were a lot of columns that we had to work with. And it was 500 GB of raw data, just for these 1.2 million patients. Which meant that, of course, we could not use the normal Pythonic methods that we would use to do machine learning, in scikit-learn or (mumbles).

So the broad process flow for doing machine learning for this use case was something like this. This is a very 30,000-foot view, not as detailed as I would have liked to present, but it’s a broad view of what is out there. We first, of course, have to load the data and clean it based on some logic, which depends on the business rules that we have to write up. For example, if someone has diabetes, I don’t want to capture both types of diabetes; if the person suffers from diabetes type 1 or type 2, then I don’t want to capture the other. And of course there were problems around data sparsity. How do you logically group a person based on his gender, his location, his zip code, or what lab tests he has had previously?

And of course, then join all of this data across the multiple categories, and finally arrive at the data that you want to run any kind of machine learning on. And then of course, I have 25,000-plus features, plus some more after doing feature engineering, which means that I cannot use everything. So I have to run feature importance pipelines based on a random-forest-based feature importance identifier, choose maybe the top 5,000 features, convert them to sparse matrices, and then run a very straightforward multi-layer perceptron kind of architecture, based on libraries like Keras, PyTorch or TensorFlow. And then of course, we have to monitor how the training is going, so we monitor that on TensorBoard as well. So this is how the broad workflow of us working with the data went about. Which of course means that we faced problems in each of these areas, and there were literally tons of problems, because this was the first time we were doing any kind of training, as well as trying to predict, on data this big. And we realized at the outset that we needed to get away from pandas and the normal scikit-learn methods, and use something like distributed machine learning with PySpark on the cloud.

And that brings us to the issues that we faced. As I mentioned, there were a lot of issues that we had to face and try to resolve within this period. There were obvious problems at the very outset. There were problems around cloud computing, there were problems around data, there were problems around how I ingest my data into a certain ML pipeline. Because I also need to take snapshots to do some amount of exploratory analysis on my data, and I cannot use pandas because I need to use all the data for my exploratory analysis as well.
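As an illustration of the feature-selection step in the workflow above (keeping roughly the top 5,000 features by random-forest importance), here is a minimal sketch in plain Python. It assumes the importance scores have already been computed and collected into a dictionary; the feature names, scores and the helper name `top_k_features` are hypothetical and not taken from the talk.

```python
from typing import Dict, List


def top_k_features(importances: Dict[str, float], k: int) -> List[str]:
    """Return the k feature names with the highest importance scores."""
    # Sort by score, highest first; break ties by name so the result is deterministic.
    ranked = sorted(importances.items(), key=lambda item: (-item[1], item[0]))
    return [name for name, _ in ranked[:k]]


# Hypothetical scores, e.g. collected from a random-forest importance pass.
scores = {"hba1c_lab": 0.41, "metformin_rx": 0.33, "zip_code": 0.02, "bmi": 0.24}
selected = top_k_features(scores, k=2)
print(selected)
```

In a real pipeline `k` would be in the thousands, and the selected names would then drive which columns are carried forward into training.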
So what I’ve done is shortlist the top five problems, which took the most of our time debugging and trying to find our way around. They also took a fair amount of research and a fair amount of reaching out to people to resolve, and I’ll share them with you folks and see if that helps you as well.

So the first issue, at the very outset, as I mentioned, comes about when I’m loading the data. Of course, I could be using pandas, or pandas plus Ray; Ray is, of course, something which allows you to work with large data frames. But even with Ray, or with pandas alone, we used to often run into out-of-memory errors. And the .apply and .map functions in pandas were very slow, because we had a lot of complex business logic, which required group-bys and a fair amount of filtering with lambda functions, which was fairly slow with pandas. So that is when we took a decision to move to the cloud and use Spark. The stack that we used for deploying the machine learning model was PySpark along with AWS EMR, which is the platform where we work with Spark on the AWS cloud, with JupyterLab. And of course, we used spot instances in the cluster to reduce the cost. Our replacement for the apply and map functions was UDFs, User Defined Functions, in Spark. But even after doing all of that, what we found was that UDFs were still slow. They were faster than pandas, but they were still slow. So this was the first problem that we faced using PySpark. And how did we go about resolving it? It was essentially about understanding why the UDFs were slow. Why was my PySpark data frame processing my data in a slow manner? That is when we realized that the data that we had was in zipped CSV format.
It was compressed CSV, and we realized that when we are working with CSVs in Spark, and especially with PySpark in a cloud environment, it really does not unlock all of the potential of PySpark. That is where we realized that maybe there are some other data formats out there which can help us. And upon a lot of inquiring on Stack Overflow, reading a lot of blogs, and trying and testing multiple data formats, we arrived at the Parquet data format. That was the fastest data format for our use case, because we have lots of columns and lots of data as well. Parquet gave us the columnar data structure that it is very famous for, and allowed us to improve the speed of the UDFs, as well as the speed of the exploratory analysis that we wanted to do on the data in the first place. So what we did, essentially, was write an intermediate batch job to load the CSV files and dump them into Parquet format. Any batch jobs that we write after that use these Parquet files rather than the CSV files. This gave us an immediate 10x speedup with the apply-style PySpark (indistinct). It was a tremendous improvement in the speed of the exploratory analysis, any data cleaning, and any business logic testing that we wanted to do on the data. And of course, it improved the joins, which we’ll talk about in a while.

So the second issue that we faced as we went along was, again, still with loading the data. We had resolved the first issue, and we found that UDFs were running faster than pandas, and faster still after we converted to Parquet, but they were still slow compared to what we had expected.
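For the first issue, the intermediate CSV-to-Parquet batch job described above might look roughly like the sketch below. It is not the team's actual job: the function name and paths are placeholders, it assumes the files have a header row, and it assumes gzip compression, which Spark reads directly based on the `.gz` extension.

```python
def csv_to_parquet(spark, csv_path, parquet_path):
    """Read a CSV source with an existing SparkSession and rewrite it as Parquet.

    `spark` is a SparkSession created by the caller; both paths are placeholders.
    """
    # header=True assumes the first row of each file holds the column names.
    df = spark.read.csv(csv_path, header=True)
    # Overwrite so that the batch job can be re-run safely.
    df.write.mode("overwrite").parquet(parquet_path)
    return parquet_path


# Usage (requires a Spark installation; the bucket name is a placeholder):
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("csv-to-parquet").getOrCreate()
# csv_to_parquet(spark, "s3://my-bucket/raw/labs/*.csv.gz", "s3://my-bucket/parquet/labs/")
```

Downstream jobs would then read with `spark.read.parquet(...)` instead of touching the CSV files again.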
We cannot still be running an intermediate job which runs for one hour or two hours; I would ideally want that to finish within 15 minutes at most. So this started another round of research, where we had to go out and reach out to multiple people and ask them what the major issue was. That is when we realized that, in the ecosystem we were trying to build with PySpark, we were using data frames, but we were not defining the schema of the data frame, with the exact data type for each of the columns. That is what was tripping us up, because PySpark was spending a fair amount of time inferring the data type of each column and then loading them into a data frame before executing any functions that we wanted it to run. Now, with the number of columns that we had, it was pretty noticeable. Had we had only maybe 10 or 15 columns, the performance difference would not have been noticeable. But since we had such a wide data frame to work with, it was very, very noticeable. So what we did was define the exact data types for each of the data frames that we wanted to work with, depending on the category and on the joins that we would make, and that improved the speed again, tremendously. This was an additional 3x speedup on top of the 10x speedup that I reported earlier. This was very good for us, because we realized that we had to work with data frames and define a proper schema for each of them. But we also realized that I cannot do that for 25,000 columns, and most of them had a lot of null values. So we had to be very selective about which columns to use and how to choose them, and also be very rigid and strict in cleaning the data.
Earlier, we could have tried to do feature importance on all of the columns, but now we realized that, rather than doing that, I should just be very strict in removing the columns which are not going to add much information. Because each extra column also adds time to my processing, since I also have to define its data type.
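To make the schema point concrete, here is a minimal sketch of declaring column types up front instead of letting Spark infer them. It builds a DDL-formatted schema string, which `DataFrameReader.schema` accepts in place of a `StructType`. The column names, types and helper names are hypothetical, the sketch assumes column names are plain identifiers, and it is shown on a CSV read.

```python
def build_schema_ddl(column_types):
    """Turn {"col": "TYPE", ...} into a DDL string such as "a INT, b DOUBLE".

    Assumes column names are plain identifiers (no spaces or special characters).
    """
    return ", ".join(f"{name} {dtype}" for name, dtype in column_types.items())


def read_csv_with_schema(spark, path, ddl):
    """Read a CSV with an explicit schema so that no inference pass is needed."""
    return spark.read.schema(ddl).csv(path, header=True)


# Hypothetical columns for a lab-results category.
LAB_COLUMNS = {
    "patient_id": "STRING",
    "test_code": "STRING",
    "result_value": "DOUBLE",
    "test_date": "DATE",
}
ddl = build_schema_ddl(LAB_COLUMNS)
print(ddl)
```

Keeping the type mapping in one dictionary per category is one way to generate schemas "on the go" for many columns without writing each `StructField` by hand.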

And for that, we had to, of course, write a lot of custom functions to define these data frame column data types on the go. And that was before we encountered the third issue, which was another major one. By now, with all the data, we had done exploratory analysis and we broadly knew which columns we were going to use and how they were going to be used. And of course, if I have to run any machine learning model, then I have to go out and join all of the features that I want. That is where we encountered an issue with joining the data. Now, I have five broad categories of data, each of which has been converted to Parquet, each of which has been cleaned up, and for each of which the data frame types have been defined. And when we tried to join them, it was again very, very painfully slow. Each of those joins was taking anywhere from three to five hours to run, which was ridiculous. That was leading to multiple timeouts and multiple retries. And we had to do multiple rounds of vertical scaling, where I was working with, say, a 64 GB RAM machine, and if I wanted to speed up, I would eventually end up with a machine with over 256 GB of RAM. Which is again ridiculous, because I could surely add some more optimizations to my processing pipeline with PySpark instead. The way we approached this was that we realized Spark hates wide data frames, right? If I’m working with, say, two data frames and trying to join them in Spark, each of them having 10 or 20 columns, then it will do it without any complaint, very, very smoothly. But the moment each of those contains 1,000, 2,000 or 3,000 columns, performance starts to degrade massively.
And we did an internal exercise where we tried to plot this drop-off point: at which column count does the performance really start to degrade? The reason we did that analysis was also to give us some indication of how many columns I can really work with in my current scenario, and to give me a realistic timeframe. And we realized that the performance slowly started degrading at around 250 to 350 columns.

And that was kind of a big revelation, because each of the data sets that we were working with, even after cleaning, had close to 2,000 to 3,000 columns. And we cannot suddenly work with only 250 to 300 columns.

What we then did was, we realized that there were a lot of irrelevant columns which we were still working with, and we ended up dropping all of them. A lot of them were information like a patient’s first name or last name, those kinds of columns, which we dropped without even thinking. We also talked to our domain experts to understand which columns would usually add more value to a machine learning model for this use case, and accordingly we chose the broad themes of columns that we had to work with. So we had this large data frame for each of those categories, and we broke it up into sub data frames. We had close to five large data frames, and we ended up breaking each of them into five again, so we ended up with roughly 25 to 30 data frames overall. Each of them had a theme: one would focus just on lab tests done on your testing area, one would focus on procedures which are invasive, another on procedures which required you to get admitted to a hospital, and so on. So we created such themed data frames, and then we joined them as per the requirements. This improved the speed of our joins, analysis and training massively. And we realized that we had to be less greedy and limit the number of joins that we were trying to do at each point in time. That was very helpful.

After loading, cleaning and joining the data, and solving all these problems around data, we arrived at the problems of training. We were training on the data using the built-in Spark MLlib platform, with a simple logistic regression or a random forest (indistinct). And we realized that we were working with dense matrices, and our features were defined in a way where we were feeding in multiple columns. That was pretty slow, and we didn’t want to continue with it. It was also impacting my choice of batch sizes, since I could not work with very large batch sizes, and the number of iterations that I had to run before I knew whether something was going to be helpful for me. And it was also affecting any kind of hyperparameter optimization we were trying to do: each run, while exploring the hyperparameter space, was going to take a huge amount of time. And that’s where we came up with a solution for this.
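Returning briefly to the wide-join fix: the split of one wide data frame into themed sub data frames can be sketched in plain Python as a grouping of column names. In the talk the themes were chosen with domain experts; the prefix-based rule, the column names and the helper names below are purely hypothetical stand-ins for that decision.

```python
from collections import defaultdict


def group_columns_by_theme(columns, key_column, theme_of):
    """Group column names by theme, keeping the join key in every group."""
    groups = defaultdict(list)
    for col in columns:
        if col == key_column:
            continue  # the key is added to each themed group below
        groups[theme_of(col)].append(col)
    return {theme: [key_column] + cols for theme, cols in groups.items()}


def prefix_theme(col):
    """Hypothetical rule: the theme is the part of the name before the first underscore."""
    return col.split("_", 1)[0]


columns = ["patient_id", "lab_hba1c", "lab_ldl", "proc_dialysis", "rx_metformin"]
themes = group_columns_by_theme(columns, "patient_id", prefix_theme)
print(themes)

# With a real wide PySpark DataFrame `wide_df`, each themed frame would be a narrow select:
# themed_frames = {theme: wide_df.select(*cols) for theme, cols in themes.items()}
```

Because every themed frame carries the join key, only the frames needed for a given model have to be joined back together, which keeps each join narrow.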

We realized that we had to convert these dense matrices to sparse matrices. And the obvious answer to that was using the VectorAssembler class in PySpark. What VectorAssembler does is take in a list of columns which you want to use as features and convert them into a sparse representation. And, of course, it infers what kind of representation would be best for you. It may be sparse, it may be dense, but most of the time it ends up giving you a sparse representation. It makes that (indistinct) based on the RAM, the amount of data, and the requirements that you have. But if you want to be more explicit in defining those sparse representations, you could use something like the SparseVector or VectorUDT classes in PySpark itself. Once we had created these sparse matrices, we just dumped them into feature files, which allowed us to train further using any framework like TensorFlow or PyTorch.
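To show what a sparse representation stores, here is a small plain-Python sketch that converts one dense feature row into a `(size, indices, values)` triple, keeping only the non-zero entries. This is the same argument layout that `pyspark.ml.linalg.Vectors.sparse(size, indices, values)` takes; the helper name and the sample row are made up for illustration.

```python
def to_sparse(row):
    """Return (size, indices, values) for the non-zero entries of a dense row."""
    indices = [i for i, value in enumerate(row) if value != 0]
    values = [float(row[i]) for i in indices]
    return len(row), indices, values


# A mostly-zero feature row, as is typical after one-hot style feature engineering.
dense_row = [0, 0, 1, 0, 0, 0, 3.5, 0]
size, indices, values = to_sparse(dense_row)
print(size, indices, values)

# With PySpark installed, the same triple builds a sparse vector explicitly:
# from pyspark.ml.linalg import Vectors
# vec = Vectors.sparse(size, indices, values)
```

For a row with thousands of features and only a handful of non-zero values, storing two short lists instead of the full row is where the memory saving comes from.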

And once this was solved, there was one final problem: we realized that this is a multi-class, multi-label classification problem. Which means at any point in time, I’m trying to predict 80 classes together, and one patient may have a single disease or he may have 10 diseases. Which meant that the decision boundaries being created for (indistinct) classes were, of course, going to be very confused in a way. Because a lot of the diseases are rare, like very rare hereditary genetic diseases, which you are not going to find in everyone. But a lifestyle disease like hypertension or diabetes you’re going to find very easily in the population. Which meant that our predictions for the popular diseases, per se, were very good, but for the rare genetic diseases they were not as good as I would have liked. So we realized that, since this is a multi-class, multi-label problem and the model was not doing a good job of predicting these rare categories of diseases, I needed to improve my prediction methods. And of course, we were running multiple jobs with spot instances, trying to reduce the cost, but trying to predict so many classes together was not very helpful. The problem was that we were running with the same set of features for all the categories. We were saying, “Okay, these are the set of features that are used to predict the popular diseases; I’ll use the same set of features to predict my rare diseases.” Which is not correct from a machine learning point of view. It meant that I had to do custom machine learning and feature engineering for each of these categories, and that is exactly what we did. We realized that if I have to solve this issue, then I have to approach it in a very different way.

And the way to do it is custom feature generation for each category. As I mentioned, I was treating the popular and the rare diseases together. The best way is to treat them separately: have separate feature engineering pipelines and custom features for the rare ones, slightly different ones for the more popular ones, the ones which have more incidence in your data, and then have individual binary classification models running on each category. So from one model which was trying to predict 80 categories, we arrived at 80 models predicting 80 categories. The final classification was then a summation of all of these 80 models, and it was based on custom logic, which we had to write, to identify the top N categories. Because of course there are comorbidities and dependencies: if a person has diabetes, then the chances of him having hypertension or diabetes-related comorbidities are high. Which meant that I had to do a custom probability enhancement: if a person has diabetes, then there is such-and-such percentage chance that this person has a comorbid condition as well. So we had to write custom category logic, based on which we identified the top 10 categories. And to do all of this at scale, we ended up writing Terraform and Boto3 scripts to launch multiple EMR clusters for each of these categories. So at any point in time, I could just launch EMR clusters which would do all of the hyperparameter optimization for each model, and then dump the final model and the outputs into an AWS S3 bucket where we’d store the outcomes.

So this, in a way, was a walkthrough of the broad problem statement that we had. What was the broad data profile? Why did we choose Spark? What were the issues that we faced with Spark, and how did we solve them? If I could summarize the talk, I would essentially give you the best practices for ML on Spark, with the caveat that they fit the use case that we tried to resolve. Use Parquet formats; don’t use CSV or TXT file formats if you’re working with Spark. Use narrow data frames; Spark hates wide data frames. Try to be less greedy, and try to reduce the number of joins. Define a schema for your data frames: try to understand what data types you’re working with and resolve the data to the best possible format. And, of course, before your training, use sparse matrices. There are some use cases where this gives reduced performance compared to dense matrices, but broadly it works as well as dense, and it massively reduces your RAM requirement and the headache around managing the infrastructure for machine learning. And of course, if you encounter a multi-class, multi-label problem like the one that we had, it is almost always advisable to choose a hierarchical-style classification using the binary classification approach that I just briefly talked about, which of course involves doing custom feature engineering for each of the categories that you work with. And so that would be most (indistinct) of my session.
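The binary-model approach with a comorbidity adjustment and a top-N cut can be sketched as follows. The talk does not give the actual rule, so everything specific here is an assumption made for illustration: the 0.5 threshold, the multiplicative boost factors, the disease names, and the function name `rank_categories`.

```python
def rank_categories(probabilities, comorbidity_boost, top_n, threshold=0.5):
    """Combine per-disease binary model scores into a ranked top-N list.

    probabilities: {disease: score in [0, 1]}, one score per independent binary model.
    comorbidity_boost: {(present_disease, related_disease): multiplier}.
    """
    adjusted = dict(probabilities)
    for (present, related), factor in comorbidity_boost.items():
        # If the "present" disease is likely, raise the related disease's score (capped at 1.0).
        if probabilities.get(present, 0.0) >= threshold and related in adjusted:
            adjusted[related] = min(1.0, adjusted[related] * factor)
    # Highest adjusted score first; ties broken by name for a deterministic order.
    ranked = sorted(adjusted.items(), key=lambda item: (-item[1], item[0]))
    return ranked[:top_n]


# Hypothetical outputs of four independent binary classifiers for one patient.
scores = {"diabetes": 0.9, "hypertension": 0.4, "ckd": 0.3, "rare_genetic": 0.05}
# Hypothetical boosts: a likely diabetes diagnosis raises related conditions.
boosts = {("diabetes", "hypertension"): 1.5, ("diabetes", "ckd"): 1.2}

top = rank_categories(scores, boosts, top_n=3)
print([name for name, _ in top])
```

Keeping the adjustment outside the models means each binary classifier can be trained and tuned independently, with the dependency logic applied only when the scores are combined.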

You can, of course, reach out to me on my email. If you have follow-up questions, connect with me on Twitter.

About Manas Ranjan Kar

Episource LLC

Manas Ranjan Kar is an Associate Vice President at US healthcare company Episource, where he leads the NLP and data science practice, works on semantic technologies and computational linguistics (NLP), builds algorithms and machine learning models, researches data science journals, and architects secure product backends in the cloud. He's architected multiple commercial NLP solutions in the areas of healthcare, food and beverages, finance, and retail. Manas is deeply involved in functionally architecting large-scale business process automation and deep insights from structured and unstructured data using NLP and ML.