This talk presents how we accelerated deep learning processing from preprocessing to inference and training on Apache Spark in SK Telecom. In SK Telecom, we have half a Korean population as our customers. To support them, we have 400,000 cell towers, which generates logs with geospatial tags. With these logs, we can analyze network quality for a certain cell tower and estimate real-time population in the region by counting the number of connected devices to the cell tower. In order to predict network quality for a cell and population for a certain region, we developed a deep learning based prediction model, which requires to process almost 2 million logs every second and produce prediction results for each cell tower and region. To efficiently handle this huge computation, we focused on optimizing deep learning data pipeline.
First, we tried to optimize deep learning preprocessing by using a new in-memory data store for Apache Spark called FlashBase. Preprocessing is done by reading the ingested data from FlashBase and main operations are processed as Spark’s RDD transformation operations, while some of the aggregation operations are pushed down to FlashBase and these operations are accelerated by using vector processing with Intel’s MKL and AVX-512.
Second, the preprocessed results as Spark’s RDD format are directly delivered to an open source Analytics and AI Platform called Analytics Zoo without any data conversion. Lastly, Analytics Zoo takes the RDD as its input and executes deep learning inference and training operations using TensorFlow models (within Spark’s executors in parallel). These operations are processed by using Intel’s MKL and AVX-512 vectorized operations. By doing so, we could create orders of magnitude faster data pipeline for deep learning based on Spark and Intel Cascade-lake CPUs than the legacy architecture with pure Pandas and Tensorflow.
– Hello, my name’s Hongchan.
I’m from SK Telecom, which is the largest telecommunication company in Korea.
Today, I’m going to introduce our collaboration work with Intel, focusing on our network quality analysis and prediction in this case.
Our presentation is going to follow this order. First, I’m going to introduce a quick demo for network quality analysis visualization. Next, I will briefly cover network quality analysis in SK Telecom, and our own in-memory data store for Apache Spark called FlashBase.
Then I’ll explain how we (indistinct) preprocessing by using vectorization.
From this part on, I’m going to hand it over to Jason, who is a Senior Principal Engineer in Intel and a (indistinct) PMC member and the creator of BigDL and Analytics Zoo. This demo will visualize mental quality analysis visuals at points with different colors on the map and dynamic charts on the right side.
With the points to the blue color it means the better quality and the red color it means the worst quality.
If we move the location or chose the different time spans, graphs and charts are updated right away, according to each location and time span ranges.
The dataset has 100 million records.
(indistinct) is querying more data to aggregate and visualize the points and charts.
This instant update is possible because we accelerated quality processing by using aggregation push down and vectorized processing. OK, let’s see the demo.
Next, I will briefly introduce our use case and our memory data store for Apache Spark, named FlashBase, by summarizing last year’s talk.
As I told you earlier, SK Telecom is the largest telecommunications company in Korea. Our company has half the population of our country as our subscribers.
To support them, we have three thousand, 300,000 cell towers and each cell tower (indistinct) devices that connect our subscribers to our core network, generates a huge volume of logs in a very short time period, like 10 seconds. Each log has geospatial tags, according to the cell tower’s location.
In 2016, we tried to build a real time network quality analysis system which can ingest 1.4 million records per second, meaning 120 billion records everyday.
At the same time, we need to return the network quality analysis report in a few second for a specific time, for a specific location. Looking back, it was challenging requirements which could not be solved with big data technologies at that time.
First, we try to solve this problem, based on how to Hadoop File System and Apache Spark, but Hadoop File System cannot efficiently handle numerous partitions with very small site data, reflecting specific locations and specific time.
So, we try to create a new data store through Apache Spark named FlashBase which can support much more partitions with much smaller data size than Hadoop File System.
So, we gathered the best known open source software. First, Apache Spark for (indistinct) space and query engine.
Redis for DRAM key-value store and RocksDB for SSD key-value store. And, SSDs for short latency and small-sized parallel I/Os. I thought that we could easily be the new data store by just integrating them together. However, it has been very challenging to breathe life into a collection of those softwares and make it as a working memory database system.
But, we have achieved it for the last four years.
As a result, FlashBase has the following features which are mostly record for commercial in-memory database systems such as recovery replication, scale out, and so on. Especially for ingestion performance, FlashBase can ingest 500,000 records per second in a single node, and each industrial operation is handed in an automated way.
If partially our data store can store billions of partitions by taking all the bounties key value store and in memory storage devices.
This can be used to greatly reduce the (indistinct) space for big data queries with better predicates of high cardinality columns like time spans and locations.
This is a query example of network quality analysis for a specific time and for a specific location. By using extreme partitioning and push down filters, we could finally reduce the query time for 0.1 trillion records to one second.
So far, I have introduced our own in-memory data store for Apache Spark. From now on, I’m going to introduce our (indistinct) use case. Since SK Telecom is a telecommunication company, we need to predict various network quality indicators for anomaly detection and real time network inframanagement.
We first try to net our quality prediction in five minutes by using (indistinct) sequence-by-sequence model. But, the model could not predict cellphone peak changes very well. Therefore, we came up with new memory augmented model. It can learn from the periodic changes of past cellular patterns by using its own memory.
For the details of this model, you can refer to the last year’s talk.
This is the test result of our model. Like the sequence-to-sequence model above, below our memory-augmented model can predict the sudden changes very well.
In the latest architecture of data pipeline for team learning, we had inefficient bottleneck points from data source to deep learning inference.
Such as data exportation to disk files, generalization, and deserialization with different data formats. To address this problem we collaborated with Intel and implemented this new architecture of pure in-memory data pipeline based on FlashBase and Intel Analytics ZOO.
Through this pipeline preprocessing and deep learning inference and training can be integrated into just one single Spark cluster. Furthermore, the whole pipeline is based on Apache Spark’s RDD without any collect operation.
After preprocessing job is done by FlashBase, (indistinct) are delivered to Intel Analytics ZOO, as Spark’s RDD.
And, Analytics ZOO (indistinct) inference operations with vectorized profiting by exploiting internal CPUs SIMD instruction set. Consequently we could reduce the end-to-end inference time substantially, and more details will be covered by co-speaker, Jason, later in this talk.
OK, so far I’ve covered our team learning use case.
Now, I’m going to explain how we accelerated in learning preprocessing by using aggregation push down and vectorization.
So, what is push down? It means offloading computations to low-level computing modules, which is closer to the original data stores. In other words, it means near data processing.
Currently, Apache Spark can push down all the data predicates and project it close to (indistinct) data sources.
If aggregation queries are executed by push down filters and projected (indistinct) to the data source first. And then, the only (indistinct) data retrieve to the Apache Spark executors. Finally, Spark executors handles the aggregation operations.
But, what if aggregation can be done in the external data sources? That push down (indistinct) can substantially reduce the data size to be transferred.
With aggregation, push down Spark executors can retrieve only preaggregated widows, thereby greatly reducing the data size, the way the processing and data shuffling.
More over vectorized prophesying in the data source can utilize more than CPU’s native SIMD, S.I.M.D. instruction set such as AVX, AVX2, and AVX-512 which can process up to 16 proteam point data in a single instruction, at the same time.
This is our design for aggregation push down. Because the original Apache Spark does not provide aggregation push down feature, we use Apache Spark’s Catalyst optimization rules.
We define the custom optimization rule named “PropagateAggregationRule” and appended it to the original set of rules. This custom rule is executed after finishing the original optimization rules in the Catalyst optimizer.
This is an example query to retrieve, count, and average height of engineers, grouped by years. The original plan is executed in the following order. First, “relation” plan builds RDD from the data source. Then, the Logical/Relation defines the Attributes in the data source. Next, the data of RDD are refined by the “Filter” plan and the columns are selected by the “Project” plan.
Finally, the pruned and filtered data are aggregated by the “Aggregate” plan.
In order to push down aggregation operations, we created numeration, class, name, RelationForAggregation. And, we placed it with the original Relation class. Then we made new logic for Apache Spark to be able to push down the expression trees of GROUP BY and AGGREGATE Functions to the new relation. This looks simple, but we face many problems to impress, implement this idea. Because currently Apache Spark is not designed to, designed to conveniently push down the aggregation equations.
In more details, our custom rule transforms expression trees into Attribute and Wrap them inside a Project Plan.
Finally, the new RelationForAggregation gets the push down expression trees for aggregation en capsulized by the Attributes in the Project Plan. And, the Logical/Relation is also changed, reflecting the updated attributes in the Project Plan.
And, the path at the bottom right side of this slide shows the query explained (indistinct), which shows the final Optimized Logical Plan, customized by our new optimization rules.
Inside our data source, FlashBase handles the push down aggregation operations and the expression trees. FlashBase has a columnar data structure, having similar data layout to Apache Arrow. So, the computation operations can be easily vectorized.
FlashBase makes groups and for each group, aggregation equations are executed as related by the internal CPU’s SIMD instruction set.
OK, this is the test results.
This is the end of the training data set for the network quality prediction with eight major index columns and 0.7 billion records.
Using this dataset, we conducted our experiments, and the test result is like this. For data normalization in-depth learning, we need min, min, minimum and maximum aggregation values for the entire dataset. As you can see in this table, MinMax operations are isolated by aggregation push down and vectorization, and the performance gain was up to eight times.
Next, in order to produce five minute time interval average values from the original log data with 10 second, 10 second time intervals, grouped by aggregation operations are required. Since the group by aggregation require to create a hash table inside the FreshBase data store, the performance gain was reduced. But, it was still about two times faster than the original version. This is an initial reroute and the performance keeps improving.
And also if we break down the performance gain, for the, the data normalization tasks, current performance gain from vectorized processing with SIMD instruction set was about 1.5 times.
The dominating factor for acceleration is from aggregation push down result which is five times, by reducing the data transfer size and later computations.
Theoretically current SIMD instruction set can enhance performance almost up to two to three times. We discovered that the smaller gain of our vectorized processing was from an inefficiency in FreshBase scan operations. We are addressing this issue now, so we are expecting enhanced performance gain from vectorizations soon. OK, that’s all for my presentation. From this part on, I’m going to hand it over to Jason. Thank you.
– Thanks, Nate. So as was mentioned by Nate, we have been working with SKT to enable the unified entry in the big data antics and pipelines, for SKT the network quality prediction. So, that’s just the entire data analytics and the deep learning training and inference pipeline can run on the same Spark cluster, scale out transparently across a lot of cluster, and everything is done in memory. First I talk about the unified software architecture we have building with SKT using Analytics Zoo. At Intel, we have been focusing on bringing AI deep learning to big data.
We have open source Big DL, which is a distributed deep learning framework for Apache Spark. With BigDL, people kind write to normal Spark code just like they use (indistinct), but then now they can use big data to write new distributed deeply applications on Spark. Yeah, this has been the year that we have also opened a new project called Analytics ZOO, which is a high level software platform building on top of those different frameworks and like TensorFlow, PyTorch, Keras, and as well as the other, the big, big data distributed frameworks like Apache Spark, Ray, (indistinct) The goal here really, bring all of the deep learning technologies, whether it’s TensorFlow or PyTorch through the big data platform and (indistinct) integrated in a distributed fashion.
So, this slide gives you a very quick overview of Analytics Zoo. As I mentioned before, it is built on top of all those different frameworks like TensorFlow, PyTorch, (indistinct) as well.
And, utilize Spark frame Ray essential to build a distributed entry in the index (indistinct) Python. Inside the Analytics Zoo, there are three layers. Its bottom layer, there are the so called integrated index and the Python, which is a horizontal layer. Allows usage to apply the AI deep learning models, like TensorFlow or PyTorch on Spark in a distributive fashion. On top of the Python layer, there is a automated ML workflow layer, which provides automated processing for many of the (indistinct) tasks. For instance, we have built a autoML patch for time series analysis.
And, then the top layer, there is also a set of build team models, such as the Recommendation models, Time Series models, and so on people can directly use in underlying workflow and Python. Because, one can also use (indistinct) TensorFlow or PyTorch models directly (indistinct) as well. So, Analytics Zoo is a open source project and you can refer to the Github link on the slides.
So, let’s look at some concrete example, the, on what we, what we mean by applying deep learning or AI models, into your Spark into pipeline in a distributive fashion. This is one example. Actually, this is used in SKT’s use case.
What allows you to apply distributed TensorFlow models directly write TensorFlow code in line with PySpark code which can then run distributed across your Spark cluster. As you can see, user can use any Spark code to process the data to get a RDD of dataset or data frame. And, then, take that RDD data frame, we provide a API basically take your distributed dataset across the cluster and then conform it into a, essentially RDD of TensorFlow (indistinct) all in memory, all distributed across the cluster. And then you can use any standard kinds of a code. In this case, we are using some Slim code to build a TensorFlow model. And after that, you can use the API provided by Analytics Zoo for distributing our inference. Under the hood, what is actually replicated is the TensorFlow model distributed across the cluster. (indistinct) will feed the partition of TensorFlow (indistinct) we just provided, we just prepared previously, and feed that data into a TensorFlow model, so we can compute a local gradient (indistinct).
And then, we also provided in-memory, our reduced layer, which is built on top of the RDD Brock cache layer. That’s a Spark Brock manager to do, to perform in memory our reduced practicing (indistinct). So essentially, we can transparently run the distributor training, our inference with your standard TensorFlow model and, which uses your Spark data processing and, whether it’s IDD data frame or the input data.
Another example is for instance, a lot of the users are rarely, you know, Spark users, right, (indistinct) built the pipeline in the Spark data frame, and the Spark ML pipeline. And, we allow those users to directly use deep learning models, for instance, Keras models, in this example, inside the Spark ML pipeline.
So, let’s look at the SKT’s use case, As Nate mentioned, after you get out the data stored in your big data cluster, in the Lightning DB, in this case. And, then you can process your data in Spark and Spark SQL. Previously you will need to somehow export your data.
Maybe exporting to a CSV file and preprocessing and there’s a convert it to a different GPU cluster. So, you move the data across, between those clusters. And then, your process your data, maybe you need (indistinct) and then run the AI training and the inference on those data. So essentially, you need to manage multiple frameworks, multiple separate clusters, and then you have basically two workflows, two separate workflow which you need to use some (indistinct) to tie them together.
We don’t work with SKT essentially to transform that into a, essentially a unified and empty end architecture. So, you can have run the preprocessing in Spark SQL, everything in the memory, and then you can get a set of RDD clusters. And after that, use Analytics Zoo to take the TensorFlow model and runs the distributed training inference of those RDD of TensorFlow (indistinct) in memory. So as you can see, there is a large benefit of running a unified end to end architecture in Analytics Zoo. First of all, you reduce a lot of overhead in terms of (indistinct), and then secondly it makes the entire pipeline much more productive because you, essentially you’re writing all the program and the one Spark job or the one Spark program, and it can run on your Spark cluster and everything is automated.
So, let’s look at some of the performance improvement by using Analytics Zoo compared to using a separate GPU cluster. On the left side, there, you see the comparison of the inference performance. As you can see, as we move from a separate GPU cluster to a integrated pipeline using Analytics Zoo, on a single note, I can, we can get about 3X speed up in terms of the entry in the inference pipeline, and then Analytics Zoo allows you transparently scale out your inference pipeline. You do not need to change any code. We leverage the Spark for transparency runs inference pipeline on three nodes and they give you additional (indistinct). On the right side, there is a comparison, comparison of the training performance.
As you can see, unit one server, one, one CPU server, and with the training performance is actually about the same as a one GPU (indistinct). And then again, the benefits of Analytics Zoo, it allows you to transparently scale out your training pipeline. Again, users do not need to change any code. It just can transparently scale out to a, to a distributive fashion. And, you use string node in this case, which again gets about more than 3X speed up. So, we have been working with SKP on this end to end Spark analytics and deeper, me in particular, distributed TensorFlow in a integrated and unified the pipeline using Analytics Zoo. So, as a future work, we are looking for additional solutions to try to improve the time series analysis in Analytics Zoo.
Project Zouwu is a new project we just kicked off. It’s an open source project built on top of Analytics Zoo. It provides a, it provides a high level application framework for usage, to easily develop their time series analysis applications. But as I mentioned before, Analytics Zoo provides other integrator pipelines, a ML workflow, as well as built-in models. On top of that, we have put together a set of, time series use cases and built-in models for time series analysis, as well as a model, culture, AutoTS, which is actually a AutoML framework specified for time series analysis. So, it’s automatically generated features that models and pure and hyperparameters for your time series analysis. So, Project Zouwu is an open source project. You can check out the Github link on the right bottom. Essentially it allows you to easily build your distributed time series analysis pipelines, including feature engineering, as well as deep learning models with (indistinct). And, in particular, we build a lot of use cases for common Telco applications: network KPI forecasting, anomaly detection and so on. And, it’s optimized and scalable using Spark running on top of our largest class of Xeon servers. So, it integrates out of the library, the TensorFlow, PyTorch, MKL-DNN, and scale out, transparently scale out the cluster. So yeah, that’s a future work. I mean, something, some work in progress we are currently working on and you’re welcome to look into details on our website.
Team Leader and Creator of FlashBase (distributed in-memory data store for Apache Spark, optimized for DRAM/SSDs) in SKT R&D Center A Ph.D. and software engineer in data engineering for big data and machine learning systems. Published top tier conference and journal papers including VLDB, IEEE TKDE, and Information Systems.
Jason Dai is a senior principal engineer and CTO of Big Data Technologies at Intel, responsible for leading the global engineering teams (in both Silicon Valley and Shanghai) on the development of advanced data analytics and machine learning. He is the creator of BigDL and Analytics Zoo, a founding committer and PMC member of Apache Spark, and a mentor of Apache MXNet. For more details, please see https://jason-dai.github.io/.