Scoring at Scale: Generating Follow Recommendations for Over 690 Million LinkedIn Members


The Communities AI team at LinkedIn generates follow recommendations from a large (10’s of millions) set of entities to each of our 690+ million members. These recommendations are driven by ML models that rely on three sets of features (member, entity, and interaction features). In order to support a fast-growing user base, an expanding set of recommendable entities (members, companies, hashtags, groups, newsletters etc.) and more sophisticated modeling approaches, we have re-engineered the system to allow for efficient offline scoring in Spark. In particular, we have handled the ‘explosive’ growth of data by developing a 2D Hash-Partitioned Join algorithm that optimizes the join of hundreds of terabytes of features without requiring significant data shuffling. In addition to a 5X runtime performance gain, this opened the opportunity for training and scoring with a suite of non-linear models like XGBoost, which improved the global follow rate on the platform by 15% and downstream engagement on LinkedIn feed from followed entities by 10%.


Video Transcript

– Hello, everybody. Thanks for listening in on our session today. I’m excited to be presenting back at Spark Summit, and I hope everybody’s staying safe and healthy in the current environment. Today we’re gonna talk to you about how we’ve achieved scale for our Follow Recommendation system, which is one of the most data-intensive systems at LinkedIn.

Our Follow Recommendations give members recommendations to follow or connect with multiple types of entities, and we’ll get into more detail about that later on in the talk. Today you’ll get more detail about how our Follow Recommendation system works and also about our innovative 2D Hash-Partitioned Join algorithm that allowed us to achieve scale.

Today, we’re gonna start with an overview of Follows Relevance and the objective that we’re striving for in our Follow Recommendations product. And then I’m gonna hand it over to Emilie. She’s gonna get into more detail about our Offline Scoring Architecture and dive deeper into the 2D Hash-Partitioned Join algorithm that allowed us to achieve scale.

So, to start off, an introduction to the Follows Relevance system.

Product Placements

Before we actually get into the detail, I’d like you to experience the recommendation system at play. So if you can, take a moment to open up your app, or go to your browser, and click on the My Network tab, which is one of the main placements our Follow Recommendations appear in. They do appear in a lot of other placements, but this is the most prominent one. That way you can get a feel for how the recommendation system works.

Communities AI

Follow Recommendations is actually owned by the Communities AI team. The mission of our team is to empower members to form active communities around common areas of interest so they can have conversations. We’ve distilled the problem into three main areas. One is Discover, which is powered by the Follow Recommendations and will be the main focus of our presentation today. The second one is Engage, to allow members to join conversations after they’ve followed entities that share their interests. And then finally, when members are ready to contribute, we assist them in reaching the right audience, for example by using the appropriate hashtags. There’s detail about what our team does in the blog post I’ve linked below, and also in our RecSys industry session presentation.

So the Follow Recommendation system, like I said, recommends entities, which can be members or audience builders, pages like company pages, events, hashtags, newsletters and groups, to our members, or the viewers of those recommendations.

Discover: Follow Recommendations at Scale

Now we have hundreds of millions of members and millions of entities. So this creates a key scaling challenge: we have hundreds of trillions of possible pairs. The Follow Recommendation system allows us to navigate this scale and recommend the most relevant entities for a member to follow.

Recommendation Objective

The way we define what is most relevant, or in other words our recommendation objective, consists of two parts. First, we wanna recommend entities that the member finds interesting. That means we wanna increase the probability that a member, or viewer, is gonna follow an entity, given that we’ve recommended that entity to them. And second, we want them to engage with the content that’s produced by that entity after the member follows it.

So that act of following, or forming an edge, actually creates a lot of content that shows up in members’ feeds, and those follow edges contribute a big chunk of the engagement that we see on our LinkedIn feed. So it’s a very important piece of our content ecosystem.

More concretely, the mathematical formulation of our Follows objective function has two components: the probability of a follow, and the engagement, or utility, after a follow edge is formed. There’s a tuning parameter that allows us to trade off, or combine, those two quantities in an optimal way, and that’s controlled by our alpha parameter here. So this score is what defines the final ranking that the viewer, or the member, sees in the Follow Recommendations.
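To make the role of the alpha parameter concrete, here is a minimal Python sketch. The talk does not spell out how the two terms are combined, so the additive form below is purely an assumption for illustration, and the names `follow_score`, `rank`, `p_follow` and `utility` are hypothetical.

```python
def follow_score(p_follow: float, utility: float, alpha: float) -> float:
    """Combine the two objective terms into one ranking score.

    ASSUMPTION: a simple weighted sum. The talk only says that alpha
    trades off the follow probability against the post-follow utility.
    """
    return p_follow + alpha * utility


def rank(candidates, alpha: float):
    """Sort candidate entities for one viewer by descending combined score."""
    return sorted(
        candidates,
        key=lambda c: follow_score(c["p_follow"], c["utility"], alpha),
        reverse=True,
    )


if __name__ == "__main__":
    # Hypothetical model outputs for two candidate entities.
    candidates = [
        {"entity": "A", "p_follow": 0.30, "utility": 0.10},
        {"entity": "B", "p_follow": 0.20, "utility": 0.50},
    ]
    for alpha in (0.0, 1.0):
        print(alpha, [c["entity"] for c in rank(candidates, alpha)])
```

With alpha at zero the ranking depends on follow probability alone; raising alpha lets an entity with higher expected downstream engagement overtake one that is merely more likely to be followed.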

So with that, I’ll hand it over to Emilie to give you more details about our Offline Scoring Architecture and the 2D Hash-Partitioned Join algorithm.

– All right, thanks Abdul for this introduction. So in this section, I’m gonna talk about the Offline Scoring Architecture that we had in Follows Relevance and the scalability challenges that we faced. First of all, in Follows Relevance, we need to generate recommendations for the entire LinkedIn user base, so more than 690 million members, and we need to make the distinction between two types of users. I’m gonna explain why in a bit.

Active vs Inactive members: Recommending entities to follow for every LinkedIn member

So first, we have the active members, who are LinkedIn users who have performed recent actions on LinkedIn. The inactive members, on the other side, are either brand new users of LinkedIn or users who registered in the past but haven’t really used the website or the LinkedIn app recently.

And so obviously for active members we have a lot of data about their past activity. They have a full profile, so we can leverage that to generate personalized recommendations. For inactive members we would love to generate those personalized recommendations as well, but because we don’t have enough data, we have to fall back to the segment they belong to in order to generate recommendations. To generate personalized recommendations we need a very heavy Spark offline pipeline to precompute them, because the recommendations are gonna be keyed by viewer. On the other hand, for inactive members, the offline pipeline is a lot more lightweight because the recommendations are keyed by segment. So in the rest of the presentation, we are mostly gonna focus on the scalability challenges related to the generation of personalized recommendations.

Scoring Architecture

But before that, I wanted to show you what the end-to-end pipeline looks like for active members and how the personalized recommendations integrate into this pipeline. So you can see first that the pipeline is split into the offline part, where all the pre-computations happen in Spark, and everything at the top of the slide in yellow, which is everything that happens online in real time whenever a client calls us to get Follow Recommendations for viewer X. We run the offline flows on a regular basis, and we push the recommendations to key-value stores, which handle the communication between offline and online. You see here in red where the pre-computed personalized recommendations are pushed. And so whenever a client calls us, we first query this key-value store to fetch the personalized recommendations. However, I think it’s interesting to mention that we do not exclusively use those personalized recommendations, because those are pre-computed and so do not take into account the very recent activity of the member. So, in addition to those recommendations, we also fetch what we call the contexts of the viewer, which are real-time signals of the activity that the viewer just performed: for example, the viewer just interacted with a post or just followed an entity. And based on those contexts, which are queried in real time, we fetch additional precomputed recommendations, which are context based.

And so at the very end, we merge those two sets of recommendations. We do a final scoring, filtering and blending, and return those ranked recommendations to the client. So now that you have an understanding of the whole end-to-end pipeline, in the rest of the presentation we’ll focus on the pre-computation of the personalized recommendations, which happens offline.

Feature Categories

So let’s take a look at the different feature categories that we use to generate those recommendations. We have three main categories of features: the viewer features, the pair features and the entity features. The viewer features reflect the general consumption behavior of the viewer. So for example, how many times the viewer comes to the website, how many times he or she sees updates, how many times he or she interacts with the updates. In addition to that, we also leverage the profile information of the viewer: the industries the viewer belongs to, the country he or she lives in, and so on. On the entity side, we have similar features, except that we focus on the creation side: how many times an entity posts something on the feed, for example, how many viewers interact with those updates, and so on. So we generate a fairly small number of viewer features and a medium number of entity features, but the most valuable features that we generate are the pair, or interaction, features, and those are generated in very large numbers.

I’m not gonna go into the details of which pair features we generate; I just listed some examples here. Basically, we can use the viewer-entity past engagements, but we can also derive viewer-entity affinity features based on graph data or based on browse map scores.

We also leverage embedding features and many more.

Joining Features: How to manage the explosive growth of members / entities

So with that in mind, we need to generate those features for all viewers and all entities. We have millions of distinct active members. We also have millions of recommendable entities, including all the entity types that Abdul mentioned previously: members, companies, hashtags, newsletters. And so this means that potentially we would need to generate pair features for trillions of possible viewer-entity pairs. This would require a huge amount of resources, and it is obviously not possible for us. So as a first step, we apply a candidate selection based on some pair feature values. I’m not going into the details of the candidate selection, but at the end we still end up with hundreds of billions of viewer-entity pairs, which is still a lot. So the main challenge becomes: how can we join all those features together and achieve an acceptable runtime performance?

1st Option: 3-way Spark Join

The first, very naive option is just to do a 3-way Spark join. So you have the three features tables, and you could first join the viewer features to the pair features. Because those tables are not pre-partitioned, the default join is the shuffle hash join in Spark, which will be done on the viewer id key. This would generate hundreds of terabytes of shuffle. Once you have this first intermediate table, you would also need to apply a second join with the entity features, this time using the entity id key, and this again generates hundreds of terabytes of shuffle. On top of that, this join is very skewed, because some entities are recommended more often to viewers, for example the popular entities. And so this creates a lot of skew, with some tasks running much longer than others.

So this option was prohibitive for us; it was too expensive.

2nd Option: Partial Scoring with Linear model

So the second option, which is the one we initially adopted in Follows Relevance, is Partial Scoring using a linear model. Let me explain what this is. In the picture, you recognize at the top the three features tables: member features, pair features and entity features. The idea is, instead of joining all the features together, to reduce those inputs as much as possible so that the 3-way join becomes manageable. So we directly score the viewer features in a separate job to generate a score. The way we do that is, because we have trained a linear machine learning model, in our case logistic regression, we can extract the viewer-specific coefficients from the model and directly score the viewer features to generate a single viewer partial score. We can obviously do the same thing for the entity features, to generate a single entity partial score, and for the pair features. And so instead of having to join all the features, we now only have to join tables that have two or three columns, basically the viewer id and the partial score. Each record is a lot more lightweight and requires a lot less memory.

So this is a manageable 3-way join. It is an acceptable solution, but it has some disadvantages. First of all, there is a lot of scoring overhead and a lot of intermediate outputs that are generated. Each partial scoring job outputs its data to HDFS, and then we load those again to join the scores. And we have to apply this pipeline to each single term in our objective. So here I’m showing the PFollow scoring, but we do the same thing for utility scoring, and it becomes more complicated with the number of models that we need to score with. But the main constraint that we have here is in terms of relevance: we are forced to score with a linear model. Because we need to do partial scoring, we cannot score with a nonlinear model, and this was a big constraint for us. And so a year ago we started a big refactoring project of the offline scoring pipeline, trying to make it more scalable but, above all, trying to remove the constraint of scoring with linear models only. This was made possible by the adoption of the 2D Hash-Partitioned Join.
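The reason partial scoring only works for a linear model can be shown with a small Python sketch: the logit of a logistic regression is a sum over features, so it splits into a viewer part, an entity part and a pair part that can each be computed in a separate job. The coefficient values and feature names below are made up for illustration.

```python
import math

# Hypothetical coefficients of a trained logistic regression,
# grouped by the feature table each feature comes from.
W_VIEWER = {"sessions_per_week": 0.4, "feed_clicks": 0.1}
W_ENTITY = {"posts_per_week": 0.3}
W_PAIR = {"past_engagements": 0.8, "affinity": 1.2}
BIAS = -2.0


def sigmoid(x: float) -> float:
    return 1.0 / (1.0 + math.exp(-x))


def partial_score(weights: dict, features: dict) -> float:
    """Dot product restricted to one feature group (one partial scoring job)."""
    return sum(w * features.get(name, 0.0) for name, w in weights.items())


def full_score(viewer: dict, entity: dict, pair: dict) -> float:
    """Reference: score after joining all raw features into one record."""
    all_weights = {**W_VIEWER, **W_ENTITY, **W_PAIR}
    all_features = {**viewer, **entity, **pair}
    return sigmoid(BIAS + partial_score(all_weights, all_features))


def score_from_partials(viewer_ps: float, entity_ps: float, pair_ps: float) -> float:
    """Score after joining only the three narrow partial-score tables."""
    return sigmoid(BIAS + viewer_ps + entity_ps + pair_ps)


if __name__ == "__main__":
    viewer = {"sessions_per_week": 5.0, "feed_clicks": 12.0}
    entity = {"posts_per_week": 3.0}
    pair = {"past_engagements": 2.0, "affinity": 0.5}
    a = full_score(viewer, entity, pair)
    b = score_from_partials(
        partial_score(W_VIEWER, viewer),
        partial_score(W_ENTITY, entity),
        partial_score(W_PAIR, pair),
    )
    print(a, b)  # the two values agree up to floating-point rounding
```

A tree ensemble such as XGBoost has no such additive split across feature tables, because a single tree can branch on viewer, entity and pair features together, which is why this approach ties the pipeline to linear models.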

Goal: Avoid huge shuffles

So let’s quickly go back to our main challenge here. We have those three tables, hundreds of billions of viewer-entity pairs, and we need to join all those features together. We’ve seen in the previous section that the main bottleneck is the very large and wide table of pair features, with a very skewed entity distribution. We cannot really manage to shuffle that data; it is really prohibitive. So the new question becomes: can we manage to join all the features together without having to shuffle the pair features twice, once on the viewer id key and a second time on the entity id key?

And so we have developed a solution in collaboration with our Hadoop team at LinkedIn, which is the 2D Hash-Partitioned Join.

2D Hash-Partitioned Join: Partitioning of the 3 feature tables

So the key idea behind the 2D Hash-Partitioned Join is to do a smart partitioning of the three features tables. If we look at the picture on the right, imagine that we have repartitioned the pair features table and, for each single pair partition, we are able to figure out which is the corresponding viewer partition that we need to load into memory and, similarly, the right entity partition that we need to load. This way, we just need to load those two partitions and perform a local join for all the records in the pair partition. So this is the idea behind the 2D Hash-Partitioned Join. So how do we partition those tables? For the viewer and the entity features tables this is quite simple: we just apply classic hash partitioning. We partition the viewer features table into V partitions and the entity features table into E partitions. For the pair features table it’s slightly more complicated. We partition it into V times E partitions, but we use a two-dimensional custom partition function on both the viewer id and the entity id, which will allow us later on to join on those two keys at the same time.

So how do we pick E and V? It depends on your use case, but basically you need to pick them large enough so that every member partition and entity partition can be loaded into memory. So obviously it depends on your cluster resources, your input data size and the executor memory that you wanna set in your job.

There are a lot of optimizations that are performed under the hood in the 2D Hash-Partitioned Join. If you wanna check out the blog post that was written about it, I added the link to this slide.
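As a back-of-the-envelope illustration of that sizing rule, the sketch below computes the smallest partition count for which an evenly hashed partition fits a chosen memory budget. The helper `min_partitions`, the table sizes and the budgets are all hypothetical, and the calculation assumes hash partitioning spreads rows evenly, which real data will only approximate.

```python
import math

GIB = 1024 ** 3
TIB = 1024 ** 4


def min_partitions(table_bytes: int, partition_budget_bytes: int) -> int:
    """Smallest partition count such that an evenly hashed partition
    fits in the given per-partition memory budget (hypothetical helper)."""
    return max(1, math.ceil(table_bytes / partition_budget_bytes))


if __name__ == "__main__":
    # Hypothetical table sizes and per-executor budgets, for illustration only.
    V = min_partitions(2 * TIB, 1 * GIB)    # viewer features table
    E = min_partitions(200 * GIB, 2 * GIB)  # entity features table
    print("V =", V, "E =", E, "pair partitions =", V * E)
```

In practice one would leave headroom below the executor memory for skewed partitions and for the in-memory representation of the hashmap, which is usually larger than the on-disk size.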

So just to illustrate this partitioning logic: for the viewer and the entity tables, this is kind of simple. You just take the hash of the viewer id and the entity id respectively. For the pair table, we compute the partition number using this two-dimensional partition function. And if you look at this equation, we notice that for each pair partition P, we always have a single corresponding viewer partition and a single corresponding entity partition, and those can be found using simple formulas: P divided by E (integer division) for the viewer partition and P modulo E for the entity partition.

So let’s take a quick example. If you look at the right side of the slide, let’s take partition P equals 120. I can find that the corresponding viewer partition is V1 and the corresponding entity partition is E20. And now that I know that, I can just load those partitions into memory in a single executor and perform the local join.

So with that in mind, now that we have partitioned the three tables, how does the join algorithm work? We first launch a mapper for each pair partition. This is done by just calling .mapPartitions in the Spark Dataset API. Each mapper is gonna load the entity partition as an in-memory hashmap, using the formula that I described in the previous slide to find the right partition to load. Similarly, we also load the corresponding viewer partition, either into a hashmap or, as another optimization, using a stream reader, and I’m gonna talk about that in a minute. Once we have those two partitions loaded, for each pair features record in the pair partition, we can look up the entity features record by entity id, do the same for the viewer features record, and join the three sets of features together. And now that all the features are brought together, we can score those right away before storing to HDFS.

Just to go back to the stream reader optimization: when you repartition the viewer and the pair tables, you can also sort by viewer id, so that you don’t have to load the viewer partition into memory. Instead, you can use a stream reader that iterates through the viewer ids. This way you only have to load the entity partition into memory, and you can just use the stream reader for the viewer partition. In our case, we decided to use the stream reader with the viewer table because it’s larger: although we have fewer viewer features, we have a lot more viewer ids than entity ids, so the table is longer.
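The partition arithmetic described here can be sketched in a few lines of Python. The function names and the use of CRC32 as the hash are assumptions for illustration, not the production implementation, and the value E = 100 is inferred from the slide example (P = 120 mapping to V1 and E20), since 120 = 1 * 100 + 20.

```python
import zlib
from typing import Tuple


def stable_hash(key) -> int:
    """Deterministic non-negative hash (Python's built-in hash() is salted for strings)."""
    return zlib.crc32(str(key).encode("utf-8"))


def viewer_partition(viewer_id, V: int) -> int:
    """Classic hash partitioning of the viewer features table into V partitions."""
    return stable_hash(viewer_id) % V


def entity_partition(entity_id, E: int) -> int:
    """Classic hash partitioning of the entity features table into E partitions."""
    return stable_hash(entity_id) % E


def pair_partition(viewer_id, entity_id, V: int, E: int) -> int:
    """Two-dimensional partition number in [0, V * E) for a pair features record."""
    return viewer_partition(viewer_id, V) * E + entity_partition(entity_id, E)


def source_partitions(p: int, E: int) -> Tuple[int, int]:
    """Recover (viewer partition, entity partition) from pair partition p."""
    return p // E, p % E


if __name__ == "__main__":
    print(source_partitions(120, E=100))  # (1, 20), as in the slide example
```

Because the pair partition number encodes both coordinates, every record in a given pair partition is guaranteed to find its viewer features in one viewer partition and its entity features in one entity partition.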
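The per-partition work of the join can be simulated in plain Python. This is only a sketch under stated assumptions: the function `join_and_score_partition` is a hypothetical stand-in for the body one would pass to mapPartitions, rows are simple tuples, the pair and viewer inputs are assumed to be sorted by viewer id, and records with missing features are dropped (inner-join semantics), which the talk does not specify.

```python
from typing import Callable, Iterable, Iterator, Tuple


def join_and_score_partition(
    pair_rows: Iterable[Tuple[int, int, object]],  # (viewer_id, entity_id, pair_features), sorted by viewer_id
    viewer_rows: Iterable[Tuple[int, object]],     # (viewer_id, viewer_features), sorted by viewer_id
    entity_rows: Iterable[Tuple[int, object]],     # (entity_id, entity_features), any order
    score_fn: Callable[[object, object, object], float],
) -> Iterator[Tuple[int, int, float]]:
    """Local join of one pair partition with its viewer and entity partitions."""
    # Entity partition: loaded fully as an in-memory hashmap.
    entity_map = dict(entity_rows)

    # Viewer partition: read as a stream, never fully materialized.
    viewer_iter = iter(viewer_rows)
    current = next(viewer_iter, None)

    for viewer_id, entity_id, pair_feats in pair_rows:
        # Advance the viewer stream until it reaches this pair's viewer id.
        while current is not None and current[0] < viewer_id:
            current = next(viewer_iter, None)
        if current is None or current[0] != viewer_id:
            continue  # no viewer features for this pair (assumed inner join)
        entity_feats = entity_map.get(entity_id)
        if entity_feats is None:
            continue  # no entity features for this pair (assumed inner join)
        # All three feature sets are together: score right away.
        yield viewer_id, entity_id, score_fn(current[1], entity_feats, pair_feats)


if __name__ == "__main__":
    viewers = [(1, 100), (2, 200), (4, 400)]
    entities = [(10, 10), (20, 20)]
    pairs = [(1, 10, 1), (1, 20, 2), (2, 10, 3), (3, 10, 4), (4, 30, 5), (4, 20, 6)]
    toy_model = lambda v, e, p: v + e + p  # stand-in for a real model
    for row in join_and_score_partition(pairs, viewers, entities, toy_model):
        print(row)
```

Since the scoring function receives the full joined feature set, it can be any model, linear or not, which is the property the partial-scoring pipeline lacked.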

New Offline Scoring Pipeline

And so after we adopted this 2D Hash-Partitioned Join and refactored our scoring pipeline, this is what it looked like, on the right. On the left side, you have the old scoring pipeline. The first improvement that we see is that we do not need to shuffle the pair features table during the join anymore. Now the tables are pre-partitioned, and we can just apply the 2D Hash-Partitioned Join. The second improvement is that we do not have intermediate data stored to HDFS anymore. This whole logic of joining the features together and scoring them with our machine learning models happens in a single Spark job, and so we save a lot of disk space, obviously.

And last but not least, the main improvement that we got from this refactoring and the introduction of the 2D Hash-Partitioned Join is the ability to score using a nonlinear model, such as XGBoost. This model takes into account a lot of interactions between the features and brings a lot of relevance improvements.

Gains: After adopting 2D Hash-Partitioned Join

And so to summarize the gains that we obtained after adopting the 2D Hash-Partitioned Join: first, in terms of runtime performance, we improved the Cost to Serve of the offline scoring pipeline, measured in gigabyte-hours, by 5X, which is a lot. In terms of HDFS storage, we reduced disk space by more than 8X. And in terms of relevance, by being able to transition from a linear model to a nonlinear model, so from logistic regression to an XGBoost model, we measured in an A/B test the total number of follows going up by 15% and the downstream engagement after following those entities going up by 10%, which is a big win for us.

All right, so that concludes our presentation. Feel free to ask questions in the chat, if you have some.

I added my contacts and also Abdul’s, if you have follow up questions.

I really wanna thank the LinkedIn Hadoop team, and in particular Fangshi Li, for implementing the algorithm and helping with its adoption in Follows Relevance. And finally, I just added two links to some interesting blog posts. The first one talks about the challenges that the Communities AI team is trying to solve. The second one is more engineering focused: it talks about the scalability challenges faced by various teams at LinkedIn and the different solutions that were developed to solve them. It includes some details about the 2D Hash-Partitioned Join in particular.

About Emilie de Longueau


Emilie de Longueau is a Senior Software Engineer (Machine Learning) in the Communities AI team at LinkedIn, focused on driving member engagement through personalized and scalable Follow Recommendations for hundreds of millions of members. She has 5 years of industry experience in Data Science/Machine Learning and building Big Data solutions and algorithms using Spark. Emilie holds Master's degrees in Industrial Engineering and Operational Research, from the University of California (Berkeley) and Ecole des Ponts ParisTech (Paris). Her expertise in Apache Spark has helped her team modernize its offline scoring infrastructure to improve scalability and relevance of Follow Recommendations.

About Abdulla Al-Qawasmeh


Abdulla Al-Qawasmeh is an engineering leader who has managed artificial intelligence (AI) and big data teams. He has held engineering leadership positions at several companies in diverse industries including: Social Media, Financial Technology (FinTech), and Marketing Intelligence. He currently leads the Communities AI team at LinkedIn, which uses AI to help build communities centered around areas of interest on the LinkedIn platform. He holds a Ph.D. degree in Computer Engineering. His Ph.D. research focused on optimizing scheduling in large-scale data centers.