The abundance of data as well as regulations protecting people’s privacy created a need for protecting private and personal information in a scalable and efficient way. Personal data includes sensitive and private information such as health records, banking transactions and frequent locations. One of the challenges of data anonymization is when the data anonymity increases its usefulness for analytics or research decreases. This paper provides an implementation of Top-Down Specialization algorithm for data anonymization in parallel using Apache Spark which aims to balance data utility and data privacy. Performance evaluation is done on large datasets of up to 20-million rows in a variety of different cluster environments. The talk analyzes the different speedups achieved using different data sizes. It also discusses changes made to the algorithm to improve performance such as determining partitions size, determining what should run on the driver and what should run on the executor as well as scale-up experiments of the algorithm. Web page for the topic proposed including slides, code as well as the research paper I wrote is here: micophilip.github.io/comp5704/

– Hello, everyone. My name is Macarious Abadeer. I’m a senior software developer with IQVIA, and today I’m going to present “Top Down Specialization Using Apache Spark.”

So today, we’re going to go over k-anonymity purity, which this work is based on, followed by an overview of top down specialization algorithm, how the data was pre-processed, and how the algorithm was adopted for Spark, the test environment that I used, key results from my experiments, and key takeaways that I learnt. So before I begin, I would like to acknowledge the two papers that this work was based on. The first one was “Top-Down Specializaton “for Information and Privacy Preservation,” by Fung and Wang from Simon Fraser University along with Philip Yu from IBM Research. And the second one is “A Top-Down “k-Anonymization Implementation for Apache Spark,” which was the first paper that attempted to adopt this top down specialization algorithm for Spark, where I base my work, and I aim to assess. So what exactly is k-anonymity? So k-anonymity basically is a privacy-preserving, data publishing theory that aims to protect personal data before a dataset is shared for secondary use, such as research or analytics. So it basically states that a dataset is called k-anonymous when for every record there’s at least k minus one records with the same quasi-identifier values. So quasi-identifiers are basically those attributes that when we look at them on their own, they cannot identify an individual, but when they’re used collectively, they can identify an individual. So if we look at this dataset, for example, we see that there are two females with grade 12 education who live in Nepean, two other females with an associate degree, who live in Kanata, but however, we do see three unique records that can identify an individual. So an adversary, for example, who with a pre-knowledge of existence of a male from Orleans with a master’s degree, will be able to tell that they make $50,000 a year. So in this case, the quasi-identifiers are the education, gender, and city, and the income is what we call a sensitive attribute, which is the attribute that we’re trying to protect. So how can we make this dataset satisfy k-anonymity? We notice, for example, that the bachelor’s, master’s, PhD can be generalized to the same parent value, which is post-secondary. We also notice that Chapel Hill, Orleans, and Beacon Hill are all three neighborhoods in the Ottawa East area. So what we can do is that we can cross out these six values and then replace them with their parent generalized value, which in this case is post-secondary Ottawa East for education and city. So now, the same adversary will not be able to tell which one of these three records are the male with a master’s degree from Orleans, so therefore, they will not be able to tell what their income is, and this dataset is now to anonymous, so that means for every record, there’s at least one other record with the same quasi-identifier value. So what exactly is top down specialization then? So it is a way to achieve this generalization that I was talking about, but in a way that aims to balance data utility and privacy. Because as the data becomes more private, it becomes less useful for research or analytics.

For every quasi-identifier attribute in the dataset that we wish to anonymize, there is a corresponding taxonomy tree. So a taxonomy tree is basically a hierarchy that represents the classes that all the distinct values belong to. So what we’re looking at right now, for example, is the taxonomy tree for the education attribute, so the root node is the most generalized value and the leaf nodes are the distinct values in that dataset. So top down specialization goes through each level of that tree, which we’re going to call an anonymization level from now on and it aims basically, to keep specializing values, starts with the most generalized, starts from the root node. It keeps specializing the values until k is violated. So until the data is very specialized. At every level, what we’re going to also call an iteration so every anonymization level is an iteration in the algorithm. For every anonymization level, it calculates what we call a score. So the score basically comprises information gain, per privacy loss. If I were to specialize this attribute to this anonymization level, how much information I’ll gain versus how much privacy will the dataset lose? So for example, I pick all the values that belong into the left subtree and I generalize all those to without post-secondary, and on the right subtree, I take masters, for example, and doctorate and all these values from the leaf node on this right subtree and generalize all those to post-secondary. I make the calculation, calculate the score and I do this for every quasi-identifier attribute and the attribute with the highest score is the attribute that will end up specializing and then keep going all the way from the top down until k is violated and then we stop and this will be the our k-anonymous dataset.

So pre-processing of the dataset involves removal of all the non-quasi-identifiers so that our dataset is basically narrower. So from now on, I’m going to refer to quasi-identifier as QIDs. So those quality QIDs and the distinct values of SAs which is the sensitive attribute in our example, was income, these are all grouped together with the count. Also to make it a little bit simpler, instead of using the numeric value of the income, we’re going to use a binary value for less than or equal to 50,000 versus over 50,000.

Once we have that as you saw from the top down overview is that this is a very iterative algorithm. So we are going to go through that tree over and over again for every anonymization level and can calculate that score. So it would be neat if we can find the parent that this particular distinct value belongs to without having to traverse the tree over and over again in runtime. So it’d be nice if we can do that actually in constant time. So in order to achieve that, well, I’d do what I call building a path map. So we’re going to traverse the tree in a breadth first manner. So we start from the top and we’re going to maintain a queue on the left hand side along with the map on the right hand side. That map is basically the child parent mapping for every child and what their parent is in the tree, and the queue on the left hand side, it will contain the node that I’m traversing as many times as that they have child nodes. So in this case, for example, in the first level, I’m traversing the node any so I insert any twice to that queue because any has two child nodes, without post-secondary and post-secondary. And then I keep moving to the next level and when I traverse without post-secondary, I pop or dequeue one element from the queue, and this represents the parent of this particular node. So on the right hand side, I have the child parent mapping, so I’m saying okay, without post-secondary parent is any and then I traverse post-secondary and then I pop or dequeue another element from the queue, and this also represents any or it represents parent for this particular node, so I have without post-secondary parent as any and post-secondary parent is also any. While I maintain that queue, you notice that I inserted without post-secondary three times because without post-secondary had three child nodes, and I inserted post-secondary three times because it also has three child nodes. And it keeps growing through all these levels until I end up with an empty queue and child parent mapping as you can see on the right hand side, so for example, preschools parent is without post-secondary, and so on and so forth. Now with that map, I can recursively go through it and build a path basically from all the distinct values all of the way to the top. So for example, if I’m interested in finding the path for a ninth grade element, I know that its parent is junior-secondary, whose parent is secondary whose parent is without post-secondary whose parent is any, but since we’re traversing that tree from the top down, so I simply reverse that path and now I have the ninth grade. In order to get to the ninth grade, I grew out from any to without post-secondary to secondary to junior-secondary, and then to the ninth grade. So now with these path maps, I’ll be able to tell the anonymization level that I’m at, for every element in constant time because basically it’s a key lookup with a ninth grade, which is the value in the dataset, I’ll be able to tell that the parent right away is any and then without post-secondary and so on and so forth. So how is this adopted for Spark? So basically, before we start, we needed to generalize all the quasi-identifier to the root of the anonymization level.

So any is always the root for all the taxonomy trees. So in this particular example, you will see here that education, gender and city are all generalized to the top of the corresponding taxonomy tree which is any in this case. I’m also maintaining the aggregate for every collection of quasi-identifier. So for example, the count or the aggregate of records with all any is 21, which is basically the whole dataset.

Once I have that, then for every anonymization level, I need to calculate that score which basically involves the information gain and privacy loss. It uses a formula whose parameters are plugged in using these aggregates. That’s why we need all these aggregates so that we can calculate the information gain and privacy loss at every level. So we have education for example, there are seven records that can be generalized without post-secondary, we have 14 records that can be generalized to post-secondary, and we do the same thing for city and we have the corresponding aggregates for both east and west.

And now, once we have that, we need to score the best option. So all the anonymization levels level values are aggregated at every partition, and then they are merged into a single-row table with the totals. So I need to basically compare the score for education versus the score for city using these aggregates. So now I have here the city and I compare the aggregate with education.

And then once I have all the aggregates, I need to basically merge all these aggregates from all these partitions into one row data frame that has all these aggregates. So the y and the n represent the aggregate of the records that can be generalized to any that has over than 50,000 row as a sensitive attribute, which is the income in this case, and the n represents the second value of the sensitive attribute, which is less than 50k. So for example, if I start from the top there is 21 records that can be generalized to education any there are 12 records that can be generalized to education any with over 50,000 as the income, non-records that can be generalized to any with less than 50,000 and so on and so forth for every node as well as its children. So the idea is that we wanna know what the score is or what the information gain, privacy loss if we were to specialize this any node to its children, which is in our case was without post-secondary and post-secondary. So we do this for all the quasi-identifier attributes, I only included the example from education because as you can see this will be a very wide data frame so I only included the aggregates from education as an example, but this will be for all the quasi-quantifiers that we aim to compare. Once we have all these numbers, these are basically plugged in to the formula that calculates information gain and privacy loss and now I have that one final number that tells me what’s the score if I were to specialize any from the education attribute to its children off without post-secondary and post-secondary and then do the same thing for all the other attributes. Once I have my best option, I need to then re-iterate before I re-iterate, let’s say for example, that city is the highest score anonymization taxonomy tree, so I need to basically remove the root and take its sub-children and add them to other anonymization level set. So in my next iteration, I’m no longer comparing two anonymization level trees, I’m comparing now three, which is basically the sub-children of the highest score taxonomic tree, along with whatever other trees that have in the set that basically did not have the high score. So I keep doing this until basically k is violated and now I have my final score, sorry, my final dataset with all the specialized attributes using these iterations.

So how is this adopted for Spark? So basically started with a pre-processed dataset that was transformed to partitioned data frames, so the pre-processed dataset is the example that we saw earlier, which basically has these group by and count dataset. And then we start partitioning this dataset to n number of partitions, I’m going to go over the partitions as well but we basically go over n number of partitions, and we calculate the aggregates or sort of prepare the sum map functions, which is a built in Spark function to calculate the aggerates for every attribute that we are looking at to calculating their score and then once we have these map functions, then we collect to one single-row data frame that we saw here. And now I have all these total aggregates collected and now I have my parameters that I can plug in to the formula and calculate the score for every attribute and for every anonymization level and we use the Spark’s toLocalIterator function in order to come up or in order to collect these aggregates to one single row.

So I ran multiple experiments basically to assess the parallelization of this algorithm or how well it parallelizes on Spark. So which was the the main actually outcome of this project is that assessment to see how well it parallelizes. So before I begin looking at the results, I need to tell you what my test environment is. So basically, I used an OpenStack cluster with 32 gigabyte of disk, eight gigabyte of RAM, and four virtual CPUs per node. The k was always set to 100 and I ran these experiments on one node 2, 4, 8 and 16 nodes in the cluster. I also used dataset sizes of 250,000, 5, 10 and 20 million rows. The Spark manager for these experiments was the stand-alone which comes out of the box, version 2.4.2 at the time with Scala 12 and Java 8 and the dataset is called the adult dataset, which appears to be a very popular dataset for privacy preserving algorithms research. So basically, it’s very similar to the examples that I use. It has demographic data of education, city marital status, gender, all sorts of different demographic data but it was only 32,000 rows, which was obviously not enough to assess any parallelization data, sorry, experiments, so I enlarged it by generating random values from the list of distinct values in the original dataset to come up with these different variations of different sizes, the dataset. In my experiments, I also used eight categorical QIDs that were tested, that we aimed to assess its score and anonymize. So the first experiment that we wanted to look at is what’s the n number of partitions that we can use to achieve the best runtime.

So for this experiment, I used the eight node cluster and the 5 million-row dataset. So as you can see here, we ran this experiment eight times. The best performing was 32. In this case, if you recall my cluster, every node had four virtual CPUs or four virtual cores.

So the best performing number of partitions was 32, while default was 200. So as you can see that the best performing was a lot lower than the default that Spark came with. So these experiments were necessary in order to see what the best performing partition is. And this number was always used for all the experiments. So when I ran one experiment on one node, I used four on two nodes, I used eight as the number of partitions and so on and so forth all the way untill my 16 node experiment. The next one was assessing the speed up.

So I ran the speed up tests on all the four dataset sizes that I mentioned. So the first one was 250,000 rows dataset test. So as you can see, the bottom line here is the optimal runtime. So you would expect if you double the number of nodes, your runtime will be cut in half because every node is dividing the work, but as you can see here, this was not achieved with the small row dataset of 250,000-row.

The actual runtime was far away from the optimal and as a matter of fact, going from two nodes to four nodes did not improve the runtime at all. And going from eight nodes to 16 nodes also had a flat runtime, the runtime was not reduced at all. So there are two theories that could explain this behavior. The first one is that when your partitions are very small, the overhead of the partitioning and the network traffic is it basically outweighs the benefits that you will get from the runtime because every partition is very small. The second theory is that all this dataset was partitioned over randomly generated unique ID, so it’s possible that a custom hash function could have helped in these small size datasets to achieve better parallelization.

As you notice here, as we keep increasing the dataset, The gap between the actual and optimal runtime gets very close together. So for the 5 million-row dataset, it’s very close to optimal but not quite yet, because as you can see also from going from eight nodes to 16 nodes the runtime was flat, it did not cut in half. The story gets a little better if we keep increasing the dataset. So for the 10 million-row dataset, it got even closer and even better for the 20 million-row dataset, which is the closest that we could get to the optimal as you can see from this graph.

So the scalar performance is basically another way to interpret the previous slide or the previous results for the 20 million-row dataset. As you can see over the all, the 100% increase in the dataset size only resulted in like 55% to 65% increase in time and this could be also explained just by the speed up graph is that going from 10 million to 20 million-row now every node is doing a lot more work than before and that’s why the runtime did not significantly increase, and as a matter of fact, remains almost virtually flat, if you compare it with the dataset size increase. So what did I learn from these experiments? So there are multiple key takeaways that I learned.

The first one is Spark is actually best suited for iterative algorithms, such as Top Down Specialization, and it was actually a research paper that I came across during my literature review when I was working on this project that compared MapReduce framework with Spark for iterative algorithms, and it found that Spark performed up to five times better for iterative algorithms, because you can take advantage of all the caching and partitioning features that come with Spark out of the box. The second point is that there’s no magic number for partitions. It has to be analyzed for every algorithm and for every dataset size. I also came across a lot of literature that were saying, twice the number of cores is what you wanna set your partitions to. These numbers are all algorithm specific as you can see, in my case, double the number of cores did not actually perform poorer than n number of cores when I set the partitions to n number of cores. The other thing is the dataset size. So there’s no guarantee that beyond the 20 million-row dataset, I would have been able to use also n number of cores for my partition numbers because for like 40 million rows or 100 million rows, it could have crashed if I kept the number of partitions at n number of cores. So it has to be analyzed for the dataset size that you’re using, it also has to be analyzed for the algorithm that you’re running on Spark.

The third point did not really have a significant improvement in my runtime or my performance, but it was significant enough to put it down is that if you’re familiar with Scala’s tail recursion, there’s a lot of optimization that comes with tail recursion over loops for the code that runs on executor. So in my implementation

for any iteration that runs on Spark executor, I used tail recursion instead of for loops, and also from a developer perspective, tail recursion is fun to work with than boring for loops, I guess.

The next two points are almost related. So the first one in hide in sight, it was not very surprising, we should always aim to minimize the number of operations, or collect calls such as aggregations to only a maximum of one per iteration. Obviously, this depends on the algorithm. So you may not have the luxury to minimize them all to one aggregation per iteration, but in my case, that was good enough and that’s why I had this wide data frame that I collected from all the partitions and that’s why I have only one map function that aggregates them all at the same time, and collect calls to a wide data frame was actually performing much better up to 16 times better in terms of runtime, than if I do small aggregations on every partition. So my brute force attempt was that okay, I, it makes sense that it would perform better if I run these small aggregations on every partition so that I have less to work with but actually on the contrary, having that huge collect calls from all the partitions to single-row data frame using toLocalIterator call actually had 16 times improvement in the runtime.

And my last point is the hash function. So the partitioning over unique ID, as you can see achieved close to optimal speed up for datasets larger than 5 million, but what about datasets that are less than 5 million rows or 5 million rows and less. Can a custom hash function help with partitioning on smaller dataset? So I guess this is a point for a future research to consider. So all the implementation that I talked about the resources that I used in my research are on this page. I’m gonna pause here for a couple seconds if you wanna write it down, or you can ask me in the chat, I’ll be happy to share. Some of the papers there are behind IEEE and ACM firewall but again, you may be able to get ’em from Google Scholar if you’re interested in one of the papers by the title and also you’ll see the experiments spreadsheet that I have for my experiments and as well as the actual implementation. So that wraps it up for me. So if you have any questions, feel free to ask. And please don’t forget to rate the presentation.

IQVIA

I am a Senior Software Developer with 7 years experience in software development and 4 years in team leadership positions. I am currently working towards my master's in Computer Science at Carleton University in Ottawa, Canada. I have been working with Spark for 4 years in predictive modelling and data privacy transformations. I recently wrote a research paper part of my master's degree on using Spark for de-identifying datasets using Top Down Specialization technique. Companies I worked for include D+H, IBM and currently with IQVIA.