We’re excited to announce that MongoDB Atlas Stream Processing now supports Session Windows!
This powerful feature lets you build streaming pipelines that analyze and process related events that occur together over time, grouping them into meaningful sessions based on periods of activity. For instance, you can now track all of a customer’s interactions during a shopping journey, treating it as a single session that ends when they’re inactive for a specified period of time.
Whether you’re analyzing user behavior, monitoring IoT device activity, or tracking system operations, Atlas Stream Processing’s Session Windows make it easy to transform your continuous data streams into actionable insights and make that data available wherever you need it.
What are Session Windows?
Session Windows are a powerful way to analyze naturally occurring activity patterns in your data by grouping related events that happen close together in time. Think of how users interact with websites or apps—they tend to be active for a period, then take breaks, then return for another burst of activity. Session Windows automatically detect these patterns by identifying gaps in activity, allowing you to perform aggregations and transformations on these meaningful periods of activity.
As an example, imagine you’re an e-commerce company looking to better understand what your customers do during each browsing session to help improve conversions.
With Atlas Stream Processing, you can build a pipeline that:
- Collects all the product pages a user visits during their browsing session
- Records the name, category, and price of each item viewed, plus whether items were added to a cart
- Automatically considers a session complete after 15 minutes of user inactivity
- Sends the session data to cloud storage to improve recommendation engines
With this pipeline, your recommendation engine receives ready-to-use session data, letting you improve recommendations in real time. Unlike fixed time-based windows (tumbling or hopping), Session Windows adapt dynamically to each user’s behavior patterns.
How does it work?
Session Windows work similarly to the hopping and tumbling windows Atlas Stream Processing already supports, but with a critical difference: while those windows open and close on fixed time intervals, Session Windows dynamically adjust based on activity patterns.
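To make the contrast concrete, here is a minimal sketch of the two styles side by side. The field names, interval, and gap below are illustrative assumptions, not part of the example in this post:

// A tumbling window closes on a fixed schedule, every 15 minutes, regardless of activity
let fixedWindow = {
  $tumblingWindow: {
    interval: { size: 15, unit: "minute" },
    pipeline: [{ $group: { _id: "$userId", events: { $sum: 1 } } }]
  }
};

// A session window closes 15 minutes after a given user's last event
let activityWindow = {
  $sessionWindow: {
    partitionBy: "$userId",
    gap: { size: 15, unit: "minute" },
    pipeline: [{ $group: { _id: "$userId", events: { $sum: 1 } } }]
  }
};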
To implement a Session Window, you specify three required components:
- partitionBy: This is the field or fields that group your records into separate sessions. For instance, if tracking user sessions, use unique user IDs to ensure each user’s activity is processed separately.
- gap: This is the period of inactivity that signals the end of a session. For instance, in the above example, we consider a user’s session complete when they go 15 minutes without clicking a link on the website or app.
- pipeline: These are the operations you want to perform on each session’s data. This may include counting the number of pages a user visited, recording the page they spent the most time on, or noting which pages were visited multiple times.
You then add this Session Window stage to your streaming aggregation pipeline, and Atlas Stream Processing continuously processes your incoming data, groups events into sessions based on your configuration, and applies your specified transformations. The results flow to your designated output destinations in real time, ready for analysis or to trigger automated actions.
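In shape, such a pipeline is simply a source stage, the $sessionWindow stage, and one or more sink stages, in that order. The worked example below fills this in fully; as a minimal sketch (the "clickstream" and "analytics" connection names and the event fields here are hypothetical):

// Minimal pipeline shape: source -> session window -> sink
// "clickstream" and "analytics" are hypothetical connection registry entries
let minimalPipeline = [
  { $source: { connectionName: "clickstream", db: "web", coll: "events" } },
  { $sessionWindow: {
      partitionBy: "$fullDocument.userId",
      gap: { size: 15, unit: "minute" },
      pipeline: [{ $group: { _id: "$fullDocument.userId", pageViews: { $sum: 1 } } }]
  } },
  { $emit: { connectionName: "analytics", topic: "sessions" } }
];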
A quick example
Let’s say you want to build the pipeline mentioned above: track user sessions, notify users who have items in their cart but haven’t checked out, and move session data downstream for analytics. You might do something like this:
1. Configure your source and sink stages
This is where you define connections to the MongoDB collections or external systems you intend to receive data from (source) or send data to (sink).
// Set your source to be change streams from the pageViews, cartItems, and orderedItems collections
let sourceCollections = {
  $source: {
    connectionName: "ecommerce",
    db: "customerActivity",
    coll: ["pageViews", "cartItems", "orderedItems"]
  }
};


// Set your destination (sink) to be the userSessions topic your recommendation engine consumes data from
let emitToRecommendationEngine = {
  $emit: {
    connectionName: "recommendationEngine",
    topic: "userSessions"
  }
};

// Create a connection to your sendCheckoutReminder Lambda function, which reminds users to
// check out if they still have items in their cart when the session ends
let sendReminderIfNeeded = {
  $externalFunction: {
    connectionName: "operations",
    as: "sendCheckoutReminder",
    functionName: "arn:aws:lambda:us-east-1:123412341234:function:sendCheckoutReminder"
  }
};

2. Define your Session Window logic
This is where you specify how data will be transformed in your stream processing pipeline.
// Step 1. Create a stage that pulls only the fields you care about from the change stream events.
// Every document will have a userId and an itemId, since all three collections share those fields.
// Fields not present in a given event will be null.
let extractRelevantFields = {
  $project: {
    userId: "$fullDocument.userId",
    itemId: "$fullDocument.itemId",
    category: "$fullDocument.category",
    cost: "$fullDocument.cost",
    viewedAt: "$fullDocument.viewedAt",
    addedToCartAt: "$fullDocument.addedToCartAt",
    purchasedAt: "$fullDocument.purchasedAt"
  }
};

// Step 2. Setting _id to $userId groups all documents in the window by userId.
// Fields not present in any record will be null.
let groupSessionData = {
  $group: {
    _id: "$userId",
    itemIds: { $addToSet: "$itemId" },
    categories: { $addToSet: "$category" },
    costs: { $addToSet: "$cost" },
    viewedAt: { $addToSet: "$viewedAt" },
    addedToCartAt: { $addToSet: "$addedToCartAt" },
    purchasedAt: { $addToSet: "$purchasedAt" }
  }
};

// Step 3. Create a session window that closes after 15 minutes of inactivity. The window
// partitions events by userId, and its pipeline specifies the operations to perform on all
// documents sharing the same userId within the window.
let createSession = {
  $sessionWindow: {
    partitionBy: "$userId",
    gap: { unit: "minute", size: 15 },
    pipeline: [
      groupSessionData
    ]
  }
};
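Before creating a named processor, you can sanity-check the windowing logic interactively. sp.process() runs an ad-hoc pipeline and streams its output to the shell without saving anything, so a quick test (assuming the connections above already exist in your connection registry) might look like:

// Run the pipeline ad hoc and watch session documents print to the shell
sp.process([sourceCollections, extractRelevantFields, createSession])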

3. Create and start your stream processor
The last step is simple: create and start your stream processor.
// Create your pipeline array. The session data will be sent to the external function defined
// in sendReminderIfNeeded, and then emitted to the recommendation engine's Kafka topic.
let finalPipeline = [
  sourceCollections,
  extractRelevantFields,
  createSession,
  sendReminderIfNeeded,
  emitToRecommendationEngine
];

// Create your stream processor
sp.createStreamProcessor("userSessions", finalPipeline)

// Start your stream processor
sp.userSessions.start()
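Once the processor is running, you can spot-check it from the same shell. For example, sample() prints a live sample of output documents and stats() reports runtime statistics:

// View a live sample of the processor's output
sp.userSessions.sample()

// Check runtime statistics such as state and message counts
sp.userSessions.stats()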

And that’s it! Your stream processor now runs continuously in the background with no additional management required. As users navigate your e-commerce website, add items to their carts, and make purchases, Atlas Stream Processing automatically:
- Tracks each user’s activity in real time
- Groups events into meaningful sessions based on natural usage patterns
- Closes sessions after your specified period of inactivity (15 minutes)
- Triggers reminders for users with abandoned carts
- Delivers comprehensive session data to your analytics systems
All of this happens automatically at scale without requiring ongoing maintenance or manual intervention. Session Windows provide powerful, activity-based data processing that adapts to users’ behavioral patterns rather than forcing their actions into arbitrary time buckets.
Ready to get started? Log in or sign up for Atlas today to create stream processors. You can learn more about Session Windows or get started using our tutorial.