Managing big data stored on ADLS Gen2/Databricks can be challenging. Setting up security, and moving or copying the data of Hive tables or their partitions, can be very slow, especially when dealing with hundreds of thousands of files. Procter & Gamble developed a framework (to be open-sourced before the conference) which takes the performance of these operations to the next level. By leveraging Apache Spark parallelism, low-level file system operations, and multithreading within the tasks, we managed to reduce the time needed to manage ADLS files by >10x. Moreover, ADLS file security management can be done by any data engineer without a profound understanding of the ADLS REST API. The framework also gives Apache Spark applications new capabilities to easily move files/folders/tables/partitions with just a line of code. This presentation will show the problems we are solving using this framework, as well as previous solutions which did not work well. Next we will present in detail how this problem was solved using the Spark API and what higher-level methods are available in the framework. We will walk through the available options and planned extensions to the library.
– Hi everyone, my name is Jacek Tokar. I’m a lead data engineer at Procter & Gamble, and today I’m going to present to you a tool we developed at Procter & Gamble which greatly helps manage ADLS Gen2 storage from Spark clusters.
As you may know, ADLS Gen2 is a storage account type in Azure, which is a really great tool, but it’s lacking a bit of functionality. With OctopuFS, we’re trying to fill that gap.
The agenda for today is the following. First, I’m going to present the use case — why we developed this in the first place. Then, just for a start, I’m going to show you something simple: a function that lets you get to know the size of your data on the storage, which is not that simple with ADLS Gen2.
Then I’ll walk you through distributed file copy, which uses Spark in the cluster to copy the files. Then we’ll walk through some basic file operations like deleting and moving files. And we will end with managing ACLs — security — on ADLS Gen2 storage. With all these functions, it was quite easy to develop other functionality, like detecting the delta between folders, or using metastore abstractions to perform data movements in your environment. So, we are going to walk through that as well. Then I will show you how you need to set up your cluster in order to run OctopuFS, and what the challenges and learnings coming from the development of the tool were. And, at the end, I will give you the link to GitHub where you can get OctopuFS and start using it.
So, first, let me quickly introduce P&G for those who don’t know what Procter & Gamble is. We are a consumer brands company that has been improving consumers’ lives for almost 200 years now. And hopefully, with this presentation, we will also be improving your development life when building software on Spark clusters.
Globally, we are present in many locations for data engineering and machine learning engineering. The main locations are Warsaw, Cincinnati and Guangzhou, and we also have a site in San Jose. For data science, it’s Cincinnati, Geneva and Guangzhou. If you’re anywhere close to these locations, or you’d be willing to relocate, let us know at pgcareers.com. We’re hiring, so you should be able to find an interesting offer for you at that link.
Okay, so let’s start with the use case. Why would we do that? The operations I mentioned in the beginning sound pretty simple and straightforward — moving files, deleting files. So what was the scenario in which we wanted to use these operations, and why was it important for us to have them be as efficient as possible?
Imagine, like any other application, we have a reporting layer. This reporting layer is really data on ADLS Gen2 storage — again, we’re operating on Azure, and the entire framework is written in Scala. The reporting layer is something that people have access to, and before we promote any data to it, we want to make sure that in case it’s wrong, we have a backup. So we have the Prev table, and what we need to do is copy the data of the reporting layer to a different folder, so that in case we spot a data quality issue with the data, we can roll back quickly. Then another requirement came, which was: let’s do as much validation as we can before the data actually gets to the reporting layer. So we have the pre-PROD tables, or pre-PROD folders, where we validate the data before we show it to the users.
So in order to promote data from pre-PROD, if we decide it’s good enough, first we need to delete the data from the reporting layer. Then we have to move the new data into the reporting layer. But when we move data, the security on the files is not modified in any way: files simply land on ADLS Gen2 with the same security they had in the source, so they don’t inherit ACLs from the parent folder of the reporting layer. So in order to enable people to access the data, we have to set up ACLs. And, as in the majority of Spark cases, we’re working on big data — we have hundreds of thousands of files. So in order to do all that efficiently, we had to develop a tool, which is OctopuFS, of course. One of the design decisions was to use the Hadoop file system libraries instead of using any APIs directly. As you may know, the Storage Account has its own API and its own Azure CLI commands, and with those commands you can do many things on the reporting layer. But since we’re using the Hadoop file system, whatever we developed is also going to work on all the other file systems that the Hadoop FileSystem API supports.
So, to make it interesting and encourage you to focus, here is a comparison of performance. The first measurement is Spark read/write copy: it measures Spark reading some folder of data and then writing it to a different place without any transformation. As you can see, OctopuFS is over three times faster than the Spark method on the same cluster. The second one is a comparison of a move operation. Imagine you want to move a folder with 21,000 files, or maybe just move the files from one folder to another without touching the actual folder name or path.
There are really no alternatives here: the dbutils move operation took over 70 minutes and was killed. On the same dataset, OctopuFS takes less than two minutes. The third measurement was for setting up ACLs: setting security on 16,000 paths took only 19 seconds. And again, there are really no alternatives. The only user-friendly tool we have for storage accounts is Microsoft’s Azure Storage Explorer, but it lacks recursive setting of ACLs, and of course you cannot do that from the cluster. So this should be a very fast method of setting ACLs on your datasets at any point in time, without the need to copy files over and over again.
Okay, so as a warm-up, let’s start with something simple. Let’s say you want to get to know the size of your data. I got that question some time ago and I really couldn’t find a way to answer it without writing my own code. So, to make our lives easier — within the data engineering team at Procter & Gamble and in the community — I developed a very simple method called getSize.
The getSize method accepts only a path as a parameter; it scans recursively through the ADLS storage and collects all the files together with their sizes. As a result, the number of files and the size of the whole dataset underneath this path are printed to standard output. It also returns an object, which is really a collection of whatever we gathered from the Storage Account. So you can easily drill down through that directory tree and retrieve the size of any folder or file located at lower levels under the same path, without sending any further requests to ADLS Gen2. So it’s really fast: once you gather the paths and the sizes with the first scan, the second step takes 3 seconds. The first step, of course, depends on the number of files, but with these 21,000 files, I think it took 20 seconds or so.
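To give a feel for what such a helper does, here is a rough sketch using the plain Hadoop FileSystem API; the path is a placeholder and the actual OctopuFS getSize signature and return type may differ.

```scala
// Sketch only: a getSize-style size lookup via the Hadoop FileSystem API.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val path = new Path("abfss://container@account.dfs.core.windows.net/reporting")
val fs   = FileSystem.get(path.toUri, new Configuration())

// getContentSummary walks the directory tree once and aggregates
// the file count and total bytes under the path.
val summary = fs.getContentSummary(path)
println(s"Files: ${summary.getFileCount}, size: ${summary.getLength} bytes")
```

OctopuFS additionally keeps the gathered paths and sizes in memory, which is why drilling down into subfolders afterwards needs no further requests to the storage.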
Now, let’s move to something more serious: file copy. Imagine you want to copy terabytes of data. You can use Spark, as I showed you, but it’s going to be a bit slower than OctopuFS. So, how can we do that in the most efficient way? OctopuFS has an object called DistributedExecution, and one of its methods is copyFolder. I will explain how it works on this particular example. It accepts two main parameters: sourcePath and destinationPath.
That’s it. And of course, it needs to run on Spark, so an implicit parameter is the Spark Session. Currently, it is configured so that, by default, it will launch as many tasks as you have files: each file will be processed by exactly one task. I will come back to that in the learnings section, but it really improved how the copy was executed, because I could get rid of the execution tail in the copy operations — previously the files were not distributed properly across tasks and it was taking longer than it should. The performance of the copy operation, as it should be, relies mainly on network throughput, and I will show you that on a few screens in a second.
You can copy between different file systems: you can copy between different blobs on Azure, between blob storage and ADLS Gen2, but you can also copy to or from S3 or any other file system handled by the Hadoop FileSystem API. So it really offers a wide spectrum of options.
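An invocation of the copy described above might look roughly like this; the object and parameter names follow the talk, but the exact OctopuFS API may differ, and the paths are placeholders.

```scala
// Hypothetical sketch of a distributed folder copy, one Spark task per file.
import org.apache.spark.sql.SparkSession

// copyFolder needs a Spark Session as an implicit parameter.
implicit val spark: SparkSession = SparkSession.builder.getOrCreate()

DistributedExecution.copyFolder(
  sourcePath      = "abfss://container@account.dfs.core.windows.net/prePROD/table1",
  destinationPath = "abfss://container@account.dfs.core.windows.net/reporting/table1"
)
```

Because each file maps to exactly one task, the copy saturates the cluster network rather than CPU or memory, which matches the Ganglia screenshots shown later.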
Here you can see the screens from Ganglia on Databricks, of course. This is showing the first phase of the copy operation, namely scanning ADLS Gen2 to get the list of files to copy to a different location. As you can see, nothing is really happening here — only a very short spike of network traffic, which is basically the requests sent to ADLS Gen2 to get all the paths, recursively, for the folder being copied to the target folder. As I mentioned before, only 21 seconds for over 20,000 files.
The next screens are even more interesting, because here we compare two things. One is what the cluster characteristics look like for distributed execution using OctopuFS. As you can see, memory usage stays at the same level, cluster CPU usage is more or less at the 50% level, and the network is pretty much saturated — you’re getting the biggest throughput we can get from the setup our cluster offers. Of course, the numbers here — gigabytes per second — depend on the machines you have in your cluster and also the size of your cluster. On the other hand, the Spark version is marked here in red: as you can see, much higher memory utilization, much higher CPU utilization, and much lower network usage. If you compare the levels of network usage between OctopuFS and Spark, it actually reflects the difference in time. So OctopuFS is usually over three times faster than the Spark method.
Just to summarize: we have OctopuFS, which is three times faster than the Spark read/write method. It uses all worker nodes in the cluster, and of course their network as well. And it maximizes usage of network throughput, which should be the goal of a copy operation.
Okay, so we executed the copy, and now what we want to do is delete the old files and move the new files. This is something I call here basic file operations. Our goal was to do that as fast as possible so that we don’t interrupt the work of the users, because we don’t really have any migration window during the day, maybe except for the weekends. We wanted to do it even while users are actively using the cluster, with the minimum interruption possible. All these operations — move and delete, but also setting up ACLs, as I’ll discuss in a second — are purely metadata operations on ADLS Gen2.
So there is really no need to employ the entire cluster and use all those resources just to send, at the end, an HTTP request to ADLS Gen2.
This framework is written in Scala, so I used Scala Futures to run all these requests in parallel. The default parallelism for the execution is 1000, which comes from the fact that the limit on a storage account is 20,000 requests per second, and each request takes more or less 50 milliseconds — sometimes faster, but I took 50 as an average. That’s why I ended up with 1000 as the default, but you can change this: you can override the implicits, as you can see here on the screen, and override the thread pool so that you run fewer or more operations. Please remember that the number of requests per second is shared across all usage of the storage account. So, if your users are using the cluster, or the storage itself, they are also sending requests — if you max out the limit and use the whole 20,000 requests per second, you might end up with problems on the reporting side, because users will not be able to list files, or the delay for them will be much higher. It also has a retry mechanism built in.
First it executes all the tasks and, of course, gathers the results — whether each was successful or not. In case a task was not successful, it will attempt it again, up to five times. Actually, I don’t remember that ever being needed, but in case something is wrong with the network or the storage account is overloaded, this retry may become helpful.
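The pattern described above — many small metadata requests fanned out over a bounded thread pool, with a simple retry — can be sketched in plain Scala like this; the names (`withRetry`, `deleteAll`) are illustrative, not the OctopuFS API.

```scala
// Minimal sketch: parallel metadata operations on a fixed-size thread pool.
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

// Default parallelism of 1000, as in the talk (20,000 req/s limit / ~50 ms per request).
val pool = Executors.newFixedThreadPool(1000)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

// Retry a failing operation up to `attempts` times before giving up.
def withRetry[T](attempts: Int)(op: => T): T =
  Try(op).getOrElse {
    if (attempts <= 1) throw new RuntimeException("operation failed after retries")
    else withRetry(attempts - 1)(op)
  }

// Run one delete request per path concurrently and wait for all of them.
def deleteAll(paths: Seq[String], delete: String => Boolean): Unit = {
  val futures = paths.map(p => Future(withRetry(5)(delete(p))))
  Await.result(Future.sequence(futures), 1.hour)
  pool.shutdown()
}
```

The `delete` function here stands in for whatever sends the actual HTTP request to the storage; overriding the implicit execution context is how you would dial the parallelism up or down.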
Also, for the metastore operations we’ll talk about in a second — operations on tables and partitions, which combine delete and move operations — there are proper safeguards for the situation where the job fails and you want to rerun it: it will safely complete the task rather than delete everything you already moved and only then move the rest to the target folder. So in case it fails, it should be easy and safe to rerun such a job.
And so again, to summarize: a lot faster than dbutils, and you can move and delete your files from a Spark job, from the cluster, or even from your own machine — the driver requirements are quite low and it does not require a cluster to run. If your PC has access to the storage network, you can run it from your PC; I did such tests some time ago and it ran perfectly fine — I was able to get the size of the folders simply from my computer. The only real limitation on the performance of these tasks is the Storage Account request throughput. If we could run even more requests, we could probably increase the default setting, which is now 1000, and do the move or any other operation you have seen here much, much faster than it is today.
We deleted the old data and moved the new data, but as I already said, a move on ADLS does not impact the security of the files: the ACLs stay exactly as they were in the source. Of course, in the reporting layer, you want users to access the data. Sometimes they access it through the tables and you don’t need to worry about that, but in our case, we actually have both: one type of access is through the tables, and the other type we allow is access to the folders and to the data itself on the storage. So, in order to make that happen, two functions were developed.
One simply returns a permission object. The permission object has four parameters. The first is whether you’re setting up security for a group or for a user. Then there are the access details — the type of access you want to give: read, write, or execute, which on folders is of course the list permission. The third parameter is access or default. If you’ve ever used Azure Storage Explorer from Microsoft, when you modify ACLs on a folder, you have two options: access, which means this is the ACL you set on this particular folder, and default, which means that every element — folder or file — that is later copied or created under this folder will get these privileges. You can set both. The last parameter is the object ID. On Azure, when you want to get the object ID of a group or a user, you go to the Azure portal, then Active Directory, find your object — a user or a group — and get the object ID from its properties. And then the only thing left is to apply the ACLs on the path. It will apply the ACLs on the path itself — the folder — and also on all the underlying objects in the folder tree.
When you do that, if there are folders underneath the path, those folders will have both the access-type ACLs and the default ones set up. Again, it is very fast: it took 20 seconds to run for 16,000 files, and I’ve run it on much bigger datasets — it took literally two or three minutes even for hundreds of thousands of files.
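The four parameters described above map fairly directly onto the underlying Hadoop ACL classes, which OctopuFS wraps. A sketch with those classes (the object ID is a placeholder, and the OctopuFS Permission wrapper itself may look different):

```scala
// Sketch: building access and default ACL entries with the Hadoop ACL API.
import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, FsAction}

val groupObjectId = "00000000-0000-0000-0000-000000000000" // AAD object ID from the portal

// ACCESS entry: the group gets read + execute (list) on the folder itself.
val accessEntry = new AclEntry.Builder()
  .setType(AclEntryType.GROUP)
  .setName(groupObjectId)
  .setScope(AclEntryScope.ACCESS)
  .setPermission(FsAction.READ_EXECUTE)
  .build()

// DEFAULT entry: children created under the folder inherit the same rights.
val defaultEntry = new AclEntry.Builder()
  .setType(AclEntryType.GROUP)
  .setName(groupObjectId)
  .setScope(AclEntryScope.DEFAULT)
  .setPermission(FsAction.READ_EXECUTE)
  .build()

// Applying them on a path would then be, given a FileSystem `fs`:
// fs.modifyAclEntries(path, java.util.Arrays.asList(accessEntry, defaultEntry))
```

OctopuFS fans the apply step out over the whole folder tree in parallel, which is where the 20-seconds-for-16,000-files number comes from.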
Here we move to something a bit more complex related to ACLs, but it also reflects our scenario. Imagine you have a reporting layer, which in this case is the source — it will become the source of our ACLs, the pattern we want to reflect on the target folder. To do that, we developed a function called synchronizeAcls. This function copies ACLs from the source tree to the target tree for all the folders that match. First of all, source and target — because they are explicitly provided in the invocation of the function — are treated as matching, so the source ACLs will be copied to the target. Then Folder 1’s ACLs will be copied to Folder 1’s ACLs on the target side. But Folder 3 and File 1 have no corresponding objects in the source. So for all these orphans, let’s say — objects that don’t have a representation in the source folder — the ACLs will be derived from the parent folder. It doesn’t matter how complex the structure is; it will always find the nearest parent folder which has some ACLs defined. And this actually has a quite interesting consequence: imagine that you have a target folder into which you copied files, and they don’t have any security set up.
With synchronizeAcls, by providing the same folder twice in the invocation of the function, you will propagate all the ACLs from the target folder down to the files: it copies the target ACLs to the target itself, so it won’t change anything there, but then it takes care of all the objects in the directory tree that don’t have ACLs — it takes the target’s ACLs and applies them to all the other objects in the folder tree. Very useful. That was more of a side effect than intended functionality, but it has already been helpful several times in our work. I can imagine it can be a bit confusing, so let me summarize again: the function takes the ACLs from the source folder and applies them to all the elements in the target folder tree.
If there are matching folders or files, it copies the ACLs from one to the other; for the rest, which don’t match, it derives the ACLs from the nearest parent folder that has ACLs set up.
So that’s how it works. And again, if you provide the same path as both parameters, and the parent folder has some ACLs set up, it lets you propagate those ACLs to all the children of that folder.
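Both usages described above might look roughly like this; the object name, parameter names and paths are assumptions based on the talk, not the verified OctopuFS signatures.

```scala
// Hypothetical sketch of the two synchronizeAcls usages from the talk.

// 1. Mirror the ACL pattern of the reporting layer onto a freshly copied tree:
//    matching folders get the source ACLs, orphans inherit from the nearest parent.
AclManager.synchronizeAcls(
  sourcePath = "abfss://container@account.dfs.core.windows.net/reporting/table1",
  targetPath = "abfss://container@account.dfs.core.windows.net/backup/table1"
)

// 2. The "same folder twice" trick: the folder's own ACLs are propagated
//    down to every child in the tree that has none set.
AclManager.synchronizeAcls(
  sourcePath = "abfss://container@account.dfs.core.windows.net/reporting/table1",
  targetPath = "abfss://container@account.dfs.core.windows.net/reporting/table1"
)
```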
Other functions — I won’t go into details, but if you’d like to use them in your application, they are ready to use. modifyTableAcls is very similar to what you’ve seen a second ago, but it goes through the metastore, finds the location of the table, and then applies the ACLs for all the paths under that folder. You can also provide a collection of paths to apply ACLs to, and you can get the ACLs themselves.
Okay, so with all these functions we can list, copy, move, and set up ACLs. I got a question from Jason Hubbard: “Do you have delta operations here? Can you actually move files from one place to another? Can you make sure that both folders are in sync?”
And it was actually quite easy to do once we had all the functions I mentioned before. File delta works this way: imagine you have a source folder and some kind of target folder, and the target folder has some files that are the same and some that are different. In this case, File 1 is the same, File 3 is a new file in the source folder, and File 2 is something that has already disappeared from the source. So, as a consequence, we detect what has changed: first we see that File 2 was deleted, and then we see that File 3 is a new file, so we want to make sure it is copied to the target. There are really two functionalities we’re talking about here. One is getDelta: find the differences between source and target — that’s what you just saw: File 2 was deleted, File 3 is new in the source.
Then you want to delete File 2 and copy over File 3. So getDelta detects what has changed, and the synchronize method executes all the operations needed to make sure the target is in line with the source: it runs the delete operation I was talking about a few minutes ago, and then executes the distributed copy for all the files which are missing from the target folder but exist in the source folder.
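The delta logic itself is simple set arithmetic on listings. A minimal sketch on plain path/size pairs (the real getDelta works on FileSystem listings; the `FileInfo` type here is illustrative):

```scala
// Sketch of the getDelta logic: compare source and target listings.
case class FileInfo(relativePath: String, size: Long)

def getDelta(source: Seq[FileInfo], target: Seq[FileInfo])
    : (Seq[FileInfo], Seq[FileInfo]) = {
  val src = source.map(f => f.relativePath -> f).toMap
  val tgt = target.map(f => f.relativePath -> f).toMap
  // Files new in (or changed against) the source must be copied to the target.
  val toCopy   = source.filterNot(f => tgt.get(f.relativePath).contains(f))
  // Files that disappeared from the source must be deleted from the target.
  val toDelete = target.filterNot(f => src.contains(f.relativePath))
  (toCopy, toDelete)
}
```

A synchronize step then just feeds `toDelete` into the parallel delete and `toCopy` into the distributed copy described earlier.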
So, again, with all these fancy functions — and because we very often operate at the table/metastore abstraction — it was quite important for us to also have these data movements implemented on Hive tables.
And we have several functions to do that.
They copy partitions between tables; they can copy, move, and delete partitions. So you can operate on the abstraction of tables and partitions instead of operating on files. What really happens is that queries are sent to the metastore and we get the list of files, or the location of the tables, so that we know where to look for the data. In the backend, what is really happening is the movement of files or folders for the particular set of tables. And at the end of this operation, because we’re working at the metastore level, we have to make sure that everything in the metastore is up to date after we’re done. So the refresh table and recover partitions functions are run: refresh table makes sure the list of files in the Hive metastore cache is consistent with what is actually on the storage, and recover partitions adds or removes partitions if our operation impacted the partitions of a certain table. So, these were the main functions, let’s say, but there is a bit more in the package, and you can see that on GitHub.

Let’s move to what is really needed to run this on the cluster, and specifically on Databricks. First of all, you need to make sure, of course, that you have access to the storage. One of the misleading things that caused a lot of trouble is that even if your cluster works on some tables connected to a certain storage, and you can read and operate on DataFrames, the operations we’re talking about here won’t work — the framework is implemented with the RDD API, not the DataFrame API. So you have to explicitly provide a different security setting, which is available under this link. This setting will allow you to access the storage and execute all the functions we’ve been talking about. The second thing is something you may want to consider.
I don’t think we’ve ever seen this happen in reality, but I can imagine that if you have many files and their sizes vary a lot, it could be a problem. I would suggest turning off speculative execution in Spark so that it won’t start another parallel task that tries to execute exactly the same copy as a task that is already running — because, as you can imagine, copying one file to the same location in parallel is not going to do anything good. The last thing, which is quite self-explanatory if you look at the function definitions, is the implicit values you must provide. For distributed copy, since it’s Spark-distributed, you have to provide the Spark Session as an implicit parameter (or an explicit one, if you prefer). The second one is the Hadoop configuration; again, you can very easily create it in the way you can see here on the screen.
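Put together, the cluster-side prerequisites might look like the following. The OAuth keys are the standard hadoop-azure (ABFS) client-credentials settings, with placeholder values; on Databricks these typically need to be set in the cluster's Spark config so the RDD-level code on the executors can see them, not just the session.

```scala
// Sketch: security and Spark settings needed before running OctopuFS functions.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Standard ABFS OAuth (service principal) configuration — placeholders to fill in.
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",
  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<application-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint",
  "https://login.microsoftonline.com/<tenant-id>/oauth2/token")

// No speculative execution: two tasks must never copy the same file at once.
spark.conf.set("spark.speculation", "false")

// Implicits expected by the distributed functions.
implicit val sparkSession: SparkSession = spark
implicit val hadoopConf = spark.sparkContext.hadoopConfiguration
```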
So now, let me talk a bit about the challenges I was facing and some quite interesting learnings.
The first one was the Hadoop configuration. If you’ve ever developed your own function running inside a Spark task, you probably know that the task is not aware of the Spark Session. What that means is that you cannot get the Hadoop configuration from the Spark Session, because there is no Spark Session there. So, one of the things I tried was to somehow pass the Hadoop configuration as a parameter to the function. Unfortunately, the Hadoop configuration is not serializable, so I found, somewhere on Stack Overflow, a class that encapsulated the configuration, and then I had to unpack this configuration in the task function. So I had an additional class, and I had to create this weird variable only to unpack it a few lines below and use it in the task. Actually, it turns out there is a much simpler and better way: you can simply broadcast the Hadoop configuration wrapped in a SerializableWritable object, as you can see here on the screen. Then, in the task, it is as easy as getting the value of the broadcast variable and using it as a parameter for the Hadoop FileSystem classes.
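The broadcast pattern just described looks roughly like this; `SerializableWritable` is Spark's own wrapper for Hadoop `Writable`s, and `rddOfPaths` stands in for whatever RDD of file paths the job builds.

```scala
// Sketch: broadcasting the non-serializable Hadoop Configuration to tasks.
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SerializableWritable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Wrap the Configuration so it can be serialized and broadcast.
val confBroadcast = spark.sparkContext.broadcast(
  new SerializableWritable(spark.sparkContext.hadoopConfiguration))

val rddOfPaths = spark.sparkContext.parallelize(Seq("/tmp/a", "/tmp/b"))

rddOfPaths.foreachPartition { paths =>
  // Inside the task: unwrap the broadcast value to get a usable Configuration.
  val conf = confBroadcast.value.value
  paths.foreach { p =>
    val path = new Path(p)
    val fs   = FileSystem.get(path.toUri, conf)
    // ...perform the file operation here, e.g. fs.delete(path, false)
  }
}
```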
The other one was also interesting — this was something I found really, really cool. If you noticed, on one of the first slides you could see a tail of execution; I mentioned it when talking about the distributed copy. Here you can see those tails: the execution of the copy was not ideal. What was happening was that the default partitioner of the RDD was not distributing the files evenly. So I had to create my own partitioner, and it took two steps. First, I had the list of files I wanted to copy, so I indexed this list from zero to the length of the collection minus one. Then I had to tell Spark which partition I wanted each line — each path — to go to. To do that, you extend Partitioner and provide a function called getPartition, which returns an integer: simply the number of the partition you want this particular line to go to. Very interesting, because that’s how I learned how to distribute the data exactly the way I want to. In this case, of course, it’s trivial, because by default the number of partitions exactly equals the number of files, so every task gets exactly one file. And even if you decrease the number of partitions, it will still evenly distribute the file count — the maximum difference between the number of paths in any two Spark partitions can be only one. So it’s still quite nicely distributed.
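The two steps above — index the paths, then route each index to a partition — can be sketched like this; the class name `EvenPartitioner` is illustrative, not OctopuFS's actual name.

```scala
// Sketch: a custom partitioner that spreads indexed paths evenly.
import org.apache.spark.Partitioner
import org.apache.spark.sql.SparkSession

class EvenPartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts
  // The key is the Long index assigned with zipWithIndex; spread it round-robin,
  // so partition sizes differ by at most one.
  override def getPartition(key: Any): Int =
    (key.asInstanceOf[Long] % numParts).toInt
}

val spark = SparkSession.builder.getOrCreate()
val paths = Seq("/data/f1", "/data/f2", "/data/f3")

// Step 1: index the paths 0..n-1; step 2: partition by that index.
val indexed = spark.sparkContext.parallelize(paths).zipWithIndex.map(_.swap)
val even    = indexed.partitionBy(new EvenPartitioner(paths.length))
```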
So, finally, how to get OctopuFS. OctopuFS is actually, I think, the first open-sourced package from Procter & Gamble. You can get it from our corporate GitHub site — just enter this address. The source code is there; you can view it, open issues, and download the jar from the artifacts section. And of course, if you would like to contribute, you are very welcome to — I guess there are still a lot of things that can be improved, or new functions that can be written. Especially if you can test this on storage other than ADLS Gen2, maybe on other clouds, that would be very, very helpful.
So, with that, I would like to thank my NAS development team at Procter & Gamble. They were the first users of the tool, and they suffered through its rough edges as well; they also provided a lot of interesting feedback, without which this tool would not be what it is today — a quite solid and stable tool. The second thanks goes to Jason Hubbard from Databricks. Jason helped me with a few interesting issues and gave me useful hints during the development of the solution, so thank you, Jason — your help was really priceless.
With that, I would like to thank you for this session. Hopefully you enjoyed it, and hopefully you will also enjoy the tool itself. And again, if you have any problems with it, any questions, or would like any improvement in this framework, let me know via GitHub — open an issue, we will look into it, and hopefully you’ll enjoy it in your later work. So thanks a lot, and hopefully you’ll have a great virtual Spark Summit.
Procter & Gamble
Procter & Gamble Advanced Analytics Lead Solution Architect with previous experience in traditional data warehousing. Passionate about getting the most out of available tools, solution architecture, and development. In his daily job he works on complex ETL/ML projects on Azure/Databricks architecture.