Apache Pulsar is a next-generation messaging and queuing system with unique design trade-offs driven by the need for scalability and durability. Its two-layered architecture, which separates message storage from serving, led to an implementation that unifies the flexibility and the high-level constructs of messaging, queuing and lightweight computing with the scalable properties of log storage systems. This allows Apache Pulsar to be dynamically scaled up or down without any downtime. Using Apache BookKeeper as the underlying data storage, Pulsar guarantees data consistency and durability while maintaining strict SLAs for throughput and latency. Furthermore, Apache Pulsar integrates Pulsar Functions, a lambda-style framework for writing serverless functions that natively process data immediately upon arrival. This serverless stream processing approach is ideal for lightweight processing tasks like filtering, data routing and transformations. In this talk, we will give an overview of Apache Pulsar and delve into its unique architecture for messaging, storage and serverless data processing. We will also describe how Apache Pulsar is deployed in various use case scenarios and explain how end-to-end streaming applications are written using Pulsar.
– Hi, hello, everyone. This is the presentation on Apache Pulsar, which is a next-generation messaging and streaming system. My name is Matteo Merli. I’m an engineer at Splunk, and I’m one of the co-creators of Apache Pulsar. My co-presenter is Karthik Ramasamy, who is a senior director of engineering at Splunk.
This is the forward-looking statements slide: we may present statements about future plans of the company, and so on.
So, before talking about Pulsar, I wanted to give a brief introduction to messaging and streaming. We talk a lot about messaging and streaming, and about where Pulsar fits in this space. The first thing to realize is that there is a kind of duality. Typically, when people think about messaging, they think about message passing between components, applications or different services. For example, application A can send a message, and this message might mean something like “create a user account.” Application B might take this message and take some action on it.
On the contrary, if you think about streaming, you think about a data source that streams events that have already happened. You pass them to the message bus, and then use a stream computing system to analyze these events in real time. But you’re analyzing things that have already happened.
So, you can think about messaging and streaming kinda like two different models, two different worlds, but at the end of the day, the infrastructure that supports them and that enables them is the same, and we can support both of them with the same infrastructure.
If you dig into the use cases, messaging typically covers online transactions and integration between different components. The main challenges for messaging are typically latency, because there’s a user waiting on that message to be published, for example, and availability, because if the system is down, the user will see downtime.
And data durability: if you say, “Create user account,” and that message is lost, then that’s a very bad user experience. From an application perspective, messaging typically requires more advanced features like routing to different queues, dead-letter queues for retries, delayed messages, and individual acks for each single message. On the other end, streaming is similar, but the typical use cases are things like real-time analytics. In these cases, the main challenge is typically throughput. You have a lot of events, and you need to be able to process them very efficiently. You typically have ordering requirements and stateful processing, so you have to send each event to the right consumer. And you also have this duality between real-time streaming analytics and batch: you may have a batch system that processes data in batches alongside the real-time processing.
So, at the end of the day, what we saw is that to have a complete data platform, you typically need three main components, and each of them has to be very solid. The first one is storage. You need to be able to store data and have durability and scalability at the storage level. Then you need a flexible messaging system on top of it, to be able to easily ingest data and process data out of it.
And finally, compute, because once you have data, you need to do something with it: you need to process this data, analyze it, and transform it.
So, now that we have set the context, we can dig into the design choices we made for Pulsar.
The tagline that we typically use is that Pulsar is flexible pub-sub and compute, backed by durable log storage. We started from the log storage, and what it gives us is durability, since the data is on disk and is replicated, and low latency: we need to stay below 5 milliseconds at the 99th percentile. At the same time, we need high throughput, so each partition has to be very scalable and available.
This is a good system, but we also wanted to design it to be resilient, so that as long as we have nodes available, we can write to it. Write availability is very important. And it is cloud-native, so we are able to scale up and down very easily. That’s a key requirement in modern deployment models, because now you can spin up new VMs or containers very, very easily.
Continuing on this, we have a unified messaging model; I’ll talk a bit more about what that means. We support different semantics on a single topic, we can scale to millions of topics, and we have a native compute framework based on functions. Pulsar was also designed from the ground up to be multi-tenant, to support different users with different workloads in a single cluster without problems. And finally, it supports geo-replication, for applications that need to reside in different regions or zones.
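Multi-tenancy shows up directly in Pulsar’s topic naming: a fully qualified topic name has the form persistent://tenant/namespace/topic (or non-persistent://tenant/namespace/topic), and a bare name like my-topic is shorthand for persistent://public/default/my-topic. As an illustration only, here is a minimal Python sketch of a parser for that naming scheme. parse_topic and TopicName are hypothetical helpers, not part of any Pulsar client, and the sketch ignores the older cluster-qualified naming format.

```python
from typing import NamedTuple


class TopicName(NamedTuple):
    domain: str       # "persistent" or "non-persistent"
    tenant: str
    namespace: str
    local_name: str


def parse_topic(name: str) -> TopicName:
    """Split a Pulsar-style topic name into domain, tenant, namespace and topic."""
    if "://" not in name:
        slashes = name.count("/")
        if slashes == 0:
            # Short form: expands to the default tenant and namespace.
            name = f"persistent://public/default/{name}"
        elif slashes == 2:
            # tenant/namespace/topic without a domain defaults to persistent.
            name = f"persistent://{name}"
        else:
            raise ValueError(f"unrecognized topic name: {name!r}")
    domain, rest = name.split("://", 1)
    if domain not in ("persistent", "non-persistent"):
        raise ValueError(f"unknown topic domain: {domain!r}")
    parts = rest.split("/", 2)
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected tenant/namespace/topic, got {rest!r}")
    return TopicName(domain, parts[0], parts[1], parts[2])
```

Because the tenant and namespace are part of every name, per-tenant and per-namespace policies can be looked up from the topic name alone.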
Let me talk about the messaging model, which is a very basic characteristic here. You have your topics, and you have producers that send messages to these topics. The producers don’t need to know how this data is going to be consumed; that is determined by the subscription. A subscription is a durable resource that you create on a topic, and it keeps your position on the topic. There are different types of subscription for different needs. For example, you can use exclusive, which means that only one consumer is allowed to consume. You can use failover, which means that you have one active consumer and multiple standby consumers that will take over in case the first one is disconnected.
The slightly fancier ones are the shared subscriptions. With shared, you can attach multiple consumers to a single subscription, and each of them will receive a portion of the messages. Basically, you can scale up your consumption just by adding consumers, without worrying about partitions or increasing the number of partitions, because each partition can have multiple consumers attached to it, and each of them will see different messages. Similar to that, kind of a crossover, is key shared. Key shared means that you can still attach multiple consumers while retaining ordering, in the sense that messages with the same key will go to the same consumer. That is a very powerful way to scale up ordered consumption.
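To make the differences between subscription types concrete, here is a minimal, self-contained Python sketch of how messages could be spread across the consumers of one subscription. It is a simplified model, not Pulsar’s dispatcher: the dispatch function and the mode strings are hypothetical, shared mode is modeled as plain round-robin, and key_shared uses a crc32 hash modulo the consumer count, whereas Pulsar’s key-shared dispatcher assigns hash ranges to consumers.

```python
import zlib
from collections import defaultdict
from itertools import cycle


def dispatch(messages, consumers, mode):
    """Assign (key, value) messages to consumers under a simplified model
    of exclusive, failover, shared and key_shared subscriptions."""
    assigned = defaultdict(list)
    if mode in ("exclusive", "failover"):
        # One active consumer receives everything; in failover mode the
        # other consumers are standbys that only take over on disconnect.
        for msg in messages:
            assigned[consumers[0]].append(msg)
    elif mode == "shared":
        # Messages are spread across consumers (round-robin here), so
        # ordering across consumers is not preserved.
        rr = cycle(consumers)
        for msg in messages:
            assigned[next(rr)].append(msg)
    elif mode == "key_shared":
        # Every message with the same key goes to the same consumer, which
        # preserves per-key ordering. crc32 keeps the hash deterministic,
        # unlike Python's per-process randomized hash() for strings.
        for key, value in messages:
            idx = zlib.crc32(key.encode()) % len(consumers)
            assigned[consumers[idx]].append((key, value))
    else:
        raise ValueError(f"unknown subscription mode: {mode}")
    return dict(assigned)
```

With two consumers, shared mode alternates messages between them regardless of key, while key_shared keeps all messages for a given key, such as user-1, on one consumer and in publish order.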
Pulsar clients: we have multiple client libraries. The official ones supported by the project are Java, C++, C, Python, Go and Node.js, plus a WebSocket proxy and so on. There are more third-party libraries, like C# and Rust.
The client library API is very high-level and supports, for example, partitioned topics. There’s also a Kafka-compatible API, so if you have an existing Kafka application, you can immediately switch over to Pulsar just by swapping the jars. There is batching, compression, and also end-to-end encryption, which removes the need to trust the system with your data: the data is just passed, encrypted, from producers to consumers.
So, if we step back and look at the architecture of the system, it is slightly different from other messaging systems.
We have a distinction between two layers. Producers and consumers always talk to brokers, but the broker is a stateless service.
It doesn’t store any data locally. The data is stored in Apache BookKeeper, on the bookie nodes, which are the storage nodes. This distinction is very important because, first of all, it means that we can add brokers and bookies independently based on need. Typically, you need more brokers if your workload is more network-intensive or if you have many consumers, which is more CPU- and network-bound, while bookies are more I/O-bound. So, if you need more write or read capacity, you can just add more bookies, and you can do that on demand.
It also means that a broker does not own the data of a topic, so we can quickly move topics from one broker to another. For example, if a broker is overloaded, we can simply shift some of its topics to another broker and react very quickly to changing traffic patterns. That is very important in a messaging system. It also means that bookies are not the owners of single topics, so we can also ramp up bookies to get more I/O when we need it.
Since we’re talking about Apache BookKeeper: it’s a key project for Pulsar, and what BookKeeper provides is replicated log storage. The API is very low-level and very simple, but the primitives are very powerful. What it gives you is low-latency durable writes: you can create a log, append entries to it, then close it and read it back. It provides read consistency, it is highly available, and it can store many logs per node; you can even store millions of logs on each node. It also provides I/O isolation, which means that the write path is separated from the read path in the bookies.
By that, I mean that write latency and write throughput are typically not affected by reads happening on that node. If you look inside a single node, you see that these two paths are separated. Write latency and throughput are only gated by the journal: as long as we can write to the journal, which is dedicated to that purpose, we are good to go, irrespective of the reads that are happening. This is a fairly sophisticated way to use the disks, but that’s how we get predictable performance that does not rely on the page cache, which is very unpredictable and can block threads all the time. So, this gives predictable latency and the highest throughput that you can get from the disk.
And because we don’t have a single file per log (the concept of a log is only logical), we can scale to millions of logs per node.
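The log primitives described above (create, append entries, close, read back) can be sketched as a tiny in-memory model. This is an illustration under stated assumptions, not BookKeeper: the Ledger class is hypothetical, and while its method names loosely echo the Java LedgerHandle methods addEntry, close and readEntries, it has no replication, durability, journal or fencing.

```python
class Ledger:
    """Hypothetical in-memory model of a BookKeeper-style ledger:
    append-only while open, immutable once closed, read by entry id."""

    def __init__(self, ledger_id):
        self.ledger_id = ledger_id
        self._entries = []
        self.closed = False

    def add_entry(self, data: bytes) -> int:
        if self.closed:
            raise RuntimeError("ledger is closed; no further appends allowed")
        self._entries.append(data)
        return len(self._entries) - 1  # entry ids are assigned sequentially

    def close(self) -> int:
        """Seal the ledger and return the last entry id (-1 if empty)."""
        self.closed = True
        return len(self._entries) - 1

    def read_entries(self, first: int, last: int) -> list:
        """Return entries first..last, inclusive."""
        if first < 0 or first > last or last >= len(self._entries):
            raise IndexError("entry range out of bounds")
        return self._entries[first : last + 1]
```

Sealing on close is what makes reads consistent in this model: once closed, every reader sees the same final set of entries.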
So, we talked about BookKeeper and logs. From Pulsar’s perspective, a log is a segment, not the topic. If you have a single partition of a topic, it is not a single log but rather a sequence of logs, that is, of segments, where each log is a segment. Over time we roll over these segments: we create a new one and write to the new one. Each segment, from a storage perspective, is independent from the others, so if you have a cluster of N bookies, the segments will be spread across all the nodes.
That means that I can just add one more bookie, and new segments might go to that new bookie. It also means that if most bookies are down and I only have two or three up, I can still write new segments. So my write availability is still there.
This is the same view, compared with other systems like Kafka. Kafka is more partition-centric: a partition has to reside entirely on a single node, while in Pulsar’s case, since we work with segments, the size of a partition is unbounded. In Kafka, the size of a partition is bounded by the disk size of the node. So, even if you have a hundred nodes, your partition can’t be bigger than one terabyte if you have a one-terabyte disk. Also, if you want new brokers to take over, you have to do a rebalancing, which means copying all the data. In Pulsar’s case, we simply add new nodes, and new segments will naturally be created on the new nodes.
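The segment-centric layout described above can be illustrated with a toy model: one partition is a list of segments, each segment is written to an ensemble of bookies picked from whichever bookies are available at the time it is created, and a new segment is opened once the current one is full. This is a hypothetical sketch (SegmentedTopic and its parameters are made up); placement here is a simple rotation, whereas BookKeeper uses pluggable placement policies, and the sketch ignores replication and ensemble changes on failure.

```python
import itertools


class SegmentedTopic:
    """Toy model of one partition stored as a sequence of segments."""

    def __init__(self, ensemble_size=2, max_entries=2):
        self.ensemble_size = ensemble_size  # bookies that store each segment
        self.max_entries = max_entries      # roll over after this many entries
        self.bookies = set()
        self.segments = []                  # oldest first
        self._next_start = itertools.count()

    def add_bookie(self, name):
        self.bookies.add(name)

    def remove_bookie(self, name):
        self.bookies.discard(name)

    def _open_segment(self):
        available = sorted(self.bookies)
        if len(available) < self.ensemble_size:
            raise RuntimeError("not enough bookies available for a new segment")
        # Rotate the starting bookie so successive segments land on different nodes.
        start = next(self._next_start) % len(available)
        rotated = available[start:] + available[:start]
        self.segments.append({"ensemble": rotated[: self.ensemble_size], "entries": []})

    def append(self, entry):
        if not self.segments or len(self.segments[-1]["entries"]) >= self.max_entries:
            self._open_segment()
        self.segments[-1]["entries"].append(entry)
```

Adding a bookie requires no data copying in this model: the next segment simply may include it. Likewise, losing most bookies only matters if fewer than ensemble_size remain for the next segment.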
Because of this segment architecture, it becomes very natural for us to offload segments. That means we use a tiered storage architecture to move old segments to cheaper storage. For example, if you’re in a cloud environment, you might keep the last few hours, or one day, in BookKeeper, and the rest of the data can be pushed over to S3. That gives unlimited topic storage capacity and lower cost for older data that is unlikely to be read, while still giving you the assurance that you can read it if you need to.
And basically this is true stream storage: you can keep your stream forever, or for a very long time, very cost-effectively.
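Because only whole, closed segments move to tiered storage, choosing what to offload is a simple policy decision. The sketch below assumes a size-based rule, similar in spirit to the size-based offload threshold that Pulsar exposes as a namespace policy: keep the newest data up to a byte budget in BookKeeper and mark older closed segments for offload. The function name and tuple layout are hypothetical, and the sketch performs no actual upload.

```python
def segments_to_offload(segments, threshold_bytes):
    """Choose segments to move from BookKeeper to tiered storage.

    segments: list of (segment_id, size_bytes, closed) tuples, oldest first.
    The newest data, up to threshold_bytes, stays in BookKeeper; once that
    budget is exceeded, every older closed segment becomes an offload
    candidate. The open segment is still being written and is never offloaded.
    """
    retained = 0
    over_budget = False
    offload = []
    for segment_id, size, closed in reversed(segments):
        if closed and (over_budget or retained + size > threshold_bytes):
            over_budget = True
            offload.append(segment_id)
        else:
            retained += size
    offload.reverse()  # report oldest first
    return offload
```

For example, with three closed 100-byte segments plus an open 50-byte one and a 200-byte budget, the two oldest segments are chosen for offload.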
Continuing on this: if you think about a stream, one of the important traits is that you need to know which type of data is in there.
So we have added a Schema Registry, which is again based on BookKeeper, to hold the schemas, and we enforce data types on topics. If you define a schema for a topic, we can guarantee that the data being produced on that topic conforms to that schema. There are different strategies for which kinds of evolution to allow and which to disallow, depending on the application’s needs. We support JSON, Avro and Protobuf. Again, each application can configure its evolution policy based on its needs.
If you look at the Schema Registry, the way you declare the schema can be either explicit, using APIs to define the schema, or implicit, in the code of your application. You can write in Java or Python and just declare the schema: in this case, you have a POJO, which is my class, and you just say, “I want to use JSON with my class type.” Then you send objects. These objects will be validated, so that the schema for this object is the same as, or compatible with, what the topic has. And because of that, we can safely consume this data.
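To show what an evolution policy decides, here is a minimal sketch of a backward-compatibility check: can a reader using the new schema decode data written with the old one? Pulsar lets you choose a compatibility strategy (for example BACKWARD, FORWARD or FULL), but this function is a hypothetical simplification of Avro-style rules, not Pulsar’s implementation. Schemas are modeled as dicts mapping field name to (type_name, has_default).

```python
def is_backward_compatible(old_fields, new_fields):
    """Return True if readers using new_fields can decode data written
    with old_fields, under simplified Avro-style rules.

    Each schema is a dict: field name -> (type_name, has_default).
    """
    for name, (new_type, has_default) in new_fields.items():
        if name in old_fields:
            if old_fields[name][0] != new_type:
                return False  # type changes are rejected in this sketch
        elif not has_default:
            return False  # old data has no value for this field and no default
    # Fields dropped from the new schema are simply ignored when reading.
    return True
```

Under this rule, adding an optional field with a default is allowed, while adding a required field or changing a field’s type is rejected.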
Changing gears a bit, but this is very important for many applications: geo-replication in Pulsar is a first-class citizen. It was there from the very beginning. What it means is that you can have multiple data centers with a Pulsar cluster in each, and these Pulsar clusters are interconnected. What it gives you is that you can publish your data to any of the Pulsar clusters, so you always have local publish latency, and you can decouple the different regions. This is based simply on configuration. You just say, “Okay, for these topics I want them to be in data centers A, B and C,” while other topics might be only in A and B, for example, and you can change it dynamically at any time.
And what it gives you is a very sophisticated disaster recovery strategy, because these clusters are isolated. They don’t depend on shared resources for availability.
So, if one cluster goes down, the others can keep going and accumulating the data. Once it comes back up, it will catch up again.
And the newest addition in this area is replicated subscriptions. They give us the flexibility to have a subscription that can be migrated at any point in time to a different cluster. Say that you’re consuming from cluster A, and cluster A goes completely offline: you can switch your application to consume from cluster B, and your subscription has been kept up to date by Pulsar itself. There’s a sophisticated mechanism using markers and consistent snapshots of messages across the clusters, but from an application perspective, you don’t need to do anything. You just specify that you want this subscription to be replicated, and then you can switch your service URL to the other cluster, and your subscription state will be correct and will be there.
Finally, let’s talk about multi-tenancy and how it was a key design characteristic of Pulsar. The main idea was that we don’t want each team to have its own cluster: that’s very expensive, very error-prone and very operations-intensive. Rather, we want a single cluster that can support multiple users and many workloads. The main features that support this are authentication and authorization baked in; the concept of namespaces, so that each team can use its own topics and namespaces and configure them in different ways without affecting how other tenants are configured; and admin APIs, so that each tenant can self-administer. These are at the logical level, but if you look at the I/O level, there is more work to be done, because to support true multi-tenancy we chose to provide isolation between tenants, and for that we need to provide isolation between writes and reads. That’s precisely what BookKeeper provides.
The main issue here is to prevent different consumers from impacting system performance. For example, if you have a consumer that is catching up by reading from disk, that should not impact the performance of another tenant that is publishing. The only way to guarantee that is to have this isolation. Otherwise, you have situations like in Kafka, where a reader comes in, thrashes the page cache, and ends up impacting all the publish latencies and throughputs.
Other than that, we also have soft isolation policies in place. We have storage quotas: once you reach your storage quota, either the data would be evicted or the producers would be blocked; some action would be taken anyway. We have flow control in place, with back pressure at each level in the system. And we also have rate limiting, so we can throttle the producers and the consumers, depending on needs. Finally, if all of these fail, we also have a form of hardware isolation that we can configure. For example, one tenant can be restricted to a subset of nodes. That is typically done as an operational last resort, so that you can prevent this tenant from impacting other tenants, or the reverse.
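Rate limiting of the kind described above is commonly implemented with a token bucket. The sketch below uses that generic technique as a stand-in to illustrate producer throttling; it is not Pulsar’s rate limiter, and the TokenBucket class and its parameters are hypothetical. Time is passed in explicitly so the behavior is deterministic.

```python
class TokenBucket:
    """Token-bucket limiter: refills at `rate` tokens/second, bursts up to `capacity`."""

    def __init__(self, rate: float, capacity: float, now: float = 0.0):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = now

    def try_acquire(self, now: float, n: float = 1.0) -> bool:
        # Refill in proportion to elapsed time, capped at capacity.
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False  # over the limit: the caller should delay (throttle) the producer
```

With rate=10 and capacity=5, a producer can send a burst of five messages at once; the sixth is throttled until roughly 0.1 seconds later, when one token has been refilled.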
And with this, I will hand over to Karthik to talk about the compute part of Pulsar. – Thanks, Matteo. So, for the listeners, I’ll be talking about the compute portion of Pulsar, which deals with something called lightweight compute. If you back up a little and look at the different types of compute that we can do in a streaming system, there are four. The first is the producer-consumer pattern, where data is produced and the consumer consumes the data and computes whatever you want to compute with it. The second type is called lightweight compute, where you do very simple tasks like line breaking, converting data from JSON to XML, or routing data from point A to point B. Those are all very simple, mundane tasks. The third one is called heavyweight compute, which gravitates towards more analytics processes, where you have a multi-step DAG that passes the data through various DAG nodes performing very sophisticated computations, before it sinks the data back into storage or some designated destination. The fourth one is what we call interactive querying on top of streaming data, where the data comes to rest very, very quickly, and on top of it you can do interactive querying using a well-known structured language called SQL. Pulsar supports three of these four types of compute: producer-consumer, which Matteo already talked about; lightweight compute, which I will talk about; and interactive querying using SQL. For Pulsar Functions, which allow you to do lightweight compute, we took inspiration from Lambda-like serverless functions.
The user supplies compute against a consumed message. The function is written in one of the popular languages and deployed into the system. The data comes from the input topics, the function gets invoked, the data is processed in that function, and the output is sent to an output topic. Serverless functions provide the simplest possible API: no SDK is required, because you use the language-specific function notation. There is also a slight enhancement to the language function notation, where SDKs are available for advanced features like state, metrics and logging, especially if user-defined metrics are needed for some compute functions.
And Pulsar Functions are language agnostic: the currently supported languages are Java, Python and Go, it is easy to add support for more languages, and the runtime is pluggable.
Functions can be executed within the broker if performance is highly critical, or outside the broker as separate processes, or they can run under a scheduler like Kubernetes, as containers in pods.
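The consume, invoke, publish cycle described above can be simulated locally in a few lines. In this sketch, which is an assumption-laden stand-in and not Pulsar’s function runtime, standard-library queues play the role of the input and output topics, and run_function is a hypothetical driver. Following the convention used here, a function that returns None emits nothing.

```python
import queue


def run_function(fn, input_topic: queue.Queue, output_topic: queue.Queue) -> int:
    """Drain input_topic, invoke fn on every message, and publish non-None
    results to output_topic. Returns the number of messages processed."""
    processed = 0
    while True:
        try:
            msg = input_topic.get_nowait()
        except queue.Empty:
            return processed
        result = fn(msg)
        if result is not None:  # a function may consume without emitting output
            output_topic.put(result)
        processed += 1
```

The function itself stays a plain callable with no knowledge of topics, which is the property that lets the same code run in-process, as a separate process, or in a container.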
So, let us look at a simple Python function. You have a plain Python function called process, which takes input as a parameter and returns that input with an exclamation mark appended; the resulting string is returned as the output. Similarly, in Java, you can implement the java.util.function.Function interface and its method called apply. The input for the apply method is a string and its output is a string; the logic appends an exclamation mark to the input string and returns the result.
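In Python, the exclamation example described above can be written as a native function with no Pulsar import at all, and the sketch below follows that shape. Calling it directly, as in the last line, is only a local check; deploying it as a Pulsar Function goes through Pulsar’s tooling and is not shown here.

```python
def process(input):
    # Pulsar invokes process() once per message from the input topic; the
    # return value is what gets published to the output topic.
    return "{}!".format(input)


# Local check, outside of Pulsar:
print(process("hello"))  # prints "hello!"
```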
Pulsar Functions are very simple, in the sense that the example I showed you does not have the notion of state: the data comes in, gets processed, and is put into the output topic. But in more sophisticated functions, where you want to increment counters or do some kind of aggregation, we need to store some kind of state, and Pulsar provides built-in state storage because of the BookKeeper storage underneath Pulsar Functions. The state is global and it’s replicated, and multiple instances of the same function can access the same state, even though they may be running on different machines. The functions framework provides simple abstractions for accessing and updating the state, like get state and put state, and, for simple counters, incrementing counters. The state is implemented on top of Apache BookKeeper: there is something called a table service, where BookKeeper provides a sharded key-value store based on logs and snapshots, and the state is stored as BookKeeper ledgers. There are warm replicas that can be quickly promoted to leader, because the data is replicated: if a node goes down, another node has to pick it up, and it can quickly be promoted to leader for that particular state. So in the case of a leader failure, there’s no downtime and no huge log to replay to bring the state back to where it left off.
So, let us look at an example of how state looks. We have an API called Context in Pulsar Functions. You implement the Pulsar Function interface, which takes a string as input and returns Void, so it does not produce any output. The process method takes the input string and the context as parameters. The input is split into words, and each word is counted by calling the context’s increment counter with the word and an increment of one. This increment counter is stateful: it increments the counter at the BookKeeper level, and when multiple instances of the same function are running across several nodes, the counter is atomic, so the count is accurate. These counters can also be accessed using a separate API, which provides a key-value abstraction for getting the data. So, if you want to serve the counters to a serving application, you can use that API directly.
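Here is the same word-count idea as a Python sketch. With the Python Functions SDK the class would subclass pulsar.Function and receive a real context whose incr_counter and get_counter methods are backed by BookKeeper; to keep the example runnable without Pulsar, LocalContext below is a hypothetical in-memory stand-in with a single-process dictionary, so it offers none of the replication or cross-instance atomicity described above.

```python
from collections import defaultdict


class LocalContext:
    """Hypothetical in-memory stand-in for the Pulsar Functions context."""

    def __init__(self):
        self._counters = defaultdict(int)

    def incr_counter(self, key, amount):
        self._counters[key] += amount

    def get_counter(self, key):
        return self._counters[key]


class WordCountFunction:
    # With the real Python SDK, this class would subclass pulsar.Function.
    def process(self, input, context):
        for word in input.split():
            context.incr_counter(word, 1)
        return None  # no output message; the result lives in state
```

Because the function only talks to the context, the same process method works unchanged whether the counters live in this dictionary or in Pulsar’s state storage.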
Now we have seen a little bit about lightweight compute in Pulsar. Next, let us look at how you can get data into and out of Pulsar. There’s a framework called Pulsar IO, which is based on Pulsar Functions, except that for a source, the input is void and the output is data written directly into a topic, and for a sink, the input is data from a topic and the output is void; the sink contains the logic for writing into its own external system. Pulsar IO is a powerful framework, and on top of it you can write connectors very quickly.
There are a lot of built-in IO connectors that Pulsar provides out of the box, like Aerospike, JDBC, Twitter, Storm, Kinesis and RabbitMQ. We also have connectors for Spark, Flume and Postgres, and Debezium, which gives you change data capture across several databases like Oracle, Microsoft SQL Server, MySQL and Postgres. There are a bunch of connectors available that you can take advantage of out of the box.
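The source and sink shapes described above can be sketched as two small interfaces. The Java connector interfaces follow a similar open/read/close and open/write/close pattern, but the Python classes below (Source, Sink, ListSource, ListSink and pipe) are hypothetical illustrations, with a plain list standing in for the Pulsar topic between them.

```python
from abc import ABC, abstractmethod
from typing import Iterable, List


class Source(ABC):
    """Takes no input from Pulsar; produces records destined for a topic."""

    @abstractmethod
    def read(self):
        """Return the next record, or None when the source is exhausted."""


class Sink(ABC):
    """Consumes records from a topic; returns nothing to Pulsar."""

    @abstractmethod
    def write(self, record) -> None:
        """Deliver one record to the external system."""


class ListSource(Source):
    def __init__(self, items: Iterable):
        self._it = iter(items)

    def read(self):
        return next(self._it, None)


class ListSink(Sink):
    def __init__(self):
        self.written: List = []

    def write(self, record) -> None:
        self.written.append(record)


def pipe(source: Source, sink: Sink, topic: list) -> None:
    """Source -> topic -> sink, with a plain list standing in for the topic."""
    while (record := source.read()) is not None:
        topic.append(record)
    for record in topic:
        sink.write(record)
```

A connector author only fills in read or write for a particular external system; moving records to and from topics is the framework’s job.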
The second aspect is how to manipulate or query the data stored in Pulsar. As Matteo pointed out, the data that enters Pulsar is stored in BookKeeper for the retention period that you configure. Now, the question is: can we query the data that is stored in Pulsar? That is what this part is about.
Pulsar SQL is a layer that directly accesses the storage layer of Pulsar. It uses Presto, the interactive query engine developed at Facebook, so it can query both historic and real-time data. The data is real-time because data ingested into Pulsar is written to BookKeeper immediately, and within a few milliseconds it is available for querying. Since SQL requires a schema for the data, the Pulsar SQL Presto implementation uses the schema that comes with Pulsar itself, from the Pulsar schema registry, when querying data in Pulsar storage. And because it is built on Presto, it can join with other data sources: if you have MySQL data that you want to join with the data in Pulsar, that is possible because Presto supports it.
The implementation reads the data directly from BookKeeper into Presto, completely bypassing the broker, which makes it very efficient. In addition, the shards of data that are distributed across multiple BookKeeper nodes can be queried in parallel, so you get much more disk bandwidth: multiple shards can be processed in tandem, giving quick response times for the queries that you issue. That is what many-to-many data reads means. Data is split even within a single partition, and multiple workers can read data in parallel from a single Pulsar partition. As Matteo pointed out, a single partition does not live on one node; its segments are distributed across the entire cluster.
This means that even if a query hits a single partition, because the segments of that partition are distributed across multiple disks, we can read them in parallel and process them in parallel. So, even in the presence of data skew, queries will run faster. And by default, it provides a time-based index: we use the publish time to slice queries by time, so that time predicates in a query can be used to reduce the data being read from disk.
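The two ideas above, pruning segments by publish time and reading the surviving segments of a single partition in parallel, can be combined in a short sketch. It assumes each segment carries its minimum and maximum publish time; the Segment class, prune, scan and query are hypothetical names, and a thread pool stands in for Presto workers reading from different bookies.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class Segment:
    segment_id: int
    min_publish_time: int
    max_publish_time: int
    records: List[Tuple[int, str]]  # (publish_time, payload)


def prune(segments, start, end):
    """Keep only segments whose publish-time range overlaps [start, end]."""
    return [s for s in segments if s.max_publish_time >= start and s.min_publish_time <= end]


def scan(segment, start, end):
    """Read one segment and apply the time predicate to each record."""
    return [payload for t, payload in segment.records if start <= t <= end]


def query(segments, start, end, workers=4):
    """Prune by publish time, then scan surviving segments in parallel."""
    candidates = prune(segments, start, end)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        parts = list(pool.map(lambda s: scan(s, start, end), candidates))
    return [payload for part in parts for payload in part]  # pool.map keeps segment order
```

Segments whose time range cannot match the predicate are never read from disk, and the remaining ones are scanned concurrently even though they all belong to one partition.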
Currently, there is some work in progress to allow direct access to the data stored in Pulsar, based on the Presto connector that we have built, so that computing frameworks like Spark can directly do batch computations on top of the data stored in Pulsar. This means we can essentially marry real-time and batch in one single storage infrastructure. That work is under way, and it should be available soon.
So, Apache Pulsar is a completely open source project, which was originally developed at Yahoo and open sourced about three and a half years ago. It has seen tremendous growth over the last three years. There are around 280 contributors so far on GitHub, and there are 30 committers developing various features across different companies. There are hundreds of adopters, and the GitHub stars are getting very close to 6K.
And over 600 companies have adopted Pulsar, and adoption is growing at a fast pace.
Some of the companies running Pulsar in production include Verizon Media (which is Yahoo), Yahoo Japan and Tencent, and Splunk recently adopted Pulsar in a big way.
Then there are Comcast, Nutanix, Overstock and Narvar. OVHcloud launched a service based on Pulsar, and there is another commercial company called The Gulf that launched a service based on Pulsar. OVH even contributed protocol-level compatibility for Kafka on top of Pulsar. Then there is Toast, which provides devices serving restaurants; they have completely adopted Pulsar. The list goes on.
So, that’s all we have. Thanks for listening.
Matteo Merli is currently a Senior Principal Engineer at Splunk and one of the original authors of Apache Pulsar. At Splunk, he is focused on integrating Pulsar into various products. Before Splunk, he was one of the co-founders of Streamlio, which was acquired by Splunk. He currently serves as VP and PMC chair for Apache Pulsar and is also a member of the Apache BookKeeper PMC. Previously, he spent several years at Yahoo building database replication systems and multi-tenant messaging platforms.
Karthik Ramasamy is a Senior Director of Engineering managing the Pulsar team at Splunk. Before Splunk, he was the co-founder and CEO of Streamlio, which focused on building next-generation event processing infrastructure using Apache Pulsar, and he led the acquisition of Streamlio by Splunk. Before Streamlio, he was the engineering manager and technical lead for real-time infrastructure at Twitter, where he co-created Twitter Heron. Twitter Heron was open sourced and used by several companies. He has two decades of experience working with companies such as Teradata, Greenplum, and Juniper in their rapid growth stages, building parallel databases, big data infrastructure, and networking. He co-founded Locomatix, a company that specialized in real-time stream processing on Hadoop and Cassandra using SQL, which was acquired by Twitter. Karthik has a Ph.D. in computer science from the University of Wisconsin, Madison with a focus on big data and databases. During his college tenure, several of the research projects he participated in were later spun off into a company acquired by Teradata. Karthik is the author of several publications, patents, and a popular book, "Network Routing: Algorithms, Protocols and Architectures."