Most enterprises have business critical code that is well maintained and high performance. The switching costs to rewrite or port this code can often prevent adoption of new frameworks due to the level of technical debt. Adding another level of indirection through network proxies often results in an unacceptable performance hit. This problem is particularly acute in edge compute workloads where high throughput sensors feed real-time processing and storage pipelines. Illuminate Technologies’ threat detection solutions for 5G networks apply Spark and custom Data Sources to implement this workload efficiently. In this talk we pursue an alternative approach of integrating proven native code with the power of the Spark DataSourceV2 API. This allows the power of the Spark platform for ETL, structured streaming and data formatting to be combined with the data processing logic of existing code.
The talk will walkthrough the structure of a custom datasource that can be used in streaming or file modes that wraps a native C++ processing engine in a single JAR. Techniques such as JNI wrappers, autoloading shared libraries and maven build integration will be shown. The talk will also cover pitfalls such as multiplatform support and library dependencies. The talk will include demos, benchmarks and framework code will be made available on github.
– Hi there, I’m Doug Carson, I’m a Customer Solutions Architect at Illuminate Technologies. I’ve been a Spark user for probably about five years now, 2015, looking at various optimization techniques, and when I start to look at Spark as a way of storing data and also ingesting data into our systems as well.
So, this is a kind of walkthrough of some of the challenges we had to integrate legacy code and hopefully be useful to look at Spark as having an alternative data source in what you’re trying to do.
Okay, so, here’s our agenda. First of all’s, why native code? Well, I think our workload is gonna give some illustrations in that everybody has their own unique vault workload, everybody has their own unique, secret sauce, so this is a way of looking at it. You’ve always got legacy codes and you’ve always got build-or-buy by decisions about whether it’s better to start again, or do you take your old cord and wrap it up.
And one of the ways to think about it, is RFC 1925. So if you’re following along, you can Google that, and you’ll be in on the joke and why that is (murmurs), there’s actually some wise words in there, 1925 as well. So in the mean, submit to the, the thing that we’ll look at is integrating native codes. We’ll look at taking some C++ code, which is actually the Pcap library. It’s an open source library that we’re gonna wrap and turn it into spark datasource. Now, to actually bring it into Spark itself, there’s a few tips and tricks you need to do to actually load that, perhaps, so we’re gonna have a look at that as well. So we’ll look at the datasource version two life cycle, and also how you plug into the API to create yourself a fully-formed, independent Spark datasource, (mumbles) to actually make these data sources work, we’ll look at a file example where we’re just reading from a file and ingesting the data into Spark, which is the I/O access pattern, and we’ll have a look at some of the bits that are missing in APIs and how you can work around that. And then lastly, we’ll look at streaming because that’s very often you may have large-scale sensor data, where you’re want to ingest that data in real time, and Spark Streaming is a great way of getting data into your systems and we’ll have a look at that. And there’s a couple of tricks that we want to look at within that as well.
Okay, let’s have a look at our workload. What our systems do, is look at cybersecurity aspects of mobile devices and mobile networks. These are carrier-scale networks with, potentially millions of users on board. And we have, tools that will help look at the security that now work and ensure that network’s secure. So clearly, there’s a lot of data to deal with. And we deploy sensors within the network, that can look at Internet traffic, we can look at interconnect traffic, and then we can look at traffic related to the radio site as well. So we can archive that data on, and then a Security Operations Center within a telecoms provider can look at that data. And the great thing about Spark is, we can slice and dice that data in lots of different ways. Lots of different analytics, or lots of different use cases. So for example, on the internet attack surface, you might want to look at potential hacks or intrusions on the device, you might wanna do packet forensics, to see how the network was intruded. There might be critical voice services during assurance on that, and again, we can do some tools that help with that.
The stuff that we can do in the RAN, attack surface, which is the radio access, so we can do things like emergency geolocation assist for example, by looking at historical data within that. And then on the interconnect, we can actually look at attacks that are coming in from interconnected networks and doing stuff. So we actually deal with quite a lot of data. Obviously, that data is all in real time, and there’s quite a lot of data as well. Probably about a billion events. So we need a way of ingesting that data. So, why would we want to do a data source, if we’re already doing that sort of thing?
Well, if we think about where we want to get to in 5G networks, we’ve got two little bubbles there: the first one in 2019, is probably what you feel now with 5G now, where, the access network changes, so the radios change. You can see it’s a very small part of the network. Moving forward five years, the core of the network’s gonna change and that’ll be a completely different way of how the network’s controlled and how new services are added. And unfortunately, the reality is, the rest of the network is actually fairly old. In some cases, it goes back 20 years. That’s a lot of legacy codes, that’s a lot tradecraft that were built up over the years and a lot of domain knowledge, and we don’t want to throw any of that away. So we need to get a way of being ready for the new technologies that coming in 5G, but, have all these legacy codes and bring that in, and that legacy code is being written in C++ for performance reasons. So that was kinda driving a decision when we took a look at Spark. Okay, we need to have a better way of getting data into the system and we don’t want to lose that.
So, native code, why would you do it? Well, we’ve said that there’s got a build-or-by by decision, you can use some, so you might ask, why would you want to use some crufty old code written by somebody like me, when you can use some great new language and great middleware. And that’s some valid point of view, you’ve always got build-or-by decisions. There’s some very good reasons why you want to build on what you already have rather than taking new stuff.
Within any network or any domain, RFC 1925 gives you some fundamental truths of that. The first one that says, “The problem was already solved.”
Your code has to work, some things networking, can never be fully understood. Until you’ve actually been there, you’ve only barely just achieved it. And, there’s always the same problem rehashed over again. So what this means, is that your existing code has value. And you have domain knowledge, you’ve tested it, and you’ve learned from your mistakes. And, if you’re gonna change that code and have new code, then you really just gonna revisit those problems again, in a different way.
The other reason is your code’s already optimized, and quite succinctly with sufficient thrust, pigs will fly. You can always throw more CPU at the problem, or you might have optimized it to the limit anyway. So there’s no point in trying to write it in a different language and expect something else to happen. “You can’t increase the speed of light”, is quite a good observation. And also, it takes a long time to optimize code. Writing the code’s only 20% of it. The optimization and debugging is usually 80% of that. So you can have good, fast, and cheap: pick any two. Usually you want to get there fast, and you want it to be good.
And for every resources, you’re always gonna need a bit more. And in fact, even though it’s legacy codes, the requirements to your domain will increase anyway. So you’re always got to, try and get ahead of the game and do better stuff in there.
So the summary of that, is Moore’s Law won’t save you. Your domain will always have new requirements, you’ll always have to take your baseline code and make it go faster for more subscribers, more simultaneous connections, whatever metric you use, and also remember that yes, you can have as much CPU power as you like, and clouds can scale, fantastically, but those cloud compute costs aren’t zero. It’s basically, a number of cents per CPU per hour. So the less CPU you use, then the better, so, optimized code is a good thing.
And, the other one is: by changing horses, you’re gonna introduce new problems, or you’ll discover the ones you already have. So, it’s always possible to move the problem around, you can always add more levels of indirection, you can always take multiple problems that have already been solved and create a horrible, interdependent solution, and it’s always harder than you think. And there are no silver bullets in that either.
So, yes, things like Kafka could’ve, for example, can give you another level of indirection, but make sure you don’t make it Kafkaesque. Who’s gonna do the checkpointing? Who’s gonna do de-duplication? Who’s done the streaming analytics? How many network operations are there? Who’s doing the storage? Kafka has a lot of these features and so does Spark and you want to make sure that (mumbles) good partition on that. So think carefully about who does what. And if your code already does some of this stuff, that’s fair enough, but don’t try and do it in duplication, just use the minimum you want to have.
So, the bottom line is, that it’s in the 12th rule in RFC 1925, “Perfection is reached, not when there’s nothing left “to add, but when there’s nothing left to take away.” So really, you don’t have to do any more work, and you’ve optimized everything, so that, every component is doing the minimum that it needs to do, the minimum viable. Now that point there, you’ve got rid of all the interdependencies and all the other issues. So the way to look at it from a Spark perspective, (mouse clicks) and (mumbles) the summary is, that if Spark does data storage and analysis, your codes will do the data collection and the processing. And that’s the perfect split: there’s nothing left to add, there’s nothing left to take away.
Okay, so what we’re gonna cover, here, well, we’re gonna cover writing a custom datasource that’s gonna look at pcap. Now pcap is the standard way of collecting network packets for troubleshooting and analysis on a network. And usually those libraries are in C and C++, they’re written in native code. And they’re actually quite, it’s such a common format and a lot of I/O heavy that it’s quite a nuisance to port it to a new language because the libraries are usually heavily optimized. And it’s such a data format that’s used throughout the industry. So, what we’re gonna do first of all is, how do we wrap our native code in an auto-loading class? Because I said there’s gonna be native stuff. So we’re gonna create a class called PcapNative, that’s gonna basically put all this stuff, that’s all the natives code’s gonna be wrapped up in there, and then we’re gonna make the code look like Java to the rest of the system.
The next thing we’ll do is look at the basics of a datasource, “what sort of calls, do you need in the data source?” And, “how do you integrate it into your system?” And also, “how do you build a datasource?” as well.
Next thing we’re gonna look at is, I/O access patterns. So, “how do you access the data?” And, “how do you marshall it so that Spark can understand it “and make it look like a data frame?”
And then we’re gonna understand streaming flow control. So when we’re gonna look at a streaming source, and then we’ll look at how the data streams, and how we do flow control on that. And then finally, within streaming, it’s very important that you understand how to do a Singleton I/O Reader. Because you can’t keep looking at the data over and over again, sometimes you might wanna create perhaps a user thread, that’s written long lived user’s thread** that’s looking at the data, it might be listening on a network socket, it might open a file, and stream a little bit at a time. So it’s pretty important that you have a Singleton I/O reader rather than what’s a different one.** And it’s a clever trick that you can use to do that.
Okay, first off, how do we bridge C++ and Java in a single class? There’s a couple of libraries that we can use for that. One is Java CPlusPlus, which is a really good library for wrapping your codes, so they can access native code. And it creates a whole lot of boilerplate underneath it in C++, that’ll wrap that, very clever tool. The other thing that we want to do is, be able to load that in a shared library. And the clever thing about the native-lib-loader is that our shared library that holds all our C++ code, can be loaded automatically using the boilerplate code that you see there. Single line in a static block will actually automatically, bring in the native code from a jar file, which is pretty clever.
Quick tip from that on, Java CPP.
If you’re gonna do marshalling data it’s better to deal with the raw data types in C++, and then marshall it into Java using a wrapper one. So basically, you’ve got one class that bridges everything necessary (murmurs), the Java native class. You can see in the C++ side, anything that’s a native class has its analog in C++, and not just a low-level stuff. So let’s take the open file, for example, that’s gonna read in a path which is a string, you can see it matches and the Open File call in the other side, and then you’ve got a Read Buffer call as well and you can see a sign along on the other side there.
The other important part of this is the buildology of it. We use Maven to do the building, and you can actually bring in JavaCPP as a command-line on that, that will generate all your boilerplate codes. Then for C++, we’ve got an example here of the sort of thing that we are doing, which is using Cmake, that will compile the boilerplate codes, and then actually build a shared library. And you can see that’s gonna inject it in the byte plot the codes,*** then finally, we do Make to build that shared library, and then the compilation proceeds as normal. So by doing those compilations ahead of time, the integration is pretty seamless. And we only have to have one area where we change the code and change the interfaces, which is great.
Okay, let’s consider the datasource lifecycle. In your codes, what you want to do is basically use the Spark API, a nice succinct syntax, really great way and really consistent. So, what you would like in your code is basically, and this is piece of Python code, it wants to open that file called traffic.pcap. I’m gonna tell it’s a pcap datasource, which we’re gonna write custom hosts that, happen in the data source lifecycle. Well, first of all, the system needs to know that whatever’s called pcap, is your data source. And that’s done by having a datasource registration file in the Meta interface, and, it actually gives the class name of the pcap datasource which is cleverly called, PcapDataSource. How does it know that it’s associated with the name P-C-A-P? It’s the short name function call that does that.
When that happens, it creates something called a PcapFileReader, and then it creates some partitions.
Then you create, as many file partitions as you want, depends how much parallelism you want in the job, also depends on perhaps how many executors you have in the system. Could be how many threads you have in the system, the choice is yours. What you can also do, in the PcapFilePartition is have preferredLocations. You can actually see which nodes you want to send out to as well.
What the system then does, which is quite clever, is it looks at those locations and it then serializes them, sends across the wire to the Worker Nodes, and then it’s sent to the executors and then it’s deserialized on the other side, so it starts still actually the same object, and then it creates a PartitionReader on the other side.
So there’s a PartitionReader on the other side, and then that does the, basically, there’s an API that does the I/O in there, and the data’s sent back all the way back. So that’s the sort of lifecycle, it’s important to realize that there’s a serialization in the middle of that. So let’s have a look at what we have to do on the on a file datasource.
I’ve mentioned before that really the key part of this is the FilePartitionReader. Now remember, the FilePartitionReader is created on the Driver side, and then serialized, and then sent over to the executor. And that creates a bit of a problem to access the I/O resources. The reason is that normally you’d put your initialization in the Constructor, but the Constructor happens on the Driver side, so we need to do something that, does the initialization when you arrive on the Executor side. Now unfortunately, there’s no API call to do that. It’s a bit of an oversight in Spark API, but you can hack around that, by basically looking at the next call, which actually advances to get the next row into the table, and just have a little binary flags that you initialize there, so that the first, time you go in there, you call an init routine, which we’re doing here, and you can see it’s actually creating the PcapNative class that we’re gonna use to access our data. And you can see it calls down into the open file, so it initializes the native thing there. And then we can go on the next part of it, we do NextPacket, and then read that in. And then, in our case, if the length’s zero then you bail, otherwise you keep on going.
What happens after that, is the Next calls every time, “is there another row?”, “Is there another row?”, To actually read the row, you get the Get call, and, you need to fill in the blanks basically. So you need to create a structure called row, which is a GenericInternalRow class, and then fill in the blanks. In this case, column zero, is a String, column one, is a Timestamp, and the third column, second column, column three, if you like, is, the Link type. Okay, so that’s how you write a file datasource, to do a streaming source, you have the same sort of thing, but we have to do something else in the FileReader, sorry, on the StreamReader API. So let’s have a look at what we need to do in the streaming side.
So in the StreamReader, there’s another couple of calls. Normally we just call planInputPartitions.
On the streaming side, there’s a couple of extra calls because every time you’re gonna read in a partition, you want to advance things and make sure there’s been no errors as there’s a bit of flow control going on. And that’s done by a couple of calls, one of which is setOffsetRange, which is basically the system, updating your StreamingReader, to say, “okay, this is as far as it goes. “This is the last thing I’ve seen.” And you need to update those internal values as well. And it checks that, using a couple of calls called getStartOffset, and getEndOffset. Fortunately, within Spark, those classes are already defined. So it can be any structure you like, I’ve used Longs in this case. And what you also have to do is provide a deserialization function, because the communications happening between the driver and the executor, they have to be serialized. I’ve chosen to use JSON as the serialization function. So that’s the templates that you can work with and it works extremely well.
Now, we mentioned before, that within a streaming partition, you do not want to be creating a load of I/O resources, every time you read a partition, because you could be doing this every few seconds, and actually, you may want to keep some context as they’re there as well. So usually, it is a Singleton pattern, for example, it might be an I/O thread, you might have a socket opened as well. And this has already been experienced by the Kafka plugin. So there’s a reference there to what you have to do, and you can actually use the Spark infrastructure for that. So in this case, you actually keep a reference to the partition reader, inside the broadcast variable. And you do that,
when you create it in the Constructor there, which means that when you come to createPartitionReader, you basically just read that value there, so that makes sure that it happens across serialization and then executor.
So it’s created on the driver and then read in the executor. And keeps that global thing there. So, it means that it’s always the same, every time you create a new partition, which is exactly what you’re looking for.
So the use cases for that are your threads, it might be an I/O descriptor, it might be a piece of hardware, for example, that you wanna keep a File descriptor, or a Descriptor block too, or it might be some sort of middleware sync as well. So it’s a very common, pattern that is wise to follow, and you can see that Spark’s actually got some fairly good and portable ways of handling that.
So, in summary,
perfection’s reached not when there’s nothing left to add, but when there’s nothing left to take away. That’s what we’ve got to hear. Your code’s doing I/O, it’s doing high performance data processing, it’s got your crown jewels, your proprietary algorithms, your secret sauce, call it what you like, it’s all within your code, any innovation’s locked within your code as well, anything to do with proprietary hardware, or for that matter, driver hardware, is in your code, and you’ve already solved the problem, so you’ve got high productivity. So the ability to wrap that within a datasource is fantastic, you don’t have to rewrite things. Now what you get from Spark, is all the other good stuff, that you don’t have to retrofit to your codes, or use another framework for. So data storage, you can handle all the different data formats that you’ll ever want, data de-duplication’s handled for you, horizontal scaling’s handled for you through partitioning, data format translation’s handled as well and underlying schemas, if you want to move from JSON to CSV, no problem. Well clearly, you’ve got machine learning that’s part of Spark, and then you’ve got data resilience as well in terms of checkpointing. And you’ve got a documented and stable API, SDK as well. So once you’ve compliant to the datasource API, you’re gonna be there for the longer term. Okay, so, thanks for attention, but what we’re gonna transition to now, is we’ve got a demonstration. We’re gonna show some access to native code, then we’re gonna bring in a file demonstration, and then lastly, we’re gonna source stream our demonstration.
Okay, let’s show some auto-loading, and binding to native code in action. So I’ve got a test file here that’s actually gonna pull in a class which is gonna give us native access to the pcap access function, and we can see here that the first thing it does is open a file. And it takes in the name of the file here. Now it’s analog in Java world as this PcapNative class and you can see it’s got a method here, which is openFile, And then within C++ land, we’ve got an interface here, that’s gonna talk to the low-level, access functions in here and actually open it and return an integer. So the way that we build all this together is, a couple of things. On the C++ side, we’re actually gonna take in the PcapNative file here, which is the kinda header file,
so that we’ve got a well defined interface into C++ Library. And then we’re also gonna compile and link in these two, automatically generated and boilerplate code and that’s gonna be generated by a preprocessor, that’s gonna take in the Java file and link that one in, that’ll produce a library, and then we’ll produce a shared library, that’s gonna be actually pretty inside the jar file here. So, to orchestrate all that we’ve got a Maven file, and then we’ll use the the JavaCPP tool to do that. So, let’s run that once that’s finished. (types on keyboard)
Okay, so that’s all built that and it’s generated the boilerplate code, and now, we can go ahead and actually just add the compilation of the C++ code. (types on keyboard) (types on keyboard)
This is calling through all the sub-libraries, and then each sub-library is gonna create an archive, and then the archives are what are gonna get linked together with a boiler plate and outside code on that.
And then lastly, we do the link phase. And you can see, that we’ve linked in, all the Java libraries automatically. The libraries that are within the the archive itself. And, and we’ve also linked in to the pcap library as well, so it is just on everything automatically. What we can now do is compile all the Java bits as well.
(types on keyboard)
And we’ve created an archive : a JAR file that has everything in it. We can have a look at that, (types on keyboard)
And you can see that actually the shared library’s in here, so that when we run this file, it’ll automatically load that shared library, and then the code should work. So let’s see if it all works.
So, just created a little alias here for the code, so we’re basically gonna run Java, just add that JAR file to the class path and then run the native test and supply a test program to that.
So, it automatically loaded it. It’s got all the package there, and we can see we’ve read 21 frames, and we’ve got the various size of the packets there as well. We just, show there’s no cheating here. We can see that within the raw file, it says 21 frames and the first length is 84 and then 308, which matches that, all good.
And I’ve got a native class, which can access underlying libraries in C++. We can use that within a Spark datasource, and when we use things in Spark, we get the full Spark experience: really nice, tidy, and consistent syntax. So in this case, we’re gonna open up the pcap source, which is actually just gonna load that file, and then turn it into Spark data frame. So we’ve already seen the source code for this, and underlying that, we’re gonna have a Spark FilePartition, that’s gonna work with our native library class here, to do all the accesses. And gonna get loaded to the Spark datasource, it’s automatically gonna load the C++ shared library in there as well.
So let’s go ahead and run that, again, we’ve got an alias for it,
and this time, we’re gonna use spark-submit. And basically, the jar file has the data source and the test code. We’re gonna pass in the, the sctp.pcap file, and we just call the FileTest as a class. So, run that.
(types on keyboard)
So it’s brought in the FileReader, and it’s doing all the right things in Spark, and then if we even have a look at that, it’s come through as a Spark data frame, and, sure enough, we’ve got all the right figures there: 84 and 308, which remember from our native access is all correct, and then further down, we should see that we’ve read 21 frames, which corresponds to the count that we got back, so the data frame is exactly the same as the stuff we got from the native library before, all good again.
Okay, lastly, we’re gonna test the Spark Stream here. So we’ve got a Spark Streaming source, that’s gonna have pcap data, I’m gonna do synthetic data, and it’s gonna send 10 rows every time we pull it, or every time we generate a partition. We’re gonna see that we’re gonna trigger that, every five seconds, we’re gonna store the partition and append it to data store in an orc file, and then we’re also gonna show the first five lines of that
as well, and we’re gonna run this for about 20 seconds. Okay, so let let’s run that.
And you can see that it’s fired up, and we’re getting a report every time, we can see that the number of end rows there was 10, and you can see the start and end offset working as well, so the flow control’s working. And we can also see at the top of the screen that we’ve got the batches are working, and you can see that the lines are going there. Frame timestamps are approximately correct, and we’ve got the dummy data, and the count, there’s decrementing from nine down to zero, so all good. We can have a look
and see if we’ve got some data that’s been stored off, and it has.
So we’ve created that data store, and then we’ve got an orc file for each partition that was read, all good. Thanks for your attention. What we’ve covered here is taking a native library, which in this case was the Pcap library, then we’ve wrapped it up, so it’s been in the Spark infrastructure, and then file sourcing that so that we can read a pcap file, and then we’ve taken that on a stage further to be a streaming one. So I hope you’ve got an insight into actually how easy it is to use the Spark API for datasources, and hope you’ve been inspired to go out there, and try it yourself on your own native code, so that you can be able to ingest data directly into Spark, and then analyze it using all the great techniques and tools that are there. So again, thank you for your time, and don’t forget to rate and review the sessions, and don’t forget to remember that the channels are open for any chat as well.
Doug Carson is a Solutions Architect working for Illuminate Technologies in Edinburgh. During his career he has architected measurement and processing solutions for leading edge telecoms technologies such as intelligent networks, voice over packet, mobile networks and recently, virtualized networks. He holds nine patents in telecommunications protocol processing techniques. He has been using Spark since 2015 and is a co-author of a paper that applied genetic improvement to Spark queries presented at the BCS Real AI 2015 conference and GECCO '16. He is currently using Spark for cyber threat detection in our telecommunications infrastructure that employs custom datasources.