Close Menu
    DevStackTipsDevStackTips
    • Home
    • News & Updates
      1. Tech & Work
      2. View All

      Error’d: Pickup Sticklers

      September 27, 2025

      From Prompt To Partner: Designing Your Custom AI Assistant

      September 27, 2025

      Microsoft unveils reimagined Marketplace for cloud solutions, AI apps, and more

      September 27, 2025

      Design Dialects: Breaking the Rules, Not the System

      September 27, 2025

      Building personal apps with open source and AI

      September 12, 2025

      What Can We Actually Do With corner-shape?

      September 12, 2025

      Craft, Clarity, and Care: The Story and Work of Mengchu Yao

      September 12, 2025

      Cailabs secures €57M to accelerate growth and industrial scale-up

      September 12, 2025
    • Development
      1. Algorithms & Data Structures
      2. Artificial Intelligence
      3. Back-End Development
      4. Databases
      5. Front-End Development
      6. Libraries & Frameworks
      7. Machine Learning
      8. Security
      9. Software Engineering
      10. Tools & IDEs
      11. Web Design
      12. Web Development
      13. Web Security
      14. Programming Languages
        • PHP
        • JavaScript
      Featured

      Using phpinfo() to Debug Common and Not-so-Common PHP Errors and Warnings

      September 28, 2025
      Recent

      Using phpinfo() to Debug Common and Not-so-Common PHP Errors and Warnings

      September 28, 2025

      Mastering PHP File Uploads: A Guide to php.ini Settings and Code Examples

      September 28, 2025

      The first browser with JavaScript landed 30 years ago

      September 27, 2025
    • Operating Systems
      1. Windows
      2. Linux
      3. macOS
      Featured
      Recent
    • Learning Resources
      • Books
      • Cheatsheets
      • Tutorials & Guides
    Home»Development»Databases»MongoDB Atlas Stream Processing Now Supports Session Windows!

    MongoDB Atlas Stream Processing Now Supports Session Windows!

    May 29, 2025

    We’re excited to announce that MongoDB Atlas Stream Processing now supports Session Windows!

    This powerful feature lets you build streaming pipelines that analyze and process related events that occur together over time, grouping them into meaningful sessions based on periods of activity. For instance, you can now track all of a customer’s interactions during a shopping journey, treating it as a single session that ends when they’re inactive for a specified period of time.

    Whether you’re analyzing user behavior, monitoring IoT device activities, or tracking system operations, Atlas Stream Processing’s Session Windows make it easy to transform your continuous data streams into actionable insight, and make the data available wherever you need to use it.

    What are Session Windows?

    Session Windows are a powerful way to analyze naturally occurring activity patterns in your data by grouping related events that happen close together in time. Think of how users interact with websites or apps—they tend to be active for a period, then take breaks, then return for another burst of activity. Session Windows automatically detect these patterns by identifying gaps in activity, allowing you to perform aggregations and transformations on these meaningful periods of activity.

    As an example, imagine you’re an e-commerce company looking to better understand what your customers do during each browsing session to help improve conversions.

    With Atlas Stream Processing, you can build a pipeline that:

    1. Collects all the product pages a user visits during their browsing session

    2. Records the name, category, and price of each item viewed, plus whether items were added to a cart

    3. Automatically considers a session complete after 15 minutes of user inactivity

    4. Sends the session data to cloud storage to improve recommendation engines

    With this pipeline, you provide your recommendation engine with ready-to-use data about your user sessions to improve your recommendations in real time. Unlike fixed time-based windows (tumbling or hopping), Session Windows adapt dynamically to each user’s behavior patterns.

    How does it work?

    Session Windows work similarly to the hopping and tumbling windows Atlas Stream Processing already supports, but with a critical difference: while those windows open and close on fixed time intervals, Session Windows dynamically adjust based on activity patterns.

    To implement a Session Window, you specify three required components:

    • partitionBy: This is the field or fields that group your records into separate sessions. For instance, if tracking user sessions, use unique user IDs to ensure each user’s activity is processed separately.

    • gap: This is the period of inactivity that signals the end of a session. For instance, in the above example, we consider a user’s session complete when they go 15 minutes without clicking on a link in the website or app.

    • pipeline: These are the operations you want to perform on each session’s data. This may include counting the number of pages a user visited, recording the page they spent the most time on, or noting which pages were visited multiple times.

    You then add this Session Window stage to your streaming aggregation pipeline, and Atlas Stream Processing continuously processes your incoming data, groups events into sessions based on your configuration, and applies your specified transformations. The results flow to your designated output destinations in real-time, ready for analysis or to trigger automated actions.

    A quick example

    Let’s say you want to build the pipeline that we mentioned above to track user sessions, notify them if they have items in their cart but haven’t checked out, and move their data downstream for analytics. You might do something like this:

    1. Configure your source and sink stages

    This is where you define the connections to any MongoDB or external location you intend to receive data from (source) or send data to (sink).

    // Set your source to be change streams from the pageViews, cartItems, and orderedItems collections
let sourceCollections = {
    $source: {
        connectionName: "ecommerce",
        "db": "customerActivity",
        "coll": ["pageViews", "cartItems", "orderedItems"]
     }
}


// Set your destination (sink) to be the userSessions topic your recommendation engine consumes data from
let emitToRecommendationEngine = {
    $emit: {
        connectionName: "recommendationEngine",
        topic: "userSessions",
    }
};

// Create a connection to your sendCheckoutReminder Lambda function that sends a reminder to users to check out if they have items in their cart when the session ends
let sendReminderIfNeeded = {
    $externalFunction: {
        "connectionName": "operations",
        "as": "sendCheckoutReminder",
        "functionName": "arn:aws:lambda:us-east-1:123412341234:function:sendCheckoutReminder"
    }
 }


    2. Define your Session Window logic

    This is where you specify how data will be transformed in your stream processing pipeline.

    // Step 1.  Create a stage that pulls only the fields you care about from the change logs.
// Every document will have a userId and itemId as all collections share that field.  Fields not present will be null.
let extractRelevantFields = {
   $project: {
       userId: "$fullDocument.userId",
       itemId: "$fullDocument.itemId",
       category: "$fullDocument.category",
       cost: "$fullDocument.cost",
       viewedAt: "$fullDocument.viewedAt",
       addedToCartAt: "$fullDocument.addedToCartAt",
       purchasedAt: "$fullDocument.purchasedAt"
   }
};

// Step 2.  By setting _id to $userId this group all the documents by the userId.  Fields not present in any records will be null.
let groupSessionData = {
   $group: {
       _id: "$userId",
       itemIds: { $addToSet: "$itemId" },
       categories: { $addToSet: "$category" },
       costs: { $addToSet: "$cost" },
       viewedAt: { $addToSet: "$viewedAt" },
       addedToCartAt: { $addToSet: "$addedToCartAt" },
       purchasedAt: { $addToSet: "$purchasedAt" }
   }
};

// Step 3.  Create a session window that closes after 15 minutes of inactivity.  The pipeline specifies all operations to be performed on documents sharing the same userId within the window.
let createSession = { $sessionWindow: {
   partitionBy: "$_id",
   gap: { unit: "minute", size: 15},
   pipeline: [
       groupSessionData
   ]
}};


    3. Create and start your stream processor

    The last step is simple: create and start your stream processor.

    //  Create your pipeline array.  The session data will be sent to the external function defined in sendReminderIfNeeded, and then it will be emitted to the recommendation engine Kafka topic.
finalPipeline = [
   sourceCollections,
   extractRelevantFields,
   createSession,
   sendReminderIfNeeded,
   emitToUserSessionTopic
];

// Create your stream processor
sp.createStreamProcessor("userSessions", finalPipeline)

// Start your stream processor
sp.userSessions.start()


    And that’s it! Your stream processor now runs continuously in the background with no additional management required. As users navigate your e-commerce website, add items to their carts, and make purchases, Atlas Stream Processing automatically:

    • Tracks each user’s activity in real-time

    • Groups events into meaningful sessions based on natural usage patterns

    • Closes sessions after your specified period of inactivity (15 minutes)

    • Triggers reminders for users with abandoned carts

    • Delivers comprehensive session data to your analytics systems

    All of this happens automatically at scale without requiring ongoing maintenance or manual intervention. Session Windows provide powerful, activity-based data processing that adapts to users’ behavioral patterns rather than forcing their actions into arbitrary time buckets.

    Ready to get started? Log in or sign up for Atlas today to create stream processors. You can learn more about Session Windows or get started using our tutorial.

    Source: Read More

    Facebook Twitter Reddit Email Copy Link
    Previous ArticleInclusivity with Voice & Language [SUBSCRIBER]
    Next Article Ecosystem Partnerships: Driving Mainframe Innovation and Future-Ready Solutions

    Related Posts

    Development

    Using phpinfo() to Debug Common and Not-so-Common PHP Errors and Warnings

    September 28, 2025
    Development

    Mastering PHP File Uploads: A Guide to php.ini Settings and Code Examples

    September 28, 2025
    Leave A Reply Cancel Reply

    For security, use of Google's reCAPTCHA service is required which is subject to the Google Privacy Policy and Terms of Use.

    Continue Reading

    How to Budget Smartly for Your First AI Project: A Step-by-Step Guide

    Web Development

    Discriminating Form and Meaning in Multilingual Models with Minimal-Pair ABX Tasks

    Machine Learning

    Domain Setup and Mail Flow Configuration in Microsoft 365

    Development

    Cisco Warns of Critical ISE Flaw Allowing Unauthenticated Attackers to Execute Root Code

    Security

    Highlights

    Machine Learning

    How to Use Git and Git Bash Locally: A Comprehensive Guide

    April 1, 2025

    Table of contents Introduction Installation Windows macOS Linux Verifying Installation Git Bash Basics Navigation Commands…

    Chinese Smishing Kit Powers Widespread Toll Fraud Campaign Targeting U.S. Users in 8 States

    April 18, 2025

    I’m A Mommy Mamacita Shirt

    July 18, 2025

    Implementing End-to-End Testing Using Playwright within Jenkins CI/CD Pipelines

    July 15, 2025
    © DevStackTips 2025. All rights reserved.
    • Contact
    • Privacy Policy

    Type above and press Enter to search. Press Esc to cancel.