If you are here, chances are that you already know Apache Spark. This article builds on the assumption that the reader is familiar with the world of analysing large amounts of data in parallel. In that world, Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
Augmenting your existing data, be it web logs with user agents or the event stream of an analytics application, with Device Intelligence is a way to produce exciting new analyses and data visualizations, and ultimately gain new insight into how users are taking advantage of your service.
In this article, we will show you how you can integrate WURFL’s device detection with Spark Streaming to open up a host of new possibilities.
We will focus on the WURFL Microservice API specifically. If you are in a hurry to give the solution we present here a spin, you can obtain WURFL Microservice from the marketplaces of the three major cloud providers (AWS, Azure and GCP) without the need to engage your procurement department. Alternatively, you can contact ScientiaMobile for a trial version of our WURFL Microservice for Docker (references later).
Note: If you already own a license to the WURFL OnSite Java API, you should be able to adapt the code with little effort. You can contact ScientiaMobile through your dedicated support channel if in doubt.
This document hinges on a demo application that exemplifies the salient aspects of WURFL integration with Spark.
Here are the prerequisites for creating and running the application.
- Spark 2.x or above.
- Java 8 or above.
- The netcat command.
- A WURFL Microservice instance, obtainable from the AWS, Microsoft Azure, or Google Cloud Platform marketplaces, or directly from ScientiaMobile in the form of a Docker image.
- WURFL Microservice for AWS, Getting Started Guide. AMIs available here.
- WURFL Microservice for Azure, Getting Started Guide. VMs available here.
- WURFL Microservice for GCP, Getting Started Guide. VMs available here.
The demo application shows the use of WURFL Microservice (client and server) to add Device Detection to a Spark Streaming Java app.
Here is the application workflow: HTTP request headers in JSON format are piped through netcat into a socket, Spark Streaming parses each event and enriches it with WURFL device data, and the enriched results, together with per-brand counts, are printed to the console.
Getting User Agents into Spark Streaming
We will use the netcat command to pipe HTTP request headers (including user agents) in JSON format to our application via a socket bound on port 9999.
Note (taken from Wikipedia): netcat (often abbreviated to nc) is a computer networking utility for reading from and writing to network connections using TCP or UDP. The command is designed to be a dependable back-end that can be used directly or easily driven by other programs and scripts. At the same time, it is a feature-rich network debugging and investigation tool, since it can produce almost any kind of connection its user could need and has a number of built-in capabilities.
The JSON input looks something like this (content is always a JSON array; it is pretty-printed here for readability, while the actual input file keeps each array on a single line):
[{ "Save-Data":"on", "Accept-Language":"en", "Accept-Encoding":"gzip, deflate", "X-Operamini-Features":"advanced, camera, download, file_system, folding, \ httpping, pingback, routing, touch, viewport", "X-Forwarded-For":"103.38.89.102, 141.0.8.173", "Accept":"text/html, application/xml;q=0.9, \ application/xhtml+xml, image/png, image/webp, image/jpeg, \ image/gif, image/x-xbitmap, */*;q=0.1", "User-Agent":"Opera/9.80 (Android; Opera Mini/39.1.2254/163.76; U; en) \ Presto/2.12.423 Version/12.16", "X-Operamini-Phone-Ua":"Mozilla/5.0 (Linux; Android 9; moto g(6) plus \ Build/PPWS29.116-16-15; wv) AppleWebKit/537.36 (KHTML, like Gecko) \ Version/4.0 Chrome/73.0.3683.90 Mobile Safari/537.36", "Device-Stock-Ua":"Mozilla/5.0 (Linux; Android 9; moto g(6) plus \ Build/PPWS29.116-16-15; wv) AppleWebKit/537.36 (KHTML, like Gecko) \ Version/4.0 Chrome/73.0.3683.90 Mobile Safari/537.36", "Event":"VIDEO_OK", "Video-id":"TPhZnruRPsM", "Forwarded":"for=\"103.38.89.102:49931\"" }]
Note: The User-Agent HTTP header (occasionally accompanied by other headers, as in this case) is the standard way for a device to signal information about itself to the HTTP server. For historical reasons, those headers are not simple to decipher across the board.
The application uses the headers contained in the JSON data to perform device detection and enrich that same JSON stream with additional information such as device brand, device model and operating system. The console prints out the detection data along with an aggregated count of how many devices were detected for each brand.
The application output in the console will look like this:
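For the Opera Mini event in the sample JSON above, the output would be along these lines (illustrative values, assuming the device resolves to a Motorola smartphone; the exact output depends on your input data):

Complete device name: Motorola moto g(6) plus
Device OS & version: Android 9
Device form factor: Smartphone
-- BRAND COUNT ---
Motorola: 1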
Get the demo application
You can find the Spark Streaming application code here:
https://github.com/WURFL/wurfl-microservice-examples/tree/master/event-streams-spark_java
Compile the application
From the example application root directory, run the Maven build.
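The demo is a Maven project; the -jar-with-dependencies suffix suggests the Maven Assembly Plugin, so the standard command below should work (an assumption on our part; check the project README if in doubt):

mvn clean install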
The build produces a fat JAR at a path like:

event-stream-spark-java-demo/<version_number>/event-stream-spark-java-demo-<version_number>-jar-with-dependencies.jar
Installing and running the application on Spark
To install and run the app on a standalone Spark installation, run the following command (for a Spark cluster installation, please check the Spark documentation):
<SPARK_HOME>/bin/spark-submit \
--class com.scientiamobile.wurflmicroservice.eventprocessor.SparkProcessor
--master local \
--deploy-mode client <path_to>/event-stream-spark-java-demo-<version>-jar-with-dependencies.jar
<WURFL Microservice IP address>
Sending data to the Spark application
cd into the project root directory and run the following command:
nc -lk 9999> < ../event-streams-java/event_streams_min_compr.json
This will send the content of the JSON file to the socket on which our application is listening. Repeat this command two or three times to see the results appear in the console. In order for the example to work, the JSON file must contain one JSON array per line to play well with the netcat command (see example here).
Note: our application processes the data it receives at 30-second intervals. This means that you may have to wait a few moments before the console shows some action.
Application code
Spark can run applications written in multiple programming languages: Scala (Spark’s “language of choice”), Java, Microsoft .NET and Python. For the purposes of this tutorial, we picked the Java WURFL Microservice client API. As you are probably aware, Scala applications can leverage Java libraries, so Spark Scala applications can use our WURFL Microservice Java client as well.
Let’s take a look at the class SparkProcessor.java.
package com.scientiamobile.wurflmicroservice.eventprocessor;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.scientiamobile.wurfl.wmclient.Model;
import com.scientiamobile.wurfl.wmclient.WmClient;
import com.scientiamobile.wurfl.wmclient.WmException;
import org.apache.spark.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
We import the WURFL Microservice, Spark Streaming and GSON API libraries. GSON is a JSON parser for Java.
As Spark starts the application, certain self-configuration steps take place. The following code sets up:
- The application name.
- The time interval at which it must process the JSON data it receives.
- The local Spark master on which the process will run, using up to four threads.
All of this configuration is enclosed in a context object (jssc) that handles the application lifecycle.
SparkConf conf = new SparkConf()
        .setMaster("local[4]")
        .setAppName("WurflDeviceDetection");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(30));
We then create a receiver socket (listening on RECEIVER_HOST:RECEIVER_PORT) to which JSON event data will be streamed.
JavaReceiverInputDStream<String> stream =
        jssc.socketTextStream(RECEIVER_HOST, RECEIVER_PORT); // localhost, 9999
The stream object is the Java implementation of a Spark DStream, a “discretized stream” (in Spark lingo) of data for a given time interval (the streamed data are Spark Resilient Distributed Datasets, or RDDs; more about them here).
Before performing a device detection, we need to parse the input JSON data and turn it into Java objects:
JavaDStream<EnrichedEventData[]> events = stream.map(s -> { // We use a Gson parser to parse events into Java objects Gson gson = new GsonBuilder().setPrettyPrinting().create(); return gson.fromJson(new StringReader(s), EnrichedEventData[].class); });
The stream.map(…) operation maps each element of the data stream to the result of the inlined function, in our case a Java array of EnrichedEventData objects that holds the parsed JSON data. At this stage, the objects still hold empty values for the WURFL properties, as the ‘enrichment’ phase has not happened yet. It takes place in the code below:
JavaDStream<EnrichedEventData[]> enrichedEvents = events.map(evs -> {
    WmClient wmClient = WmClientProvider.getOrCreate(wmServerHost, "80");
    for (EnrichedEventData evItem : evs) {
        ...
        HttpServletRequestMock request = new HttpServletRequestMock(evItem.getHeaders());
        Model.JSONDeviceData device = wmClient.lookupRequest(request);
        evItem.setWurflCompleteName(device.capabilities.get("complete_device_name"));
        evItem.setWurflDeviceMake(device.capabilities.get("brand_name"));
        evItem.setWurflDeviceModel(device.capabilities.get("model_name"));
        evItem.setWurflFormFactor(device.capabilities.get("form_factor"));
        evItem.setWurflDeviceOS(device.capabilities.get("device_os") + " "
                + device.capabilities.get("device_os_version"));
        ...
    }
    return evs;
});
The events DStream that we obtained after JSON parsing is now enriched by a new anonymous function that invokes the WURFL Microservice client API. The API relies on an HttpServletRequest mock implementation (initialized with the input data) to perform device detection.
Note: we could have taken the User-Agent header as the key for our Device Detection. We elected to provide the WURFL API with all the headers. Let’s just say for now that this is more “future-proof”.
The getOrCreate method creates the WM client with the given host and port to connect to, or simply returns it if it has already been created. Creating a new WM client for each operation is time consuming and would pretty much knock out its caching layer, so we address the problem by introducing a “pseudo-singleton”. A ‘production’ Spark cluster is likely to run multiple instances of the WURFL Microservice client, one for each JVM the cluster runs on.
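For reference, here is a minimal sketch of what such a pseudo-singleton provider could look like. The actual WmClientProvider in the demo repository may differ; the sketch assumes the standard WmClient.create(scheme, host, port, baseURI) factory of the WURFL Microservice Java client:

public class WmClientProvider {

    private static volatile WmClient client;

    private WmClientProvider() {
        // static-only holder, never instantiated
    }

    // Returns the per-JVM WmClient, creating it on first use. Reusing a single
    // client preserves its internal cache across Spark batches.
    public static WmClient getOrCreate(String host, String port) {
        if (client == null) {
            synchronized (WmClientProvider.class) {
                if (client == null) {
                    try {
                        // scheme, host, port, baseURI
                        client = WmClient.create("http", host, port, "");
                    } catch (WmException e) {
                        throw new RuntimeException("Cannot create WURFL Microservice client", e);
                    }
                }
            }
        }
        return client;
    }
}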
The event.setWurfl* properties are set by querying the device detection data via the device.capabilities.get(<capability_name>) method. More on WURFL capabilities here.
Having performed device detection on all elements of our DStream, we can print some device data and count how many devices of each brand (Motorola, Apple, Samsung, etc.) have interacted with the service:
enrichedEvents.foreachRDD(evList -> {
    Map<String, Integer> bcount = new HashMap<>();
    evList.foreach(eev -> {
        for (EnrichedEventData e : eev) {
            ...
            System.out.println("Complete device name: " + e.getWurflCompleteName());
            System.out.println("Device OS & version: " + e.getWurflDeviceOS());
            System.out.println("Device form factor: " + e.getWurflFormFactor());
            ...
            if (!bcount.containsKey(e.getWurflDeviceMake())) {
                bcount.put(e.getWurflDeviceMake(), 0);
            }
            bcount.put(e.getWurflDeviceMake(), bcount.get(e.getWurflDeviceMake()) + 1);
        }
        System.out.println("-- BRAND COUNT ---");
        bcount.forEach((k, v) -> System.out.println(k + ": " + v));
        ...
    });
});
For the sake of this tutorial, the detection count is computed at the RDD level, which means that detections are counted for every chunk of data sent to Spark in each execution cycle; a running total of all detections since application start is not kept. You can code this enhancement as an exercise in your free time.
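If you feel like trying, one possible approach (a sketch, not part of the demo code) is Spark Streaming's updateStateByKey, which keeps per-key state across batches. The snippet assumes the enrichedEvents DStream from above and a writable checkpoint directory; besides the imports shown earlier, it needs java.util.Arrays, java.util.List, scala.Tuple2, org.apache.spark.api.java.Optional and org.apache.spark.streaming.api.java.JavaPairDStream:

// Hypothetical extension: cumulative per-brand totals across all batches.
// updateStateByKey requires checkpointing to store state between intervals.
jssc.checkpoint("/tmp/wurfl-spark-checkpoint");

JavaPairDStream<String, Integer> brandTotals = enrichedEvents
        // One element per detected device, keyed by brand
        .flatMap(evs -> Arrays.stream(evs)
                .map(EnrichedEventData::getWurflDeviceMake)
                .iterator())
        .mapToPair(brand -> new Tuple2<>(brand, 1))
        // Fold this batch's counts into the running total for each brand
        .updateStateByKey((List<Integer> batchCounts, Optional<Integer> total) -> {
            int sum = total.orElse(0);
            for (int c : batchCounts) {
                sum += c;
            }
            return Optional.of(sum);
        });

brandTotals.print(); // prints the cumulative count per brand at every interval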
Finally, to get the party started, we must tell the application to start receiving data streams. This is easily achieved:
jssc.start();
jssc.awaitTermination();
The methods are pretty self-explanatory: the first starts the streaming context, allowing data to flow and be processed by the application. The second waits for the stream to terminate.
Conclusions: Spark Streaming as a Device and User Analytics Tool
Apache Spark has long been used as the in-memory data engine for big data analytics, streaming data and heavy-duty data analysis workloads. Thanks to WURFL, Spark can now process web data to add a new dimension to data analysis and to the understanding of how end-users are adopting a service.