Keeping Identity Graphs In Sync With Apache Spark

May 26, 2021 12:05 PM (PT)


The online advertising industry is based on identifying users with cookies and showing relevant ads to interested users. But there are many data providers, many places to target ads and many people browsing online. How can we identify users across data providers? The first step in solving this is cookie mapping: a chain of server calls that pass identifiers across providers. Sadly, chains break, servers break, providers can be flaky or use caching, and you may never see the whole chain. The solution to this problem is constructing an identity graph with the data we see: in our case, cookie ids are nodes, edges are relations and connected components of the graph are users.

In this talk I will explain how Hybrid Theory leverages Spark and GraphFrames to construct and maintain a 2000 million node identity graph with minimal computational cost.

In this session watch:
Ruben Berenguel, Architect, Hybrid Theory



Ruben Berenguel: Hello, and welcome to my talk about Keeping Identity Graphs in Sync with Apache Spark. I’m Ruben, I have a PhD in Mathematics and I’m the Lead Data Engineer at Hybrid Theory, a programmatic company. My preferred stack is Python, Go and Scala, and not necessarily in that order, actually. I have developed this talk in three parts; every good talk should have three parts, or so I have been told. The first part will be the setup. Then I will introduce what an identity graph is, why it’s important and how it solves the problem I will have explained in the setup. Finally, I will explain what speed-ups and improvements you can get by using Spark, maybe not correctly, but cheaply.
So, in this first part, I will explain what ad tech is and why it’s used; what cookies really are, because cookies are really important in web browsing and they are going away, and why and how and all that; and what cookie mapping is, which is something pretty specific to the ad tech industry. And finally, I will introduce the identity problem, which is what I am trying to solve in this presentation. Now, programmatic ad tech is based on finding users satisfying some criteria. These can be, for instance, users that have visited a page of a certain category, like sports; users that are interested in a concept like chocolate (everybody’s interested in chocolate); or, and this is the more interesting one, users that are likely to want to buy from one of our clients.
This last one, as you can imagine, involves some kind of machine learning, and there are many models here, but it’s one of the situations in programmatic ad tech. Now, we need to find these users. To find them, we need their browsing behavior data, because we need to know if they have purchased from our client, for instance, or have browsed a soccer page or a chocolate page or whatever. Additionally, we need to be able to show ads to them, because otherwise how can we deliver for our clients, who in the end want to sell something? This is done with cookies. The basics of cookies is that they are used to help websites track events and state as users browse.
What do I mean by this? Well, there are two kinds of cookies: first-party and third-party. First-party cookies are session and state cookies. Session is whether you are logged in or not; that would be the session information. The state information could be, for instance, that you have something in your shopping cart or a pending purchase on Amazon; that would be a first-party cookie. Third-party cookies are mostly used for event tracking. Google Analytics, for instance, is website analytics for knowing how many people visit your website; that would be a third-party event-tracking cookie.
I have some drawings to explain this a bit better. Imagine there’s a user browsing a travel website, because we all want to travel eventually. This website is being served by a first-party server, that is, the owner of the travel website. And on it there’s a small pixel that is being served by a third-party server. When the user browses this webpage, this server is going to set a cookie in the user’s browser. The cookie is a kind of file stored in the browser, and it’s tied to a user. In reality it’s tied to the browser, but we like to think that there’s a user-browser combination in the end. This will be a third-party cookie. But then, when the user logs in to the website, that will be a cookie set by the first-party server.
These are two different cookies, and they are written by two different servers, so neither of them knows about the other: the third-party server can’t read the first-party cookie, and vice versa. So, in the ad tech situation, we get browsing data for users on the web from data providers. This is delivered in batch or in streaming; it can be provided in many different ways. It’s a huge amount of event logs. And we have browsing data from users browsing our clients’ websites. We put pixels on the websites and we track whether the user has purchased something or is visiting a specific product page, anything that could happen.
Now the problem: how do we connect these data sources? Because in the end, imagine that you’re in a situation where you want to find people that are likely to purchase something. You need to see what they do online and whether they have purchased or not, to know that they are positives in a machine learning model. Since the identifiers from the two cookie providers are different, there’s no way of relating one to the other. And since the servers can’t read each other’s cookies, they know nothing about them, so they are unrelated. This is where mapping servers and the mapping chain appear. I have more drawings.
We have the same situation: a small pixel on the lower right, and the user is browsing the travel website. The pixel calls a third-party server, and a cookie is set in the browser. Crucially, this third-party server redirects the user to another third-party server, which sets its own cookie, a different one, but calls back to the first server with the information of what this cookie identifier is. That server can in turn call the next one, and this can repeat any number of times. It shouldn’t be more than three or four, because you don’t want to slow down the user experience on the website, and this can have a negative impact. This would be a mapping chain. There’s a relationship between these three identifiers: one, alpha and square. And it works as a chain because it has been initiated by one server and propagated through all of them.
But now there’s the identity problem. We have a mapping chain, or several, which look like this, and now: what is a user? With this view of mapping chains you could say, “Okay, a user is a row: the first one, which corresponds to chain one, is the user that has one, alpha, square; then two, circle; and gamma, triangle.” And this can work. It doesn’t work super well, but it’s workable. This is what I call the basic solution. The problem comes when you try to [inaudible] problematic chains. If somewhere you don’t have any information, like in chain two, where you’re not getting information for partner two, and further down the timeline you saw another chain with information for this partner together with the same identifier for partner one or three (two, or circle), you would just squash them and fill in the information for partner two.
This is what I call coalesce, or merge on nulls. But it’s not as complete as the graph approach, because you need to choose. This choice needs one stable identifier or a business decision, and it’s a delicate business decision. Now I will show a specific example of this situation. You have these three chains, and at some later point in time you get an additional one that has 42, gamma and circle. How do you choose which of the previous ones is correct? You can say, “Okay, 42, gamma, circle is the ground truth, and anything I have seen before is fake. So two, circle is wrong and gamma, triangle is wrong.” But that doesn’t really fit reality, because here you are making the choice that 42 is the correct one and that it overrides any other information.
By using an identity graph you can keep all of the identifiers together, which means that you are able to map across providers, which means that you have more users you can target and more users you can analyze. In the end, in this situation, data is money and you don’t want to lose it. And here is where the identity graph appears. We have a problem that we don’t know how to solve, and if we rethink it as a graph we can solve it. That will require the computation of connected components, and in particular in big data, because we’re at a data conference; otherwise we wouldn’t have any interest. So, imagine we have, again, this situation with some chains. Now, rethink it as a graph.
So, these are nodes, and we just connect them. We can think of the edges as directed, because somebody has initiated the chain, but it’s not necessary: we could ignore the direction and think of it as an undirected graph. We remove any information that’s useless, like all the other slashes. Now we have this, and we can ask, “Okay, what’s a user now? Before, they were rows.” If we think of it as a graph, each connected component is a user. This is better, because the problematic chain is only problematic if you think of it in isolation. If you think of it as part of the graph, it’s connected to the graph: it adds information to chain three and connects gamma and circle. And since each connected component is a user, this extra chain says, “Actually, users two and three are the same, so you can address them with any of these identifiers.”
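To make the "each connected component is a user" idea concrete, here is a minimal sketch in plain Python. It is not the production approach from the talk (that uses Spark and GraphFrames); it uses a small union-find as a stand-in, and the identifiers (`p1_1`, `p2_alpha`, and so on) are hypothetical encodings of the symbols on the slides.

```python
from collections import defaultdict


def connected_components(chains):
    """Group identifiers into users; all non-null ids in a chain are linked."""
    parent = {}

    def find(node):
        parent.setdefault(node, node)
        while parent[node] != node:
            parent[node] = parent[parent[node]]  # path halving
            node = parent[node]
        return node

    for chain in chains:
        ids = [i for i in chain if i is not None]
        for other in ids:
            root_a, root_b = find(ids[0]), find(other)
            if root_a != root_b:
                # Keep the smaller identifier as the representative.
                parent[max(root_a, root_b)] = min(root_a, root_b)

    groups = defaultdict(set)
    for node in list(parent):
        groups[find(node)].add(node)
    return sorted(sorted(group) for group in groups.values())


# Hypothetical identifiers, one column per partner; None marks a missing id.
chains = [
    ("p1_1", "p2_alpha", "p3_square"),
    ("p1_2", None, "p3_circle"),
    (None, "p2_gamma", "p3_triangle"),
]
late_chain = ("p1_42", "p2_gamma", "p3_circle")

users_before = connected_components(chains)
users_after = connected_components(chains + [late_chain])
print(len(users_before), len(users_after))  # prints: 3 2
```

With the late chain added, the second and third users collapse into one component, and no identifier has to be declared "wrong" to get there.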
If we had gone with the other solution, the one where you pick one of them, you would still have three users and you would lose this connection, so you would not have as much information. Now, we need to compute connected components in a big data situation. You can imagine that we have many millions of events. The graph we compute has between three and four billion (thousand million) nodes, and we were working with Spark, so that’s where we use GraphFrames. Spark has a basic graph framework called GraphX, a message-propagation, graph-parallel, low-level API. The way the GraphFrames developers think of it is that GraphFrames is to DataFrames as GraphX is to RDDs.
RDDs are really useful if you want to do something really low level, but DataFrames are definitely more convenient and they are better optimized automatically. Likewise for graphs: if you can use GraphFrames, you will certainly use it over GraphX. When we considered what to use for solving this identity problem, we looked at what else was available. We had Spark, but there were more solutions, like Apache Giraph, Neo4j or Neptune, which I think was released either when we were starting this project or just before; it was pretty much at the same time. The problem with Giraph was harder maintenance: because it’s a bit of an old project, it was harder to find developers, so we discarded it.
Neo4j and Amazon’s Neptune are more like databases, and graph databases are optimized for different problems than a graph computation engine, which is how I like to think of GraphFrames. So, if you have a problem that involves searching in a graph, like finding all the nodes related to a specific node, Neo4j or Neptune, or many other solutions, are better. But if you want to compute something on a really large graph, I think GraphFrames is probably your best bet, I’m [inaudible] as well. All of the GraphFrames-powered analytics, processes or APIs need you to write the graph in a specific way. And actually most graph databases expect some form of this shape, in which you have sources and destinations.
So, you are storing the data as edges and not as the nodes themselves. And you can attach extra information: for instance, in the case I’m displaying here, we have a source, a destination and some additional information, which in this case is a timestamp. The way we encode it is particular to the identity problem; if you have some other kind of graph problem, you would write it in a different way. Now, what do we need to do to compute connected components in big data? Well, luckily for us, GraphFrames has a really good algorithm, which is called large star, small star. It’s rather recent, I think it’s from 2016, and it has been partially superseded by slightly more performant algorithms, for instance partition-aware connected components and union-find-shuffle.
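The edge-list shape described above (source, destination, timestamp) can be sketched in plain Python as follows. The helper name `chain_to_edges`, the sample values and the timestamp format are assumptions made for illustration; only the `Partner_A_X` style of node encoding comes from the talk. GraphFrames itself expects an edges DataFrame with `src` and `dst` columns, which is what these tuples would become in Spark.

```python
def chain_to_edges(chain, seen_at):
    """Turn one mapping chain into (src, dst, seen_at) edge rows.

    `chain` is a list of (partner, cookie_id) pairs in call order;
    partners with no identifier (None) are skipped.
    """
    nodes = [f"{partner}_{cookie}" for partner, cookie in chain if cookie is not None]
    # Link each identifier to the next one seen in the chain.
    return [(src, dst, seen_at) for src, dst in zip(nodes, nodes[1:])]


full_chain = [("Partner_A", "1"), ("Partner_B", "alpha"), ("Partner_C", "square")]
broken_chain = [("Partner_A", "2"), ("Partner_B", None), ("Partner_C", "circle")]

full_edges = chain_to_edges(full_chain, "2021-05-26T12:05:00")
broken_edges = chain_to_edges(broken_chain, "2021-05-26T12:06:00")
```

A chain with a single surviving identifier produces no edges in this sketch, so such a node would need to be kept separately if isolated identifiers matter.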
But GraphFrames has an implementation of large star, small star, and that’s the most convenient one because it’s already there, it works and it’s performant. It’s interesting because it’s based on taking the components and converting them to stars or, as I like to think of them, cartwheels or bicycle wheels, because you have a center node and there are spokes that go outward. I will explain how it works, because I think it’s super easy to understand and it will make it a bit easier to see what the whole problem is. So, you have a graph like this one, and we assign a random numbering; this can be done in Spark super easily with monotonically_increasing_id, for instance. And, only to help us as humans understand the algorithm, there are arrows here pointing from large to small. They are there for us, because I want to explain this and it’s easier if we have the arrows.
Now, for the large star part of the algorithm, we need to focus on a node and its neighbors. This is done per node, in parallel, so it’s ideal for Spark: it can be parallelized per node, which means that it can be easily distributed. And what is large star doing? What it does is destroy edges and recreate them pointing from the larger nodes to the smallest one. In this case, it destroys the existing edge from 10 to seven and points 10 to three, the smallest one, and keeps seven pointing to three. We do that for all the nodes in parallel and end up with this. Easy. That was large star. Now we go to the small star step, which unsurprisingly is also done per node in parallel, so you focus on a node and its neighbors.
Like the previous one, it’s going to point the smaller nodes to the smallest, so it connects two to one. We repeat for all the nodes. And now we iterate with large star again, and we end up with this. This is a star component, because it has a center, one, and all the others are pointing to it; that’s why I mentioned the image of a cartwheel or a bicycle wheel. When you have a graph in this shape, it’s super easy to store it as a connected component, because basically number one is the connected component ID and you know that all of the nodes are pointing to that one. The output will look like this: you have the component ID, all the nodes connected to this component, and any additional information you may have. In this case, we had the timestamp because of cookie considerations.
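The two steps just described can be sketched in a few lines of single-machine Python. This is an illustrative reading of large star and small star on integer node labels, not the GraphFrames implementation (there the whole thing is a call to `connectedComponents()` on a `GraphFrame`); the sample graph below is made up and is not the one on the slides.

```python
from collections import defaultdict


def large_star(edges):
    """For each node u, point every neighbor larger than u at the
    smallest node among u and its neighbors."""
    adjacent = defaultdict(set)
    for a, b in edges:
        adjacent[a].add(b)
        adjacent[b].add(a)
    out = set()
    for u, neighbours in adjacent.items():
        smallest = min(neighbours | {u})
        out.update((v, smallest) for v in neighbours if v > u)
    return out


def small_star(edges):
    """For each node u, point u and its smaller neighbors at the
    smallest of those neighbors."""
    smaller = defaultdict(set)
    for a, b in edges:
        smaller[max(a, b)].add(min(a, b))
    out = set()
    for u, neighbours in smaller.items():
        smallest = min(neighbours)
        out.update((v, smallest) for v in neighbours | {u} if v != smallest)
    return out


def star_components(edges):
    """Alternate both steps until nothing changes; return node -> center."""
    current = {(max(a, b), min(a, b)) for a, b in edges if a != b}
    while True:
        updated = small_star(large_star(current))
        if updated == current:
            break
        current = updated
    components = {node: centre for node, centre in current}
    components.update({centre: centre for centre in set(components.values())})
    return components


sample_edges = [(10, 7), (7, 3), (3, 2), (2, 1), (9, 8)]
components = star_components(sample_edges)
```

Every node ends up mapped to the smallest label in its component, which doubles as the component ID. Nodes without any edge never appear in the edge list, so this sketch does not report them.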
When you have a graph like this, mapping from one partner to any other is super easy. Given an ID in the form Partner_A_X, which is the encoding we were using, you look for the connected component ID that has this node and you check all the nodes in this component that have the shape Partner_B_*, and that’s it. Now you can go from the identifier X for Partner A to anything else for Partner B, which may be one identifier, which was the assumption in the easy solution to this problem, or many. There’s no problem with having many identifiers. What was the impact of moving from an ad hoc process, like the coalescing of the chains, to a graph process? We went from one to the other several years ago.
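The partner-to-partner lookup described above can be sketched like this. The function name `map_identifier` and the sample data are hypothetical; only the `Partner_A_X` and `Partner_B_*` naming comes from the talk. On real data this would more likely be a join on the component ID than a scan over a dictionary.

```python
def map_identifier(node_to_component, source_id, target_partner):
    """Return every identifier of `target_partner` that shares a
    connected component with `source_id` (possibly none, possibly many)."""
    component = node_to_component.get(source_id)
    if component is None:
        return []
    prefix = f"{target_partner}_"
    return sorted(
        node
        for node, node_component in node_to_component.items()
        if node_component == component and node.startswith(prefix)
    )


# Hypothetical output of the connected components step: node -> component ID.
node_to_component = {
    "Partner_A_1": 0,
    "Partner_B_alpha": 0,
    "Partner_B_beta": 0,
    "Partner_A_2": 1,
    "Partner_B_gamma": 1,
}

matches = map_identifier(node_to_component, "Partner_A_1", "Partner_B")
```

Here `Partner_A_1` maps to two Partner B identifiers, which is the "many" case that the row-based solution could not represent.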
The first one, and this was significant: integrating a new partner, a new data provider, with cookie syncing and mapping and all the previous things I have mentioned, went from around two months of development time to one week, because before we needed the business rules, and in the graph situation all partners are equivalent. There are no rules; basically you just add an identifier to a list and you iterate one more time when you are processing the data. So the one week is basically the time it takes to confirm everything works before deploying. We also had an uplift in users mapped. I won’t really explain what I mean by users mapped here, because it’s a bit delicate to explain and it wouldn’t fit the theme. But we saw a significant uplift. 20% may look small or large, depending on your problem; I can tell you 20% was huge for this problem.
And more importantly, the quality of our mapping (and again, I’m not defining quality, but you can imagine it through those users mapped) made us competitive with the industry leaders in this mapping situation. We compared data with them as an information exchange, and we got pretty much the same: in some cases we had more users and in some cases they had more users. We were within maybe 2 to 5% of each other, which was a real boost to our capabilities, because we didn’t need to pay them to get this data: we had it ourselves. Now, when you are doing this kind of computation in Spark, there are some tips and tricks that can make it way, way more performant, cheaper, faster: pick one, pick two.
The first one is data cleanup. This is probably one of the computations where I’ve seen data cleanup matter the most, aside from machine learning. The first step would be removing invalid identifiers. And why is this a problem? Large star, small star is an iterative algorithm, and it has a really hard time when you have really large components. It’s a bit … it’s not technical, but if you think about how the algorithm works, it’s basically iterating until the component has been squashed into a star. If you have a component with a lot of identifiers, this takes a long while. If you remove invalid identifiers, you are less likely to have large components. Imagine that you have something like NA or zero or XYZ because a connection to a provider database has failed.
Millions of users could have this at some point in their cookie lifetime, which means that the connected component with NA as an identifier would be huge. So you need to make sure that you are removing these, or fraudulent calls to the mapping server, or many other things. These are easy to find, and they are the first line of defense. Another one is pruning nodes. You don’t want components to be very large, because it makes the algorithm slow. In the case of cookies, this is easy because cookies expire. A cookie you haven’t seen in a specific number of days (in practice this can be between 30 and 120 days) you are unlikely to see again; users usually clean their cookies at some point. So you can expire them and remove them, and this keeps components within a reasonable size.
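Both cleanup steps, dropping invalid identifiers and expiring old cookies, can be sketched as a filter over the edge list. Everything specific here is an assumption for illustration: the set of invalid values simply repeats the examples from the talk, the 30-day window is the lower end of the range mentioned, and the sketch expires edges by their last-seen date rather than tracking each node separately.

```python
from datetime import date, timedelta

# Illustrative values only; a real list would come from inspecting the data.
INVALID_VALUES = {"", "NA", "0", "XYZ"}
MAX_AGE = timedelta(days=30)


def cookie_value(node):
    """Assumes nodes are encoded as '<partner>_<cookie>' and that the
    cookie value itself contains no underscore."""
    return node.rsplit("_", 1)[-1]


def clean_edges(edges, today):
    """Drop edges that touch an invalid identifier or were last seen
    more than MAX_AGE ago."""
    kept = []
    for src, dst, last_seen in edges:
        if cookie_value(src) in INVALID_VALUES or cookie_value(dst) in INVALID_VALUES:
            continue  # would glue unrelated users into one huge component
        if today - last_seen > MAX_AGE:
            continue  # expired cookie, unlikely to be seen again
        kept.append((src, dst, last_seen))
    return kept


raw_edges = [
    ("Partner_A_1", "Partner_B_alpha", date(2021, 5, 20)),
    ("Partner_A_2", "Partner_B_NA", date(2021, 5, 25)),
    ("Partner_A_3", "Partner_B_gamma", date(2021, 1, 1)),
]
cleaned = clean_edges(raw_edges, today=date(2021, 5, 26))
```

Only the first edge survives: the second touches the placeholder `NA`, and the third is older than the expiry window.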
Finally, you can actually destroy components. If you see a component that has a lot of IDs, that’s fake: something went wrong there, there was a data leak somewhere or a bot is creating IDs that shouldn’t be there, so you just remove any component that’s too large. What counts as too large will depend on your problem. And now the crux of the matter: what is the fastest way to build a two-billion-node graph daily? Well, actually you don’t do that. You don’t build a two-billion-node graph every day, because that would be too large and too costly. You follow the easy way, which is the incremental way. How do you process this incrementally? You have an existing graph, somehow. Maybe you started incrementally from zero, or you computed the whole graph from scratch somehow, just paying for a large cluster.
And then you get the new chain identifiers that appear one day. That’s the situation on the right. You run large star, small star on the new identifiers and you get new star components. There are two kinds of new star components: those that have some identifier in common with the existing ones and those that don’t. And they are different in spirit. I display them here: we have the ones that are connected to the old ones, on the right, highlighted, and on the left those that aren’t. I call the ones on the left clean, because they don’t have any connection between old and new; they are just unconnected. And the others are the tainted ones.
Because we have seen new data that makes them different, the connected ones are no longer pure star components: they have an extra connection. If we think of this extra connection as a new edge, which it actually is, why don’t we just run large star, small star again on the tainted components? We have a slightly larger component and we just merge them. That way we end up with all stars again. We have run large star, small star twice, but the second time only on around 10%; it will depend, again, on your problem. In our case it’s less than 10% of the full graph, which, as you can imagine, is a huge difference.
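The incremental update can be sketched as follows, under stated assumptions: yesterday's graph is stored as a node-to-center mapping (one star per component), and a small union-find (`star_components`, a stand-in, not large star, small star itself) plays the role of the connected components step. The function and variable names are hypothetical.

```python
def star_components(edges):
    """Stand-in for the connected components step: maps each node to
    the smallest node of its component."""
    parent = {}

    def find(node):
        parent.setdefault(node, node)
        while parent[node] != node:
            parent[node] = parent[parent[node]]
            node = parent[node]
        return node

    for a, b in edges:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[max(root_a, root_b)] = min(root_a, root_b)
    return {node: find(node) for node in list(parent)}


def incremental_update(existing, new_edges):
    """existing: node -> center for yesterday's stars."""
    fresh = star_components(new_edges)  # first run: today's data only
    tainted_new = {c for node, c in fresh.items() if node in existing}
    tainted_old = {existing[node] for node in fresh if node in existing}

    # Untouched old stars and clean new stars are kept as they are.
    updated = {n: c for n, c in existing.items() if c not in tainted_old}
    updated.update({n: c for n, c in fresh.items() if c not in tainted_new})

    # Second run: only the tainted stars, old and new, as star edges.
    rerun = [(n, c) for n, c in existing.items() if c in tainted_old]
    rerun += [(n, c) for n, c in fresh.items() if c in tainted_new]
    updated.update(star_components(rerun))
    return updated


existing = {"a1": "a1", "b1": "a1", "a2": "a2", "b2": "a2", "a3": "a3", "b3": "a3"}
new_edges = [("b2", "c9"), ("c9", "b3"), ("a7", "b7")]
updated = incremental_update(existing, new_edges)
```

In this toy run the stars centered on `a2` and `a3` are tainted by the new chain through `c9` and get merged, the star centered on `a1` is never recomputed, and the new pair `a7`, `b7` is a clean component that is simply added.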
Finally, what can you do in Spark to make connected components computations fast for large graphs? Again, depending on the situation, these may not apply. First, you should go large and tune up the machines. The process is memory hungry, since it’s a [inaudible] and it has a lot of joins to compute the local identifiers. Since it’s join-heavy, it’s shuffle hungry. So if you go for large machines, and few of them, you are going to have less shuffling. And you should leave enough executor memory, because otherwise you are going to have [inaudible] out of memory errors. When you have these large machines, you should enable adaptive query execution. Adaptive query execution uses runtime statistics to help the cost-based optimizer. The cost-based optimizer helps optimize joins, and this algorithm is join-heavy.
If you can help the cost-based optimizer, you are going to help your algorithm. AQE does that, and by how much? Well, if you use Spark 3.x and enable AQE, you get a 30 or 40% speed-up, which means a 30 to 40% reduction in cost, which is impressive because you just need to add a setting to your Spark config. You can’t get much easier than that. There are some further improvements that we haven’t explored, actually. The first one, which is easy but has a low impact, is just moving the storage to Delta Lake. Delta Lake has several improvements to some kinds of joins, and in our case it would improve the computation of the tainted components. But it should have a really small impact on runtime, so we haven’t even bothered.
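For reference, the AQE switch mentioned above is the Spark 3.x setting `spark.sql.adaptive.enabled`. The sketch below only builds the settings and the matching `spark-submit` arguments in plain Python; the helper `to_submit_args` is hypothetical, and the talk does not say which other settings, if any, were changed.

```python
# The one setting the talk refers to (Spark 3.x).
aqe_settings = {
    "spark.sql.adaptive.enabled": "true",
}


def to_submit_args(settings):
    """Render settings as repeated `--conf key=value` arguments."""
    return [
        arg
        for key, value in sorted(settings.items())
        for arg in ("--conf", f"{key}={value}")
    ]


submit_args = to_submit_args(aqe_settings)
```

The same key and value can instead be passed through `SparkSession.builder.config(key, value)` when the session is created in code.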
A harder one with a higher impact would be implementing union-find-shuffle, because it’s an improvement over large star, small star. But this would be harder, because large star, small star is already implemented, works well and has been running in production for several years without a problem. So we are wary of going there, but that’s an option we are considering. And if your data doesn’t fit an incremental model like ours, it may be your best solution, actually. And that’s it. Thank you for attending the presentation; I hope you have many interesting questions that I haven’t answered. You can get the slides from my GitHub repository; the repository is called identity-graphs and it will be linked somewhere, I’m pretty sure. And all your feedback is welcome.

Ruben Berenguel

Ruben Berenguel is the lead data engineer at Hybrid Theory, as well as an occasional contributor for Spark (especially PySpark). PhD in Mathematics, he moved to data engineering where he works mostly ...