This is a guest authored post by Yu Chen, Senior Software Engineer, Headspace. Headspace’s core products are iOS, Android and web-based apps that focus on improving the health and happiness of its users through mindfulness, meditation, sleep, exercise and focus content. Machine learning (ML) models are core to our user experiences by offering recommendations […]
This is a guest authored post by Yu Chen, Senior Software Engineer, Headspace.
Headspace’s core products are iOS, Android and web-based apps that focus on improving the health and happiness of its users through mindfulness, meditation, sleep, exercise and focus content. Machine learning (ML) models are core to our user experiences by offering recommendations that engage users with new relevant, personalized content that builds consistent habits in their lifelong journey.
Data fed to ML models is often most valuable when it can be immediately leveraged to make decisions in the moment, but, traditionally, consumer data is ingested, transformed, persisted and sits dormant for lengthy periods of time before machine learning and data analytics teams leverage it.
Finding a way to leverage user data to generate real-time insights and decisions means that consumer-facing products like the Headspace app can dramatically shorten the end-to-end user feedback loop: actions that users perform just moments prior can be incorporated into the product to generate more relevant, personalized and context-specific content recommendation for the user.
This means our ML models could incorporate dynamic features that update throughout the course of a user’s day, or even an individual session. Examples of these features include:
With the user experience in mind, the Headspace Machine Learning team architected a solution by decomposing the infrastructure systems into modular Publishing, Receiver, Orchestration and Serving layers. The approach leverages Apache Spark™, Structured Streaming on Databricks, AWS SQS, Lambda and Sagemaker to deliver real-time inference capabilities for our ML models.
In this blog post, we provide a technical deep dive into our architecture. After describing our requirements for real-time inference, we discuss challenges adapting traditional, offline ML workflows to meet our requirements. We then give an architecture overview before discussing details of key architectural components.
In order to facilitate real-time inference that personalizes users’ content recommendations, we need to
Our ballpark end-to-end latency target (from user event forwarded to Kinesis stream to real-time inference prediction available) was 30 seconds.
The above requirements are problems that are often not solved (and don’t need to be solved) with offline models that serve daily batch predictions. ML models that make inferences from records pulled and transformed from an ELT / ETL data pipeline usually have lead times of multiple hours for raw event data. Traditionally, an ML model’s training and serving workflow would involve the following steps, executed via periodic jobs that run every few hours or daily:
Users generate events by performing actions inside of their Headspace app — these are ultimately forwarded to our Kinesis Streams to be processed by Spark Structured Streaming. User apps fetch the near real-time predictions by making RESTful HTTP requests to our backend services, passing along their user IDs and feature flags to indicate which type of ML recommendations to send back. The other components of the architecture will be described in more detail below.
ML models are developed in Databricks Notebooks and evaluated via MLflow experiments on core offline metrics such as recall at k for recommendation systems. The Headspace ML team has written wrapper classes that extend the base Python Function Model Flavor class in MLflow:
# this MLflow context manager allows experiment runs (parameters and metrics) to be tracked and easily queryable with MLModel.mlflow.start_run() as run: # data transformations and feature pre-processing code omitted (boiler-plate code) ... # model construction lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42) # training lr.fit(train_x, train_y) # evaluate the model performance predicted_qualities = lr.predict(test_x) (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities) # Wrap the model in our custom wrapper class model = ScikitLearnModel(lr) model.log_params(...) model.log_metrics(...) # record the results of the run in ML Tracking Server # optionally save model artifacts to object store and register model (give it a semantic version) # so it can be built into a Sagemaker-servable Docker image model.save(register=True)
The Headspace ML team’s model wrapper class invokes MLflow’s own save_model method to perform much of the implementation logic, creating a directory in our ML Models S3 bucket that contains the metadata, dependencies and model artifacts needed to build an MLflow model Docker image:
We can then create a formal Github Release that points to the model we just saved in S3. This can be picked up by CI/CD tools such as CircleCI that test and build MLflow model images that are ultimately pushed to AWS ECR, where they are deployed onto Sagemaker model endpoints.
Updating and reloading real-time models
We retrain our models frequently, but updating a real-time inference model in production is tricky. AWS has a variety of deployment patterns (gradual rollout, canary, etc.) that we can leverage to update the actual served Sagemaker model. However, real-time models also require in-sync online feature stores, which — given the size of Headspace’s user base, can take up to 30 minutes to fully update. Given that we do not want downtime each time we update our model images, we need to be careful to ensure we synchronize our feature store with our model image.
Take, for example, a model that maps a Headspace user ID to a user sequence ID as part of a collaborative filtering model — our feature stores must contain the most updated mapping of user ID to sequence ID. Unless user populations remain completely static, if we only update the model, our user IDs will be mapped to stale sequence ID at inference time, resulting in the model generating a prediction for a random user instead of the target user.
To address this issue, we can adopt a blue-green architecture that follows from the DevOps practice of blue-green deployments. The workflow is illustrated below:
Headspace user event actions (logging into the app, playing a specific piece of content, renewing a subscription, searching for content, etc.) are aggregated and forwarded onto Kinesis Data Streams (Step 1 in diagram). We leverage the Spark Structured Streaming framework on top of Databricks to consume from these Kinesis Streams. There are several benefits to Structured Streaming, including that it:
Structured Streaming uses micro-batching to break up the continuous stream of events into discrete chunks, processing incoming events in small micro-batch dataframes.
Streaming data pipelines must differentiate between event-time (when the event actually occurs on the client device) and processing-time (when the data is seen by servers). Network partitions, client-side buffering and a whole host of other issues can introduce non-trivial discrepancies between these two timestamps. The Structured Streaming API allows simple customization of logic to handle these discrepancies:
df.withWatermark("eventTime", "10 minutes") .groupBy( "userId", window("eventTime", "10 minutes", "5 minutes"))
We configure the Structured Streaming Job with the following parameters:
Using Scheduled Job clusters significantly reduces compute DBU costs while also mitigating the likelihood of correlated infrastructure failures. Jobs that run on a faulty cluster—perhaps with missing/incorrect dependencies, instance profiles or overloaded availability zones—will fail until the underlying cluster issue is fixed, but separating jobs across clusters prevents interference.
We then point the stream query to read from a specially configured Amazon Kinesis Stream that aggregates user client-side events (Step 2 of diagram). The stream query can be configured using the following logic:
processor = RealTimeInferenceProcessor() query = df.writeStream .option(“checkpointLocation”, “dbfs://pathToYourCheckpoint”) .foreachBatch(processor.process_batch) .outputMode("append") .start()
Here, outputMode defines the policy for how data is written to a streaming sink and can take on three values: append, complete and update. Since our Structured Streaming Job is concerned with handling incoming events, we select append to only process “new” rows.
It is a good idea to configure a checkpoint location to gracefully restart a failed streaming query, allowing “replays” which pick back up processing just before the failure.
Depending on the business use case, we can also choose to reduce latency by setting the argument to processingTime = “0 seconds”, which starts each micro-batch as soon as possible:query = df.writeStream .option(“checkpointLocation”, “dbfs://pathToYourCheckpoint”) .foreachBatch(process_batch) .outputMode("append") .trigger(processingTime = “0 seconds”) .start()
In addition, our Spark Structured Streaming job cluster assumes a special EC2 Instance Profile with the appropriate IAM policies to interact with AWS Sagemaker Feature Groups and put messages onto our prediction job SQS queue.
Ultimately, since each Structured Streaming job incorporates different business logic, we will need to implement different micro-batch processing functions that will be invoked once per micro-batch.
In our case, we’ve implemented a process_batch method that first computes/updates online features on AWS Sagemaker Feature Store, and then forwards user events to the job queue (Step 3):from pyspark.sql.dataframe import DataFrame as SparkFrame class RealTimeInferenceProcessor(Processor): def __init__(self): self.feature_store = initialize_feature_store() def process_batch(self, df: SparkFrame, epochID: str) -> None: """ Concrete implementation of the stream query’s micro batch processing logic. Args: df (SparkFrame): The micro-batch Spark DataFrame to process. epochID (str): An identifier for the batch. """ compute_online_features(df, self.feature_store) forward_micro_batch_to_job_queue(df)
The orchestration layer: decoupled event queues and Lambdas as feature transformers
Headspace users produce events that our real-time inference models consume downstream to make fresh recommendations. However, user event activity volume is not uniformly distributed. There are various peaks and valleys — our users are often most active during specific times of day.
Messages that are placed into the SQS prediction job queue are processed by AWS Lambda functions (Step 4 in diagram), which performs the following steps:
Pull: This method involves persisting the final recommended content to our internal Prediction Service, which is responsible for ultimately supplying users with their updated personalized content for many of the Headspace app’s tabs upon client app request. Below is an example experiment using real-time inference infrastructure that allows users to fetch personalized recommendations from the app’s Today tab:
Push: This method involves placing the recommendation onto another SQS queue for push notifications or in-app modal content recommendations. See the images below for examples of (above) in-app modal push recommendations triggered from a user recent search for sleep content and (below) iOS push notification from a recent user content completion:
Within minutes of completing a specific meditation or performing a search, these push notifications can serve a relevant next piece of content while the context is still top of mind for the user.
In addition, utilizing this event queue allows prediction job requests to be retried — a small visibility timeout window (10-15 seconds) for the SQS queue can be set so that if a prediction job is not completed within that time window, another Lambda function is invoked to retry.
From an infrastructure and architecture perspective, a key learning is prioritizing designing flexible hand-off points between different services — in our case, the Publishing, Receiver, Orchestrator, and Serving layers. For instance,
Proactively addressing these questions will help decouple the various components of a complex ML architecture into smaller, modular sets of infrastructure.
The Headspace ML team is still rolling out production use cases for this infrastructure, but initial A/B tests and experiments have seen strong lifts in content start rates, content completion rates and direct/total push open rates relative to both other Headspace initiatives and industry benchmarks.
By leveraging models capable of real-time inference, Headspace significantly reduces the end-to-end lead time between user actions and personalized content recommendations. Events stream — recent searches, content starts/exits/pauses, in-app navigation actions, even biometric data — within the current session can all be leveraged to constantly update the recommendations we serve to users while they are still interacting with the Headspace application.
Learn more about Headspace at www.headspace.com.
Try Databricks for free. Get started today.
The post Infrastructure Design for Real-time Machine Learning Inference appeared first on Databricks.