At Facebook, Apache Spark handles large batch workloads that at times deal with sensitive data requiring protection and isolation across all surfaces: authentication, authorization, and encryption. With jobs from multiple teams running across data centers and geo-distributed regions, Spark actors (driver, executors, shuffle service) need to communicate securely over networks spanning large geographical areas. Spark at Facebook also operates in a multi-tenant environment with strict access control policies that must be enforced to guarantee data protection and job isolation. Operating at this scale presents several scalability challenges, and we’ll share our approach to solving a few of them in this talk.
More specifically, we’ll share how we deployed TLS encryption for Spark jobs to secure data in transit over an untrusted network, and discuss the implications and overhead of doing so. We’ll also cover how tenant isolation, security and fine-grained access control (i.e., row/column level security) are designed and implemented, along with our work on scaling the generation and validation of signed access tokens and the distribution of job resources (files, archives and jars).
– Hi, my name is Ankur. I’m a software engineer on the Spark team at Facebook. Today, along with my coworker and co-presenter Abdul, I want to talk about how we handle security in Spark applications, and some of the challenges we have faced at scale.
So looking at the agenda for today, we’re going to talk about a bunch of security topics. First we’ll look at the ecosystem and the various actors involved in how we deploy Spark applications. Then we’ll look at how we provision identities for these applications, and we’ll talk about authentication, both with external services and within the Spark components.
We’ll look at data access and authorization, including table-level and column-level constraints. We’ll talk about how we distribute tokens, some of the challenges we faced there, and how our design around token management evolved. We’ll talk about encryption: how we enable in-transit encryption and how we implement it. Then we’ll switch gears and talk about container security and job isolation. And finally, we’ll talk about untrusted code separation.
So, the way the talk is structured, for each topic we’ll cover the whys and then the hows. We’ll provide a bit of motivation as to why we’re doing it: attack vectors and some of the possible threats and vulnerabilities. Then we’ll talk about how we go about implementing it and what some of our design choices have been.
So let’s start off with the ecosystem, how these actors come into play, and what our deployment model is. We use client deploy mode, which means that the driver sits outside of the cluster where the executors run. These applications come to life via the data pipelines. When data pipelines have their tasks scheduled and those tasks start running, they spin up drivers. As drivers come to life, they make a connection with the resource manager. After negotiating resources and getting container allocations, the driver spins up executors and starts distributing work to them.
These Spark clusters are distributed and shared in nature. We’re talking about multi-tenant clusters here: at any given point, there are multiple executors from different tenants sharing resources on these clusters. And it’s not just at the cluster level. Even at the level of a single host, we don’t have any kind of exclusion in place, so on a given host we can have multiple executors from different tenants, co-located and sharing resources. That means we need strong isolation guarantees for these Spark applications. At no point should it be possible for user code that runs as part of these applications, in the form of UDFs or script execution, to go outside the boundary of its application. It should not be possible for attackers to inject malicious code and gain access to the address spaces of neighboring applications. Now, while these clusters are multi-tenant, the application itself is single-tenant: for its whole life, the application caters to one single tenant and a set of queries that represent the compute job submitted by that tenant. Throughout its life it makes several requests to outside services. For all those requests it represents the tenant, and it should not have more access than the tenant really has privileges for.
So let’s talk about job identities now, starting with why we need them. We just saw that these applications represent tenants and that, over their lifetime, they talk to several different external services. We have services for configuration, for telemetry metrics, for cluster load balancing, for the metastore and metadata, for data access, for permission checks, and so on. All of these services have their own authorization rules in place. They don’t want any kind of unauthorized access, and it should be possible for them to authenticate any incoming request. That is why these Spark applications need a set identity, on the basis of which they authenticate with those external services. Authentication aside, we also need some level of guarantee here. These are large-scale applications with a lot of executors, and if something goes wrong, it’s very easy for them to bring down the service they’re trying to talk to. If there is, let’s say, a surge of requests from one single tenant, then the number of applications from that tenant compounded with the number of executors per application adds up to a large number of requests. In those cases, it should be possible for the downstream service to have some kind of mitigation in place. It should be able to identify where the load is coming from and mitigate it, for example by blacklisting the user it is coming from. So now that we know why we need job identities, let’s look at how we go about provisioning these identities for our applications.
These applications are brought to life from data pipelines, so it starts at the data pipeline level. The data pipeline makes a submission to the scheduler, and the scheduler authenticates the data pipeline on the basis of the identity that the pipeline provides. Once the scheduler has the identity, it uses that identity to spin up the data pipeline task, which internally spins up the driver. So the driver, before it is launched, has its own identity, which is the same as the identity that the data pipeline provided to the scheduler. In between there are a couple of steps. The scheduler takes the identity and talks to a central certificate authority service. This service is responsible for maintaining job-specific identities, that is, job-specific TLS certificates. These TLS certificates are provisioned and injected into the driver environment. The certificates have the tenant as the owner, which makes them job level and not host level. Once the driver has the certificate, it has a notion of its identity, and it uses this identity to talk to the resource manager. The resource manager then goes through the same loop as the scheduler: it talks to the same certificate authority, uses the driver identity, which is the same as the pipeline identity, provisions certificates, and injects them, quite similarly, into the executor environment. So the TL;DR is that both driver and executors, before they get spun up, already have TLS certificates provisioned in their environment and injected into their processes.
So moving on to authentication. We talked about it briefly in the previous slide: we saw how these applications represent tenants and need to talk to external services. We talked about TLS certificate provisioning, and the TLS certificate is the manifestation of the identity. So let’s look at how a Spark application takes the TLS identity and uses it to talk to different downstream services. In Facebook’s internal infrastructure, all these services talk to one another using the Thrift protocol. So when a Spark application wants to talk to any of these services, it has Thrift clients embedded inside the application. These Thrift clients follow consistent conventions: they look for the TLS certificate in the environment of the application, and when they find it, they inherit the identity of the containing application. The client then uses the TLS certificate to make an outbound connection to the downstream service and to create an encrypted channel. On the side of the downstream service, we have client certificate validation checks in the TLS handshake, which ensures authentication between client and server. So, we have talked about how Spark applications authenticate with external services. We also want to talk about authentication within the Spark components. Spark is a distributed application itself, with multiple processes talking to each other, and we want to secure the communication between those processes.
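To make the client certificate check concrete, here is a minimal sketch of mutual TLS using Python’s standard `ssl` module. It is not the Thrift-based setup described in the talk; the function names and the idea of passing certificate paths as arguments are assumptions made for illustration.

```python
import ssl
from typing import Optional


def make_server_context(cert_file: Optional[str] = None,
                        key_file: Optional[str] = None,
                        ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Server-side context that refuses clients without a valid certificate."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    # CERT_REQUIRED makes the handshake fail unless the client presents a
    # certificate that chains to a trusted CA (this is the "mutual" part).
    ctx.verify_mode = ssl.CERT_REQUIRED
    if ca_file:
        ctx.load_verify_locations(cafile=ca_file)
    if cert_file:
        ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return ctx


def make_client_context(cert_file: Optional[str] = None,
                        key_file: Optional[str] = None,
                        ca_file: Optional[str] = None) -> ssl.SSLContext:
    """Client-side context that verifies the server and offers its own cert."""
    # PROTOCOL_TLS_CLIENT enables hostname checking and CERT_REQUIRED by default.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    if ca_file:
        ctx.load_verify_locations(cafile=ca_file)
    if cert_file:
        # The job-level certificate injected into the process environment
        # would be loaded here (the path handling is hypothetical).
        ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
    return ctx
```

With real certificate files, the server would wrap its listening socket with the first context and the client its outbound socket with the second. The handshake then authenticates both sides before any request is sent.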
So let’s look at some possible attacks here and why we want to secure the communication between, for example, driver and executors. We have a couple of Netty servers running in both of these processes. First, we have the RPC environment, which forms the control plane of the application. Signals like launch task and cancel task, as well as heartbeats, are exchanged over it. There are times when a task result comes back with rows, and these are real table rows we are talking about. It’s a Spark optimization to send results back directly if they’re below a threshold size, but it can give a peek into some sensitive data. Then, as we’ll see later in the talk, we also have a token distribution mechanism on this RPC endpoint: we’re extending the endpoint with a pull-based mechanism for pulling tokens from the driver, so we are effectively transporting the token on this channel itself. We want authentication and security around that, so it’s crucial to secure this RPC channel. The second server is the block transfer service. BTS forms the underlying layer for the block manager. This is where RDD blocks are transported, and this is where we send broadcast variables and shuffle data. Take the example of a broadcast hash join, where the smaller table is broadcast to the executors. It goes over the wire and is available via this endpoint, and we are talking about a table that contains live rows and could hold a fair amount of sensitive information. So it’s important for us to authenticate these endpoints. We don’t want rogue processes that merely know the RPC protocols used by driver and executors to be able to connect to the endpoints and pull out data. So, that covers why we need protection. Let’s look at how we implement it within Facebook.
So at Facebook, we have a custom implementation, which is token-based authentication. We have a notion of a crypto auth token, a Facebook-internal token implementation, which the executor acquires similarly to how it acquires the TLS certificate: the scheduler injects the token into the executor environment. This token is also representative of the application identity. The executor presents the token to the driver, and the driver authenticates the executor on the basis of it. We have a Netty pipeline, and we do this the same way Spark does authentication handling in open source. The Netty pipeline represents the channel in its connected state. It’s a set of handlers that forms something like a chain of responsibility, where every handler is responsible for processing the message that is going down the wire: it massages the packet, handles it the way it likes, and then forwards it to the next delegate in the chain. As an example, we have the SSL handler, which provides encryption support. We have the idle state handler, we have the encoder and decoder on the upstream and downstream side, and we have the auth handler, which contains the implementation we were talking about earlier. This auth handler keeps challenging the client until it comes back with a valid token, and it marks the channel as authenticated once a valid token has been presented by the client. We check whether the token passes all the validation checks. These tokens carry digital signatures with authentication codes in them, so we validate the authentication code, and we also look at the identity embedded inside the token. We don’t want executors from other applications to be able to connect to the driver and pull data out of it, so we want to be sure that it’s the same application that we are authenticating.
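The handler chain and the two checks on the token (authentication code and embedded identity) can be sketched in a few lines of Python. This is only an illustration of the pattern, not Netty and not Facebook’s token format: the `app_id:mac` token layout, the shared secret, and every class and function name below are assumptions.

```python
import hashlib
import hmac
import json
from typing import List, Optional

SECRET = b"demo-shared-secret"  # hypothetical signing key, for illustration only


def sign_token(app_id: str, key: bytes = SECRET) -> str:
    """Issue a token of the assumed form '<app_id>:<hex HMAC of app_id>'."""
    mac = hmac.new(key, app_id.encode(), hashlib.sha256).hexdigest()
    return f"{app_id}:{mac}"


class JsonDecoder:
    """Decoder stage: turns raw bytes into a dict for the handlers after it."""

    def handle(self, message):
        return json.loads(message)


class AuthHandler:
    """Challenges the client until it presents a valid token for this app."""

    def __init__(self, expected_app_id: str, key: bytes = SECRET):
        self.expected_app_id = expected_app_id
        self.key = key
        self.authenticated = False
        self.challenges_sent = 0

    def handle(self, message: dict) -> Optional[dict]:
        if self.authenticated:
            return message  # channel already trusted: pass the message on
        app_id, _, mac = str(message.get("token", "")).rpartition(":")
        expected = hmac.new(self.key, app_id.encode(), hashlib.sha256).hexdigest()
        mac_ok = hmac.compare_digest(mac.encode(), expected.encode())
        # The authentication code must verify AND the embedded identity must
        # match, so a validly signed token from another app is still rejected.
        if mac_ok and app_id == self.expected_app_id:
            self.authenticated = True
        else:
            self.challenges_sent += 1
        return None  # handshake traffic never reaches later handlers


class Pipeline:
    """Chain of responsibility: each handler forwards, or stops, a message."""

    def __init__(self, handlers: List[object]):
        self.handlers = handlers

    def receive(self, message):
        for handler in self.handlers:
            message = handler.handle(message)
            if message is None:
                return None
        return message
```

A driver-side channel would be built as `Pipeline([JsonDecoder(), AuthHandler("app-123")])`. Requests sent before a valid token for `app-123` arrives are swallowed and counted as challenges; requests sent afterwards flow through unchanged.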
So, moving on to authorization. Why do we need authorization? We have these apps representing tenants, and they access data in the data warehouse. By the way, we have a single data warehouse at Facebook: all the teams, all the data, and all the tables are contained in one space. We have authorization rules set up so that different teams have different views. Typically teams have a limited view into the data warehouse: they have access to certain tables, and they gain access via business justification. So different teams have different access, and as the compute engine, we want to be sure that we honor all of those authorization rules and enforce them. As an example, take the HR team, which is one of the customers of this Spark platform. They own the employee record tables and have exclusive rights on them. We don’t want data ingested by the HR team, or jobs run by the HR team, to be visible from outside, and we don’t want other teams to peek into the tables owned by the HR team. So now that we have a bit of motivation around authorization, let’s look at how we enforce it in our implementation. We have authorization checks in both the metadata layer and the data layer. In the metadata layer, when the Spark driver spins up and starts parsing the query, at query planning time it extracts the table and column names and presents all of that information to the metadata layer. The metadata layer talks to the permission servers, the back-end services where those authorization rules have been predefined, and it authenticates the driver on its TLS identity.
It then authorizes this identity: it checks whether the identity has access to the tables and columns it is trying to access, and if this check succeeds, it comes back with a data access token. This token is sent back to the driver. The token is scoped: it contains only the paths of the tables that the query is trying to access. So even if the application wants to, it cannot go beyond the scope of the query. Once the driver gets the token, it distributes it to the executors, and the executors use it to access the distributed file system.
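As a rough illustration of what a scoped, expiring token means on the file system side, here is a small Python sketch. The field names, the warehouse paths, and the prefix-matching rule are all assumptions made for the example; signature verification is deliberately left out to keep the focus on scope and lifetime.

```python
import time
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class DataAccessToken:
    """A token limited to the paths of one query and to a short lifetime."""
    tenant: str
    allowed_paths: Tuple[str, ...]   # table directories resolved at plan time
    expires_at: float                # Unix timestamp

    def permits(self, path: str, now: Optional[float] = None) -> bool:
        """Return True only if the token is unexpired and covers `path`."""
        now = time.time() if now is None else now
        if now >= self.expires_at:
            return False
        for prefix in self.allowed_paths:
            root = prefix.rstrip("/")
            # Match the table directory itself or anything underneath it,
            # but not a sibling that merely shares the same name prefix.
            if path == root or path.startswith(root + "/"):
                return True
        return False
```

For example, a token created with `allowed_paths=("/warehouse/hr/employees",)` would permit reading a partition file under that directory, but not `/warehouse/finance/ledger`, even if the tenant is otherwise entitled to that table through a different query.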
So, continuing on the token, I want to talk about why we chose to have a token as opposed to just going with the TLS identity. There are two reasons. First, these tokens have smaller scopes. As we saw, the scope is limited to the scope of the query itself. So even if the tenant has access to, let’s say, 50 tables, and the query needs to access three tables via a join, the token is limited to those three tables only. If the token ever gets compromised, it does not unlock access to everything the tenant can reach; it only unlocks access limited to the query scope. Second, we want these tokens to be short-lived. In case they are compromised, we want to be sure that the attack window is very short, so that any attack can only happen within that short time window. So, that takes care of the whys. Let’s look at how we go about doing this, and I want to walk you through how our design evolved. When we started, we had a notion of a delegation token. That was an identity token: it just had identity details and no authorization details. The driver would get the token from the token service and distribute it to the executors. The executors would then present the token back to the token service, along with the paths they were trying to access in the file system. The token service would hand back the authorization token, or access token, and this token would have the paths embedded inside it. This token was accepted by the file system, and executors would unlock the paths they were trying to access. This had scalability problems because of the layer of indirection on the executor side: all of these tokens had to be exchanged via the token service before executors could unlock access in the file system. So we changed the approach a little bit.
We brought that exchange step onto the driver side. Now, on the driver side, instead of getting the identity token, we get the data access token. At query planning time, we pass in the tables and columns, and the token service internally has mappings from table names and column names to paths, which are the paths of the hierarchically stored tables within the file system. Once the driver has the token, it sends it to the executors via broadcast, and the executors no longer need the previous exchange step. While this takes care of the previous scalability problem, it had another shortcoming. The way we had this implemented was via Spark broadcast variables. We were broadcasting the token, and with broadcast we don’t have a way of refreshing the value. If we need tokens that are short-lived, we also need some kind of refresh. The way we solved that was via an extension of the RPC endpoint between driver and executor, something I referred to earlier in the talk. Executors now issue an RPC request to the driver, the driver talks to the token service, which refreshes the token, and the driver sends it back to the executors. Executors do that every time the token is about to expire, so we have a periodic refresh of the token initiated from the executor side.
A couple of notes on the implementation here: we have layers of caches on both the executor and the driver side. On the executor side we have multiple tasks running, each of which tries to get its token, and we don’t want all of those requests to land on the driver, otherwise the communication between driver and executors becomes quite chatty. Likewise, on the driver side, we don’t want all the requests coming from executors to land on the token service, otherwise the token service gets every request initiated from the executors, and that defeats the entire purpose. These layers of caches limit the number of calls going to the token service, so we don’t have scalability concerns there.
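The pull-based refresh with layered caches can be sketched as follows. This is a simplified model under stated assumptions: a token is represented as a `(value, expiry)` pair, `CachingTokenProvider` and its `refresh_margin` parameter are hypothetical names, and a plain function call stands in for the executor-to-driver RPC.

```python
import threading
import time
from typing import Callable, Optional, Tuple

Token = Tuple[str, float]  # (opaque token value, expiry as a Unix timestamp)


class CachingTokenProvider:
    """Serves a cached token and refetches only when it is close to expiry."""

    def __init__(self,
                 fetch: Callable[[], Token],
                 refresh_margin: float = 60.0,
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch              # upstream source of fresh tokens
        self._margin = refresh_margin    # refresh this many seconds early
        self._clock = clock
        self._lock = threading.Lock()    # many tasks may ask at the same time
        self._cached: Optional[Token] = None

    def get(self) -> Token:
        with self._lock:
            stale = (self._cached is None or
                     self._clock() >= self._cached[1] - self._margin)
            if stale:
                # Only this call goes upstream; concurrent callers wait on
                # the lock and then reuse the refreshed token.
                self._cached = self._fetch()
            return self._cached
```

Chaining two instances gives the layering described above: the driver wraps the token service with `driver = CachingTokenProvider(token_service)`, and each executor wraps the driver with `CachingTokenProvider(driver.get)`. Hundreds of task-level `get()` calls then result in one driver request per executor, and one token service request per refresh interval.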
So I think I’m done with my topics. I’ll now hand off to Abdul, who’s going to talk about encryption, container security, and untrusted code separation. Over to you, Abdul.
– Now that Ankur has explained how job identities are provisioned and how tokens are generated, I’ll explain more about how in-transit encryption is implemented and used.
In-transit encryption is enabled because Spark components (driver, executors, shuffle service) can be located across untrusted network boundaries, such as across data centers or across regions. For example, it’s possible for a Spark driver to be running in a different region or a different data center than the one where its executors were allocated.
There are two types of data exchanged during the lifetime of a Spark job: control data and job data. First, control data includes all control signals exchanged between the driver, the executors, and the shuffle service, such as launch task, kill task, and heartbeats, as well as broadcasts and task results. It also includes shuffle client calls, as shown in blue in the diagram.
This data is exchanged over TLS-encrypted RPC using the job-provisioned TLS certificate, which is distributed to the components as I explained on the job identity slide earlier. Our implementation of TLS-encrypted RPC uses a Netty handler based on the Java SSLEngine library. It also includes changes and modifications to Spark’s TransportContext and shuffle client classes, along with helper classes in the network protocols module. It’s worth noting that open source Spark has support for SASL, the Simple Authentication and Security Layer. But here we elected to develop and use TLS-encrypted RPC, since SASL does not handle key store distribution across all components, while with TLS the keys for symmetric encryption are generated and exchanged as part of the protocol. This approach also benefits from the existing TLS support at Facebook. Secondly, for job in-transit data, such as shuffle data or spill data between subsequent stages, Spark components rely on secure Thrift, using the job-provisioned TLS certificate as well. This is used to communicate with other Facebook services such as remote storage. It’s also worth noting that data at rest, in both permanent and temporary storage, is encrypted as well. One notable challenge we faced was that TLS-encrypted cross-DC channels are not suitable for transferring large job resources, such as files, archives or binaries. Given the number of executors per job, these transfers could overwhelm the driver, that is, its Netty handlers and their buffers. Instead, a separate torrent-based file distribution solution is used to distribute signed packages containing job resources.
Now that we have gone over in-transit encryption between Spark components, let’s take a look at the container security practices used to ensure job isolation for Spark executors in a multi-tenant environment. After the Spark driver receives the required resources from the cluster’s resource manager, it communicates securely with each worker’s node manager to launch executors. Because the driver provides a signed delegation token, the node manager is able to connect to the certificate authority to fetch and provision a TLS certificate into the container’s working directory. Then it launches the Spark executor process as a unique Linux user that represents the user running the job. For example, each container’s working directory grants exclusive read, write, and execute permissions to that user, which runs a particular Spark executor process.
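The exclusive-permission working directory can be illustrated with a short Python sketch for POSIX systems. The function name and directory layout are hypothetical, and the ownership change to the job’s Linux user is only noted in a comment because it requires root privileges.

```python
import os
import stat


def create_private_workdir(parent: str, container_id: str) -> str:
    """Create a container working directory that only its owner can access."""
    path = os.path.join(parent, container_id)
    os.mkdir(path, mode=0o700)
    # The mode passed to mkdir is filtered by the process umask, so set the
    # final permissions explicitly: rwx for the owner, nothing for others.
    os.chmod(path, stat.S_IRWXU)
    # A privileged launcher would additionally os.chown() the directory to
    # the per-job Linux user before starting the executor process.
    return path
```

With mode `0o700`, a process running as a different Linux user on the same host cannot list the directory or read a certificate stored inside it.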
Further, file system isolation prevents processes from other containers running on the same host from peeking into the TLS certificates or working directories that belong to other jobs. In addition, user code, such as a script transformation, is always run as a low-privileged user, to guard against external processes, from that job or from other jobs, reading its data or injecting files. I will show all of that in the next slide. One challenge that we faced here is LDAP resolution. As each container launch event requires LDAP resolution, LDAP service scaling was configured and deployed to ensure sufficient capacity and meet QPS requirements. Furthermore, the System Security Services Daemon, or SSSD, has been deployed locally to provide caches, which helps with LDAP resolution.
Finally, on sandboxing user code. There are two types of Spark jobs. There are Spark jobs with SQL scripts, which run in a trusted Spark environment. And there is user code, in the case of script transformation, in which the user provides custom binaries. Enabling users to write custom code allows for flexible and efficient execution. Spark’s executor launches the user-provided binary as an external forked process that runs as a low-privilege user and communicates with it over standard pipes.
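The "external process over standard pipes" pattern looks roughly like this in Python. It is a sketch, not Spark’s script transformation operator: the function name and the one-row-per-line framing are assumptions, and the optional `run_as` argument relies on the `user` parameter of `subprocess` (Python 3.9+, POSIX, and a sufficiently privileged parent).

```python
import subprocess
from typing import List, Optional


def run_script_transform(command: List[str],
                         rows: List[str],
                         run_as: Optional[str] = None,
                         timeout: float = 30.0) -> List[str]:
    """Feed rows to an external process on stdin and read results from stdout."""
    extra = {}
    if run_as is not None:
        # Drop to a low-privilege account before the child program executes.
        extra["user"] = run_as
    completed = subprocess.run(
        command,
        input="".join(row + "\n" for row in rows),  # one row per line
        capture_output=True,   # stdout and stderr come back through pipes
        text=True,
        timeout=timeout,
        check=True,            # a non-zero exit raises CalledProcessError
        **extra,
    )
    return completed.stdout.splitlines()
```

For instance, passing a command that upper-cases each line of its input and the rows `["a", "b"]` returns the transformed rows, with the user code never sharing an address space with the caller.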
Sandboxing user code execution is needed because the code runs alongside the Spark executor in the same container, and it needs to be prevented from accessing sensitive data such as the provisioned TLS certificate for the current job, or for other jobs running on the same host. The sandbox is also used to control external communication.
In addition, all processes running in the container are subject to strict cgroup constraints, which control CPU, memory, and IO resource usage.
With that, we conclude our presentation on securing Spark applications at Facebook, and we hope you found it helpful and inspiring.
Abdulrahman Alfozan is a member of the data platform team at Facebook where he works on building and scaling Apache Spark to provide distributed computing as a service. Abdul is passionate about large-scale distributed systems and his primary focus is to enable data scientists and data engineers to be more efficient and productive when using Apache Spark. Abdul studied computer science and engineering at Massachusetts Institute of Technology.
Ankur Pathela is a software engineer on the Data Platform team at Facebook, where he works on supporting batch workloads on Apache Spark. Ankur drives threat model analysis and vulnerability fixes for Spark applications at Facebook. Ankur loves working on distributed and large-scale systems, and previously worked on data ingestion as part of the team building Experience Platform at Adobe. Ankur received his Bachelor’s degree from the Indian Institute of Technology.