When machine learning models are productionized, they are commonly organized as workflows with multiple tasks, managed by a task scheduler such as Airflow or Prefect. Traditionally, each task within the same workflow uses similar computing frameworks (e.g. Python, Spark, and PyTorch) in the same backend computing environment (e.g. AWS EMR, Google DataProc) with globally fixed settings (e.g. instances, cores, memory).

In complicated use cases, such traditional workflows create large resource and runtime inefficiencies, so it is highly desirable to use different computing frameworks in different computing environments within the same workflow. Such workflows can be called superworkflows. Fugue is an open-source abstraction layer on top of different computing frameworks that provides uniform interfaces to these frameworks without the complexities associated with them. In this sense, Fugue can be viewed as a superframework.

In addition, Kubernetes (K8S) is a container orchestration system, and it is easy to create different computing environments (e.g. Spark, PyTorch) with different docker images as everything is containerized in K8S. It is natural to combine K8S and Fugue to create superworkflows for complicated machine learning problems. In this talk, we use a popular graph neural network named Node2Vec as an example to illustrate how to create an efficient superworkflow using Fugue and K8S on very large graphs with hundreds of millions of vertices and edges.

We also demonstrate how to partition the whole Node2Vec process into multiple tasks based on their complexities and parallelism. Benchmark testing is conducted for comparing performance and resource efficiency. Finally, it is easy to generalize this superworkflow concept to other deep learning problems.

Jintao Zhang, Software Engineer, Machine Learning, Square Inc.

Jintao Zhang: Hello everyone. My name is Jintao Zhang. I’m a software engineer in machine learning at Square. Today, I’m going to talk about the Superworkflow Concept on Graph Neural Networks with Kubernetes and Fugue. Fugue is a new open source framework, and I will give a more detailed introduction later.

So first I’m going to give an introduction to the business context of graph neural networks, more specifically graph node embedding. We use one of the most representative graph embedding algorithms, Node2Vec, as an example to illustrate the problem in more detail. Then we split the Node2Vec algorithm into multiple steps and focus on the Word2Vec embedding step, which is the bottleneck of the graph embedding computation. Then we introduce the Fugue framework briefly and introduce the concept of superworkflow to restructure the computing workflow of Node2Vec. Finally, we present some benchmark testing results on Node2Vec with different Word2Vec methods and conclude this work with a summary.

So let’s provide some background as an introduction. So what are knowledge graphs? Knowledge graphs are knowledge bases formed into graphs. There are some good examples out there: the Google Knowledge Graph, which is a webpage graph for the search engine; the social graph of Facebook friends; and merchant graphs in e-commerce, for example transaction graphs and buyer-seller relation graphs. Graph learning is a complement to traditional machine learning, and it focuses on graph topology to find critical business insights.

On knowledge graphs, a lot of research has been conducted using graph neural networks to solve critical business problems, especially in e-commerce and FinTech. One example is fraud detection, which is very critical for e-commerce and FinTech. The reason graph neural networks can help is that fraudsters tend to form clusters in their activity graph. Another example is cross-selling and recommendation. Basically, with node embedding, you can easily get item similarity and customer similarity from the graph embedding. There are several graph neural networks published in the past few years. For example, Node2Vec is a transductive node embedding algorithm on relatively static graphs, and GraphSAGE is an inductive algorithm on dynamic graphs. We focus on Node2Vec for its high popularity.

So node embedding is a common type of graph embedding, and it maps each graph node to a low-dimensional space. In this context, nodes with the same or similar local neighborhoods will have similar embeddings. Only graph topology matters in the embedding process, as this diagram shows.

However, graph embedding is very complicated. In image embedding, image sizes are fixed. In word embedding, text is linear and has a fixed size with a sliding window. But in graph embedding, graph node numbering is arbitrary, with a very complicated structure. The process of Node2Vec includes a few steps. First, create a simple graph and index every node. Then compute random walks of a given length, starting from each vertex. Then conduct embedding using Word2Vec by treating the random walk paths as text. It is clear that the computing workload is huge on large graphs, and Word2Vec is a critical component of Node2Vec.

The original Node2Vec paper provides non-distributed Python code, which can only handle very small graphs. We introduced our distributed Node2Vec algorithm in Spark, with the entire graph stored in memory. We use adjacency lists to represent the graph in a distributed way, and we implement a distributed Breadth-First Search algorithm for the random walk. To this end, our distributed Node2Vec implementation can work on large graphs with hundreds of millions of nodes very efficiently.
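As a rough illustration of the adjacency-list representation mentioned here, the following sketch builds adjacency lists from an edge list and splits them into partitions by node id. It is a single-process stand-in, not the Spark implementation from the talk; the function names, the undirected-graph assumption, and the modulo partitioning rule are all hypothetical.

```python
from collections import defaultdict


def build_adjacency(edges):
    """Build adjacency lists, treating every edge as undirected (an assumption)."""
    adjacency = defaultdict(list)
    for u, v in edges:
        adjacency[u].append(v)
        adjacency[v].append(u)
    return dict(adjacency)


def partition_by_node(adjacency, num_partitions):
    """Assign each node's adjacency list to a partition by node id modulo."""
    partitions = [dict() for _ in range(num_partitions)]
    for node, neighbors in adjacency.items():
        partitions[node % num_partitions][node] = neighbors
    return partitions


edges = [(0, 1), (1, 2), (0, 2), (2, 3)]
adjacency = build_adjacency(edges)
partitions = partition_by_node(adjacency, num_partitions=2)
```

Each partition holds complete neighbor lists for its own nodes, so a worker can look up the neighbors of a local node without contacting other workers.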

So to create a graph in memory, we just need to distribute the graph storage to many computing instances for random access of nodes and edges. The challenging part is that a deep traversal on large graphs can quickly drain the stack. Another important thing is indexing. Indexing converts node labels to a set of sequential integers. The goals are threefold. It can significantly reduce memory and storage usage. It can also provide better load balancing and data partitioning. And indexing is also required at the embedding step.
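The indexing step described above can be sketched in a few lines. This is a minimal single-machine version, assuming node labels are hashable values such as strings; the function name and the sample labels are made up for illustration.

```python
def index_nodes(edges):
    """Map arbitrary node labels to sequential integers 0..n-1."""
    label_to_id = {}
    indexed_edges = []
    for src, dst in edges:
        for label in (src, dst):
            if label not in label_to_id:
                # Ids are assigned in order of first appearance.
                label_to_id[label] = len(label_to_id)
        indexed_edges.append((label_to_id[src], label_to_id[dst]))
    return label_to_id, indexed_edges


edges = [
    ("merchant_a", "buyer_x"),
    ("buyer_x", "merchant_b"),
    ("merchant_a", "buyer_y"),
]
label_to_id, indexed_edges = index_nodes(edges)
```

Storing small integers instead of long labels is what reduces memory, and consecutive ids make it easy to split nodes evenly across partitions.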

Random walk is a critical step for Node2Vec. It generates a collection of node paths, which serve as the text for Word2Vec embedding. During the random walk process, there is a set of strategies to determine how to hop from one vertex to another in the graph. These strategies control how to generate the text that represents the original graph. From this perspective, you can view Node2Vec as an extended Word2Vec algorithm on graphs.
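One way to picture the hopping strategies is the second-order biased walk from the Node2Vec paper, where a return parameter p and an in-out parameter q reweight the next hop. The sketch below is a small in-memory version under that assumption; it is not the distributed implementation from the talk, and the function name and toy graph are illustrative.

```python
import random


def biased_walk(adj, start, length, p=1.0, q=1.0, rng=None):
    """Generate one walk of up to `length` nodes using Node2Vec-style weights."""
    rng = rng or random.Random(0)
    walk = [start]
    while len(walk) < length:
        current = walk[-1]
        neighbors = sorted(adj[current])
        if not neighbors:
            break  # dead end: stop the walk early
        if len(walk) == 1:
            # The first hop has no previous node, so it is uniform.
            walk.append(rng.choice(neighbors))
            continue
        previous = walk[-2]
        weights = []
        for candidate in neighbors:
            if candidate == previous:
                weights.append(1.0 / p)  # return to the previous node
            elif candidate in adj[previous]:
                weights.append(1.0)      # stay close to the previous node
            else:
                weights.append(1.0 / q)  # move further away
        walk.append(rng.choices(neighbors, weights=weights, k=1)[0])
    return walk


adj = {0: {1, 2}, 1: {0, 2, 3}, 2: {0, 1}, 3: {1}}
walks = [
    biased_walk(adj, v, length=5, p=0.5, q=2.0, rng=random.Random(v))
    for v in adj
]
```

A small q favors moving away from the previous node (closer to depth-first behavior), while a large q keeps the walk local (closer to breadth-first behavior).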

So as we have observed, the Word2Vec embedding step is the bottleneck of Node2Vec. We started to explore Word2Vec embedding in PyTorch.

There are some existing Word2Vec implementations. For example, Gensim is a Python package with a Word2Vec module, but it can only handle very small graphs with around 10,000 or 50,000 vertices. SparkMLlib has a Word2Vec module, but it is not fully functional. For example, it does not support [inaudible]. It has a hard limit on the number of nodes in the graph it can handle. And the running time performance is not very impressive either and requires large improvement on large graphs.

A PyTorch implementation overcomes all the problems found in SparkMLlib, and it only requires a small CPU cluster or a large GPU instance. This diagram shows the PyTorch Node2Vec process. It requires a Word2Vec pre-processing step to quickly generate all the relevant information and data before moving to the PyTorch embedding training step.

So the Word2Vec pre-processing step contains a set of tasks: for example, count and normalize word frequencies, remove rare words, index words, and conduct negative sampling. All these tasks are highly parallelizable, and they can be accelerated by distributed computing.
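The pre-processing tasks listed above can be sketched on a single machine as follows. The `min_count` threshold and the 0.75 exponent for the negative-sampling distribution are assumptions taken from common Word2Vec practice, not values stated in the talk, and the function names are hypothetical.

```python
import random
from collections import Counter


def preprocess(walks, min_count=2, power=0.75):
    """Count tokens, drop rare ones, normalize frequencies, and index the rest."""
    counts = Counter(token for walk in walks for token in walk)
    kept = {t: c for t, c in counts.items() if c >= min_count}
    total = sum(kept.values())
    frequencies = {t: c / total for t, c in kept.items()}
    ordered = sorted(kept)
    vocab = {token: idx for idx, token in enumerate(ordered)}
    # Unnormalized negative-sampling weights, aligned with vocab ids.
    weights = [kept[token] ** power for token in ordered]
    return frequencies, vocab, weights


def sample_negatives(vocab, weights, k, rng):
    """Draw k vocabulary ids in proportion to the smoothed counts."""
    ids = list(vocab.values())
    return rng.choices(ids, weights=weights, k=k)


walks = [[0, 1, 2, 1], [1, 2, 0, 1], [3, 1, 2, 1]]
frequencies, vocab, weights = preprocess(walks)
negatives = sample_negatives(vocab, weights, k=5, rng=random.Random(0))
```

Counting and filtering are simple aggregations over the walks, which is why these tasks map well onto a distributed engine.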

The embedding training step is an iterative process with a maximum number of iterations, and it contains multiple for loops inside. A GPU is critical for runtime performance in this step. Distributed computing is not of much help, because each iteration relies on the previous iteration’s results.

So finally, we can create a Superworkflow by running each step in a different computing framework with a different degree of parallelism. Each step also has a very different time complexity. For example, the random walk step requires substantially more CPU resources on large graphs. Here, a Superworkflow is a workflow where the backend computing can be in different frameworks, for example Python, Spark, PyTorch, and Dask. Also, these four steps can be managed by a single service instead of four different services. So it’s very important [inaudible] the parallelism and resources at different steps. This diagram summarizes the Superworkflow of Node2Vec. The graph creation and indexing step requires a medium-sized Spark cluster with maybe a couple hundred cores. The random walk step requires a substantially bigger cluster with over a thousand cores, and the Word2Vec pre-processing requires a medium to small cluster with maybe 100 or 200 cores. The final step, PyTorch embedding training, requires a large GPU computing instance instead of a cluster.
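To make the per-step sizing concrete, the sketch below writes the four steps down as plain data and compares each step with the peak allocation that a workflow with globally fixed settings would hold throughout. The core counts are placeholders read off the rough figures in the talk (200, 1000, and 150 are assumptions, and the GPU instance size is borrowed from the 10-million-vertex benchmark); the framework of the pre-processing step and all names are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StepSpec:
    name: str
    framework: str
    cpu_cores: int
    gpus: int = 0


# Placeholder sizes; the talk only gives approximate figures.
SUPERWORKFLOW = [
    StepSpec("graph_creation_and_indexing", "spark", cpu_cores=200),
    StepSpec("random_walk", "spark", cpu_cores=1000),
    StepSpec("word2vec_preprocessing", "spark", cpu_cores=150),
    StepSpec("embedding_training", "pytorch", cpu_cores=32, gpus=4),
]


def peak_cores(steps):
    """Cores a fixed-size cluster would need to serve the largest step."""
    return max(step.cpu_cores for step in steps)


def share_of_peak(steps):
    """Fraction of the fixed-size cluster that each step actually needs."""
    peak = peak_cores(steps)
    return {step.name: step.cpu_cores / peak for step in steps}


shares = share_of_peak(SUPERWORKFLOW)
```

With these placeholder numbers, three of the four steps would need a fifth or less of a cluster sized for the random walk, which is the inefficiency a superworkflow is meant to avoid.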

So we will continue to explore how to create a Superworkflow for Node2Vec.

To create a Superworkflow for Node2Vec, a convenient tool is Fugue, which has been open-sourced on GitHub under the name the Fugue Project. Fugue is a pure abstraction layer, and it can unify and simplify the core concepts of distributed computing. If you have a Python pipeline, with very limited code change you can use Fugue to run it in multiple computing frameworks, for example Python, Spark, and Dask. Fugue can also help request computing resources from Kubernetes for these computing frameworks. Here is a screenshot of the Fugue Project. You can give it a try by searching for the Fugue Project on Google.

So here is the architecture of the Fugue framework. On top of the different computing frameworks, we first build an abstraction layer called an execution engine, which could be Pandas or Spark or Dask or Python. It is an abstraction of the basic operations on top of these engines, including map, partition, join, and SQL select statements, as some examples. Then on top of the execution engine, we use a directed acyclic graph (DAG) to describe the workflow, which is very similar to how Airflow manages a workflow. And we added a programming interface and built-in extension modules to the system, for example save, load, and print on dataframes. On the very top level, we provide a new way to express your workflow, like Fugue SQL. We also have machine learning and streaming features in Fugue.
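The execution engine idea can be illustrated without any framework: the business logic is written once against a small interface, and the engine decides how it runs. The classes below are a toy stand-in using only the Python standard library; they are not Fugue's API, and every name here is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


class SequentialEngine:
    """Runs each partition one after another in the current process."""

    def map_partitions(self, func, partitions):
        return [func(part) for part in partitions]


class ThreadPoolEngine:
    """Runs partitions concurrently; stands in for a distributed backend."""

    def __init__(self, workers=4):
        self.workers = workers

    def map_partitions(self, func, partitions):
        with ThreadPoolExecutor(max_workers=self.workers) as pool:
            return list(pool.map(func, partitions))


def degree_per_node(partition):
    """Engine-agnostic logic: count the neighbors of every node in a partition."""
    return {node: len(neighbors) for node, neighbors in partition.items()}


def node_degrees(engine, partitions):
    """Run the same logic on whichever engine is passed in."""
    degrees = {}
    for partial in engine.map_partitions(degree_per_node, partitions):
        degrees.update(partial)
    return degrees


partitions = [{0: [1, 2], 2: [0, 1, 3]}, {1: [0, 2], 3: [2]}]
local_result = node_degrees(SequentialEngine(), partitions)
threaded_result = node_degrees(ThreadPoolEngine(workers=2), partitions)
```

Switching the backend only changes the engine object that is passed in, while `degree_per_node` stays untouched.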

So Fugue also monitors your execution DAG. It can automatically parallelize independent branches in your workflow, and also automatically persist your intermediate results. In addition, a Fugue workflow is [inaudible] testable, so more errors can be captured during the testing phase. So you can fail faster, [inaudible] in your Spark pipeline. You’re also able to resume execution from checkpoints after a pause or failure.
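Parallelizing independent branches of a DAG can be sketched with the standard library alone. The runner below executes tasks in waves: every task whose dependencies are finished is submitted to a thread pool at the same time. It only illustrates the idea and says nothing about how Fugue schedules work internally; the task names and the `run_dag` helper are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter


def run_dag(tasks, dependencies, workers=4):
    """Run tasks in dependency order, executing each ready wave in parallel."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()
    results = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        while sorter.is_active():
            ready = list(sorter.get_ready())
            # Tasks in the same wave do not depend on each other.
            wave = {name: pool.submit(tasks[name], results) for name in ready}
            finished = {name: future.result() for name, future in wave.items()}
            results.update(finished)
            for name in ready:
                sorter.done(name)
    return results


tasks = {
    "load": lambda r: [(0, 1), (1, 2), (0, 2)],
    "count_nodes": lambda r: len({v for edge in r["load"] for v in edge}),
    "count_edges": lambda r: len(r["load"]),
    "summary": lambda r: (r["count_nodes"], r["count_edges"]),
}
# Each key maps to the set of tasks it depends on.
dependencies = {
    "load": set(),
    "count_nodes": {"load"},
    "count_edges": {"load"},
    "summary": {"count_nodes", "count_edges"},
}
results = run_dag(tasks, dependencies)
```

Here `count_nodes` and `count_edges` form one wave and run concurrently, because both depend only on `load`.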

So this diagram demonstrates the architecture of the Node2Vec Superworkflow. As a very [inaudible], we have a scheduler like Airflow to control the sequence of steps. We have four steps here. At each step, we use Fugue to collect the task information and degrees of parallelism and to submit the resource allocation request to Kubernetes along with the proper Docker images. For example, we may want a Spark cluster with a thousand cores and three terabytes of memory for one job. At the very bottom, the Kubernetes containers are taken from the primary cloud providers, for example AWS or Google Cloud, through their managed Kubernetes engines like EKS or GKE. This whole thing is very neat and simple, but it enables very powerful distributed computing for complex graph neural networks.

So we also have some hyperparameters for Node2Vec to tune, and there are quite a lot of them: the number of random walks for each vertex, the length of the random walk, and the probabilities of returning to the original node or going to a new node when hopping from one vertex to another. These probabilities control whether the search behaves more like a breadth-first search or a depth-first search. We also have a few hyperparameters from the Word2Vec step, like the window size, the minimum word count threshold, the number of iterations, and the optimizers. Hyperparameter tuning requires a huge CPU, huge [inaudible] workload. Fortunately, hyperparameter tuning is parallelizable even for iterative tasks.
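Because every hyperparameter combination is an independent run, the search space can be expanded up front and the runs handed out in parallel. The grid below is a small made-up example; the parameter names follow the list above, but the candidate values are placeholders, not values from the talk.

```python
from itertools import product

# Hypothetical candidate values for a handful of the hyperparameters.
GRID = {
    "num_walks": [10, 20],
    "walk_length": [40, 80],
    "p": [0.5, 1.0],
    "q": [0.5, 1.0],
    "window_size": [5, 10],
}


def expand_grid(grid):
    """Turn a dict of candidate lists into a list of concrete configurations."""
    keys = list(grid)
    return [
        dict(zip(keys, values))
        for values in product(*(grid[key] for key in keys))
    ]


configs = expand_grid(GRID)
```

Even this small grid yields 2 × 2 × 2 × 2 × 2 = 32 full Node2Vec runs, which shows how quickly the tuning workload multiplies.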

So now we’re going to show some testing results on Node2Vec, given different workflows and Word2Vec methods.

Here I first introduce the computing environment for conducting the testing. The traditional setup of an interactive development environment is a Jupyter notebook, where Spark is used with an already standing-by Spark cluster, if required. In our case, we use Fugue as an abstract computing layer and as the only entry point to interact with any of the computing frameworks. Our clusters are spun up on top of Kubernetes, and each user can start and stop their own cluster in their own notebook session. The latency to start a new Spark cluster is at the level of seconds. So the clusters are purely on-demand and ephemeral, and we don’t use any standby centralized cluster. In addition, with AWS Elastic File System (EFS) mounted on each pod, updates of code and dependencies are available in a new Spark cluster immediately. So this is a lot [inaudible] kind of interactive environment with Fugue and Kubernetes. We use this system to conduct a lot of benchmark testing.

This set of curves demonstrates the testing results of using Fugue to benchmark the Node2Vec computation in various computing frameworks. So here we have Python, we have Dask, we have Spark, and a local Spark cluster. The X-axis is the number of vertices in the graph, from a hundred to a million, and the Y-axis is the run time in seconds. We generated these graphs using a random graph generator, with about a 5% probability for every two vertices to have an edge. So it’s like a random graph. The blue curve is the run time performance of the Node2Vec code from the original authors, and all the other curves are the performance of the same Fugue code run on different execution engines. We have three options: Dask, Spark, and a local Spark cluster. For the graph with 1 million vertices, the Fugue version on Spark is significantly faster. So this diagram explains why Fugue can be called a superframework. [inaudible] can invoke a set of computing frameworks with barely any code change.
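A random graph where every pair of vertices is connected with a fixed probability can be generated as in the sketch below. The talk does not name the generator that was used, so this is only one plausible reading; the function name and the seed are illustrative.

```python
import random
from itertools import combinations


def random_graph(num_vertices, edge_probability=0.05, seed=0):
    """Connect each unordered pair of vertices independently with a fixed probability."""
    rng = random.Random(seed)
    return [
        (u, v)
        for u, v in combinations(range(num_vertices), 2)
        if rng.random() < edge_probability
    ]


edges = random_graph(100)
```

This naive version checks every pair, so its cost grows quadratically with the number of vertices; a graph with a million vertices would need a generator that avoids enumerating all pairs.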

We also conducted testing on very large graphs with a unified Spark pipeline and the [inaudible] Word2Vec using SparkMLlib. For a graph with 10 million vertices and 300 million edges, the Node2Vec process takes about three hours with 500 cores and three terabytes of memory. For an even larger graph, with a hundred million vertices and three billion edges, it takes about eight hours with 2,000 cores and 12 terabytes of memory. So each [inaudible] the Word2Vec embedding step takes about 45 or 50% of the total run time, but utilizes only about 5% of all the computing resources.

So with the same graphs, we conducted some benchmark testing by running the Superworkflow using Spark and PyTorch on Kubernetes. For the graph with 10 million vertices and 300 million edges, the Word2Vec embedding step takes only about 30 minutes on an instance with 32 CPUs and four GPUs. That represents a 66% run time reduction, from 1.5 hours to 0.5 hours. For the graph with a hundred million vertices, the Word2Vec embedding step takes about one hour on a larger instance, with 96 CPUs and 16 GPUs required. This represents a 70% run time reduction, from 3.4 hours to one hour.

So with the same two graphs, the cost saving of using the Superworkflow is even more significant. For the graph with 10 million vertices, the cost of the Word2Vec embedding step is reduced from 750 CPU hours to 20 CPU hours plus two GPU hours. This represents more than a 90% cost reduction, even including the cost of the Word2Vec pre-processing step. Similarly, on the graph with a hundred million vertices, the cost is reduced from 6,800 CPU hours to a hundred CPU hours and 16 GPU hours, which is also more than a 90% cost reduction. These run time and cost savings will be substantially amplified if you consider the huge cost of hyperparameter tuning.
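The figures quoted for the Word2Vec embedding step can be cross-checked with simple arithmetic. The CPU hours before the change equal the cluster size times the step's run time (500 × 1.5 and 2,000 × 3.4), and the hours afterwards equal the instance size times the new run time, which comes out close to the rounded figures in the talk. Turning CPU and GPU hours into one cost needs a price ratio that the talk does not give, so `GPU_PRICE_RATIO` below is purely an assumption.

```python
def reduction(before, after):
    """Relative reduction from `before` to `after`."""
    return (before - after) / before


# Run time of the embedding step in hours, before and after.
runtime_cut_10m = reduction(1.5, 0.5)
runtime_cut_100m = reduction(3.4, 1.0)


def cost_cut(cpu_hours_before, cpu_hours_after, gpu_hours_after, gpu_price_ratio):
    """Cost reduction measured in CPU-hour equivalents."""
    after = cpu_hours_after + gpu_price_ratio * gpu_hours_after
    return reduction(cpu_hours_before, after)


# Assumed price of one GPU hour expressed in CPU hours (not from the talk).
GPU_PRICE_RATIO = 20.0

# 10 million vertices: 500 cores x 1.5 h before; 32 CPUs and 4 GPUs x 0.5 h after.
cost_cut_10m = cost_cut(500 * 1.5, 32 * 0.5, 4 * 0.5, GPU_PRICE_RATIO)
# 100 million vertices: 2,000 cores x 3.4 h before; 96 CPUs and 16 GPUs x 1 h after.
cost_cut_100m = cost_cut(2000 * 3.4, 96 * 1.0, 16 * 1.0, GPU_PRICE_RATIO)
```

Under this assumed ratio, both cost reductions stay above 90%, and the run time reductions come out at about 67% and 71%, in line with the figures quoted above.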

So in summary, we introduced the concept of Superworkflow using the Fugue framework on Kubernetes. Fugue and Kubernetes are the two critical dependencies for developing such efficient superworkflows on complex graph neural networks. We use Node2Vec as a case study to show how to create a superworkflow step by step and demonstrate its advantages in run time and cost reduction. [inaudible] that the idea of superworkflow can be generalized to other complex deep learning problems.

So the Fugue-based Node2Vec has been open-sourced on GitHub under the Fugue project and can be pip installed. The package also contains a few end-to-end examples for you to quickly ramp up.

That’s all for my talk. Thank you. Finally, your feedback is important to us. Don’t forget to rate and review the session. Thank you.

Dr. Jintao Zhang has obtained PhD in machine learning, and has been working in various internet companies with extensive experience on delivering end-to-end solutions on large-scale machine learning p...
