Visualizing MongoDB Streaming Data

Large stream through forest
Joao Branco @ unsplash.com

MongoDB Change Stream, Socket.io, & Plotly.js

DRAFT

In this era of big data and AI, it’s easy to overlook that sometimes all you really want is to see what is happening right now.

Quickstart: go to github.io/ravenOSS/ and download the repo. Use the readme instructions to fire up the app.

MongoDB Datastore

With MongoDB as your datastore, you can use change streams, a feature released with MongoDB 3.6, to capture events such as inserts or updates and also, for example, to filter for data thresholds that should trigger an action. A change stream sets up a watch on the operations log (oplog) of a MongoDB replica set. The oplog is the record of operations on the databases and collections, and a replica set is a group of MongoDB nodes, typically three or more, that keep copies of the same data to assure durability.

This Tech Note describes how to use Node.js to build a server, configure a change stream, and then push data to Plotly.js in the frontend for visualization. The code has been whittled down to expose the app logic, but it provides the core functionality on which to build real apps.

MongoDB Replica Set

First of all, we need a MongoDB replica set. The easiest way to get one is to set up a free Atlas cloud account at Mongodb.com. See this Tech Note. The alternative is to configure a replica set on your local machine. Use this Tech Note. The ‘example.env’ file in the repo needs to be renamed ‘.env’ and the URL edited to your own connection string.
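
Before connecting, it can help to check that the connection string was actually picked up from ‘.env’. The following is a minimal sketch, not code from the repo: readMongoUrl is a hypothetical helper, and the only thing taken from this note is the variable name atlasURL. The sample URL is a placeholder, not a real cluster.

```javascript
// Minimal sketch: fail fast if the connection string is missing or malformed.
// readMongoUrl is a hypothetical helper name, not part of the repo.
function readMongoUrl (env = process.env) {
  const url = env.atlasURL // same variable name the code in this note reads
  if (!url) {
    throw new Error('atlasURL is not set; rename example.env to .env and edit it')
  }
  // MongoDB connection strings use one of these two schemes
  if (!/^mongodb(\+srv)?:\/\//.test(url)) {
    throw new Error('atlasURL should start with mongodb:// or mongodb+srv://')
  }
  return url
}

// Usage with a placeholder value (not a real cluster)
console.log(readMongoUrl({ atlasURL: 'mongodb+srv://user:pass@cluster.example.net/plottingData' }))
```

In the generator or server you would call readMongoUrl() with no argument after dotenv has loaded, so that it reads process.env.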

Data Generator

The next requirement for testing is a data source. For this we’ll use a simple data generator that produces a timestamp and a random number as data. The moment.js library supplies the time. Note that although we produce a timestamp here, on memory- or network-constrained devices where highly accurate time data isn’t needed, the insert time can instead be extracted from the _id of the database document.

// Stream data to MongoDB
const dotenv = require('dotenv').config()
const MongoClient = require('mongodb').MongoClient
const assert = require('assert')
const moment = require('moment')

const url = process.env.atlasURL

const client = new MongoClient(url, {
  useNewUrlParser: true,
  useUnifiedTopology: true
})

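client.connect(err => {
  // Log first, then fail fast: asserting before this check
  // would make the error branch unreachable
  if (err) {
    console.log('Got a problem!')
  } else {
    console.log('Connected successfully to MongoDB')
  }
  assert.strictEqual(err, null)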

  const db = client.db('plottingData')

  setInterval(() => {
    const now = moment().format()
    console.log(`Time: ${now}`)
    const data = Math.round(Math.random() * 100)
    console.log(`Data: ${data}`)
    insertData(now, data)
  }, 3500)

  const insertData = (time, data) => {
    // Set the collection
    const collection = db.collection('streamTest')
    // Insert data
    collection.insertOne({
      TimeStamp: time,
      Data: data
    }, function (err, result) {
      assert.strictEqual(err, null)
      assert.strictEqual(1, result.result.n)
      assert.strictEqual(1, result.ops.length)
    })
  }
})
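
The earlier remark about recovering the insert time from _id can be sketched as follows. An ObjectId stores its creation time, in seconds since the Unix epoch, in its first four bytes, so the hex string can be decoded without any library. objectIdToDate is a hypothetical helper name; with the driver you would more likely call getTimestamp() on the ObjectId itself. Note that the resolution is one second.

```javascript
// Sketch: recover the creation time from the hex form of a MongoDB ObjectId.
// The first 4 bytes (8 hex characters) hold seconds since the Unix epoch.
// objectIdToDate is a hypothetical helper name.
function objectIdToDate (hexId) {
  if (!/^[0-9a-fA-F]{24}$/.test(hexId)) {
    throw new TypeError('expected a 24-character hex ObjectId string')
  }
  const seconds = parseInt(hexId.slice(0, 8), 16)
  return new Date(seconds * 1000) // Date expects milliseconds
}

// Example with an arbitrary, well-formed id
console.log(objectIdToDate('507f1f77bcf86cd799439011').toISOString())
```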

Express Server

The core logic runs on a simple Express server that serves the chart page as a static file. The following app.js code creates an HTTP server and binds an instance of socket.io to it.

const express = require('express')
const path = require('path')
const logger = require('morgan')

const app = express()
const port = 3300

app.use(logger('dev'))

/* chart page options */
const options = {
  root: path.join(__dirname, './views')
}

/* GET chart page. */
app.get('/', (req, res) => {
  res.sendFile('./plotlyChart.html', options, (err) => {
    if (err) {
      console.log(err)
    } else {
      console.log('Chart Sent')
    }
  })
})

const server = require('http').createServer(app)
const io = require('socket.io')(server)
// note that socket is attached to server not app

/**
 * Listen on provided port, on all network interfaces.
 */
// Backticks are required for ${port} to be interpolated
server.listen(port, () => console.log(`Server listening on port ${port}`))

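module.exports = { io } // export socket instance

// Start the change stream. Required after module.exports so that
// changeStreamCore can read io from this module (file path assumed).
require('./changeStreamCore')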

Socket.io

Socket.io is not a plain WebSocket implementation, and the two are not interchangeable: a plain WebSocket client cannot talk to a Socket.io server, or vice versa. Socket.io first creates a connection over HTTP using long polling and then upgrades it, where possible, to create an event-driven, bi-directional communications channel. There is server-side code and client (browser) code. Ref: socket.io. Note that when the HTML page is served from the same host that provides the socket server, the client library can be included with a relative path (/socket.io/socket.io.js) and does not need a full HTTP URI.

In app.js we export the io socket instance. changeStreamCore requires app.js and picks out io for use within its code.
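
The emit/on pattern used on both ends of the socket can be tried without a network. The sketch below uses Node’s built-in EventEmitter purely as a stand-in for the socket; it is not Socket.io and does none of the transport work. Only the event name chartData and the packet shape are taken from this note.

```javascript
const { EventEmitter } = require('events')

// Stand-in for the Socket.io server object; NOT the real library.
const io = new EventEmitter()
const received = []

// "Browser" side: subscribe to a named event, as the chart page does with socket.on
io.on('chartData', packet => {
  received.push(packet)
})

// "Server" side: publish a packet, as the change stream code does with io.emit
io.emit('chartData', ['2020-01-01T00:00:00Z', 42])

console.log(received)
```

The point of the pattern is that sender and receiver agree only on an event name and a payload shape; neither needs to know how the other is implemented.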

Change Streams

changeStreamCore.js is required into app.js, but since we do not need to access any of its functionality there, we do not need to assign it to a variable or export anything from changeStreamCore. changeStreamCore in turn requires app.js to access the socket as .io.

const dotenv = require('dotenv').config()
const assert = require('assert')
const io = require('./app').io

const MongoClient = require('mongodb').MongoClient
const atlasURL = process.env.atlasURL

const client = new MongoClient(atlasURL, {
  useNewUrlParser: true,
  useUnifiedTopology: true
})

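client.connect(err => {
  // Log first, then fail fast: asserting before this check
  // would make the error branch unreachable
  if (err) {
    console.log('We\'ve got a problem')
  } else {
    console.log('Connected successfully to MongoDB')
  }
  assert.strictEqual(err, null)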
  const db = client.db('plottingData')
  const collection = db.collection('streamTest')
  const pipeline = {
    $match: {
      operationType: {
        $in: ['insert']
      }
    }
  }

  io.on('connection', socket => {
    console.log(`chartPage connected: ${socket.id}`) // backticks so ${socket.id} is interpolated
    socket.on('disconnect', () => {
      console.log('chartPage disconnected!')
    })
  })

  const transmit = packet => {
    io.emit('chartData', packet)
    console.log(`packet emitted: ${packet}`) // backticks so ${packet} is interpolated
  }

  (() => { // IIFE; add test for cursor available
    console.log('startStream')
    const changeStream = collection.watch([pipeline], {
      fullDocument: 'updateLookup' })
    changeStream.on('change', document => {
      const packet = []
      packet[0] = document.fullDocument.TimeStamp // could parse from object:_id
      packet[1] = document.fullDocument.Data
      transmit(packet)
    })
  }
  )()
})

We have to define which events and data we’re looking for, so a pipeline is defined that matches only the insert event.
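
The same $match stage can also carry the kind of threshold filter mentioned at the start of this note. The following is a sketch under assumptions: buildPipeline and the threshold of 80 are illustrative, and the field name Data is the one written by the generator. Unlike the listing above, the function returns the complete pipeline array.

```javascript
// Sketch: build a change stream pipeline that passes only inserts and,
// optionally, only those whose Data value exceeds a threshold.
// buildPipeline and the threshold value are illustrative assumptions.
function buildPipeline (threshold) {
  const match = { operationType: 'insert' }
  if (typeof threshold === 'number') {
    // Fields of the inserted document are addressed through fullDocument
    match['fullDocument.Data'] = { $gt: threshold }
  }
  return [{ $match: match }]
}

console.log(JSON.stringify(buildPipeline(80)))
```

The result would be passed directly as the first argument of collection.watch, for example collection.watch(buildPipeline(80)), so that filtering happens on the server and only matching events reach the app.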

The IIFE creates the change stream. Each insert event carries the newly created document in its fullDocument field (the fullDocument: ‘updateLookup’ option additionally returns the current document for update events). The handler extracts the timestamp and data into an array named packet and calls the transmit function to send the packet to the frontend.
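
The packet-building step can be exercised on its own with a hand-written event. This is a sketch: toPacket is a hypothetical helper, and sampleEvent contains only the fields the handler reads, not everything a real change event includes.

```javascript
// Sketch: turn a change event into the [time, value] packet used in this note.
// toPacket is a hypothetical helper name.
function toPacket (changeEvent) {
  const doc = changeEvent.fullDocument
  if (!doc) return null // e.g. delete events carry no fullDocument
  return [doc.TimeStamp, doc.Data]
}

// Hand-written sample with only the fields read above
const sampleEvent = {
  operationType: 'insert',
  fullDocument: { TimeStamp: '2020-01-01T12:00:00+00:00', Data: 57 }
}

console.log(toPacket(sampleEvent))
```

Guarding against a missing fullDocument matters mainly if the pipeline is later widened beyond inserts.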

Plotly Chart Page

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <meta http-equiv="X-UA-Compatible" content="ie=edge">
  <title>Charts</title>
  <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css" integrity="sha384-ggOyR0iXCbMQv3Xipma34MD+dH/1fQ784/j6cY/iJTQUOhcWr7x9JvoRxT2MZw1T" crossorigin="anonymous">
  <script src="/socket.io/socket.io.js"></script>
  <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/moment.js/2.24.0/moment.js"></script>
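</head>
<body>
  <!-- Target element for Plotly.newPlot('pchart', ...) -->
  <div id="pchart"></div>
  <!-- Chart logic; the path is an assumption and depends on how the server exposes the file -->
  <script src="chartLogic.js"></script>
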
<script src="https://code.jquery.com/jquery-3.3.1.slim.min.js" integrity="sha384-q8i/X+965DzO0rT7abK41JStQIAqVgRVzpbzo5smXKp4YfRvH+8abtTE1Pi6jizo" crossorigin="anonymous"></script>
<!-- <script src="https://cdnjs.cloudflare.com/ajax/libs/popper.js/1.14.7/umd/popper.min.js" integrity="sha384-UO2eT0CpHqdSJQ6hJty5KVphtPhzWj9WO1clHTMGa3JDZwrnQq4sF86dIHNDz0W1" crossorigin="anonymous"></script> -->
<script src="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/js/bootstrap.min.js" integrity="sha384-JjSmVgyd0p3pXB1rRibZUAYoIIy6OrQ6VrjIEaFf/nJGzIxFDsf4x0xIM+B07jRM" crossorigin="anonymous"></script>
</body>
</html>

The Plotly.js charting library documentation is at https://plot.ly/javascript/

plotlyChart.html loads the needed libraries, which also makes the socket client available. It also imports chartLogic.js, where the socket connection, charting layout, and logic are defined. The div “pchart” is the target where the chart will display.

chartLogic

const layout = {
  title: 'Streaming data to Plotly',
  height: 650,
  width: 1000,
  font: {
    family: 'Lato',
    size: 20,
    color: 'rgb(100,150,200)'
  },
  plot_bgcolor: 'rgba(200,255,0,0.1)',
  margin: {
    pad: 10
  },
  xaxis: {
    title: 'Timestamp',
    titlefont: {
      family: 'Verdana, Sans-serif',
      color: 'rgb(100,150,200)',
      size: 18
    },
    type: 'date'
  },
  yaxis: {
    title: 'Data Measurement',
    titlefont: {
      family: 'Verdana, Sans-serif',
      color: 'rgb(100,150,200)',
      size: 18
    },
    type: 'linear',
    range: [0, 100]
  }
}

// Plotly requires an initial trace for animation to work
// Create a couple of x and y coordinates in arrays
// moment expects 'seconds' (or 's'); 'sec' is not a recognized unit
const xArray0 = [moment().subtract(10, 'seconds').format(), moment().format()]
const yArray0 = [0, 0.2]

let trace0 = {
  x: xArray0,
  y: yArray0,
  type: 'scatter',
  mode: 'lines',
  line: { shape: 'spline' }
}

const data = [trace0]

Plotly.newPlot('pchart', data, layout)

const dataPoints = 15

const socket = io()
socket.on('chartData', (packet) => {
  console.log(`packet.Time: ${packet[0]}`)
  console.log(`packet.Data: ${packet[1]}`)

  if (xArray0.length < dataPoints) {
    xArray0.push(packet[0])
    yArray0.push(packet[1])
  } else {
    xArray0.push(packet[0])
    xArray0.shift()
    yArray0.push(packet[1])
    yArray0.shift()
  }

  trace0 = {
    x: xArray0, // Plotly reads the keys x and y, so shorthand property names will not do
    y: yArray0,
    type: 'scatter',
    mode: 'lines+text',
    line: { shape: 'spline' }
    //  line: {simplify: false},
  }

  console.log(`xArray0 after new data: ${xArray0}`) // debug
  console.log(`yArray0 after new data: ${yArray0}`) // debug

  const data = [trace0]

  Plotly.animate('pchart', {
    data,
    traces: [0],
    layout: {
      xaxis: {
        range: [xArray0[0], xArray0[xArray0.length - 1]]
      }
    },
    transition: {
      duration: 2500,
      easing: 'cubic-in-out'
    }
  })
})

After the packet has been shoved into the frontend, this is the logic that tears it apart and builds the chart.

“const layout” defines the Plotly layout, including the axis titles, fonts, and the size of the chart.

In order to use animation, an initial trace is required from which to interpolate new data. X (using moment.js) and Y arrays containing two datapoints are created, wrapped in trace0, and rendered with the layout defined above.

Presently, animation only works with scatter traces. However, either the axis or the trace can be selected for animation. See the Plotly.js animation docs.

The socket ‘listens’ for the ‘chartData’ event coming from the backend and parses the packet to push data into the X and Y arrays. Once the configured number of datapoints is reached, each new datapoint pushed to the back of an array is matched by one shifted off the front, so the arrays act as a sliding window.
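
The sliding-window behaviour described above can be isolated in a small function. This is a sketch: pushWindow is a hypothetical helper that produces the same end state as the if/else in chartLogic. It mutates the array in place, which is deliberate here because the chart code keeps references to xArray0 and yArray0.

```javascript
// Sketch: keep at most maxLength items, dropping the oldest first.
// pushWindow is a hypothetical helper name; it mutates arr in place.
function pushWindow (arr, value, maxLength) {
  arr.push(value)
  while (arr.length > maxLength) {
    arr.shift() // discard from the front (oldest datapoint)
  }
  return arr
}

// Usage: a window of three values
const demoWindow = [1, 2, 3]
pushWindow(demoWindow, 4, 3)
console.log(demoWindow)
```

In the socket handler this would be called once for each axis, for example pushWindow(xArray0, packet[0], dataPoints) and pushWindow(yArray0, packet[1], dataPoints).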

After the arrays are updated with a new datapoint, the chart is re-rendered.

Because we are dealing with dynamic timeline data, the X axis does not have a directly quantified number of ticks or range. Instead the data array is built and Plotly computes and renders the new layout.

However, on the Y axis, the initial layout configuration defines a range and this prevents the chart continuously autosizing to the new data.

Plotly.animate is then called to update the chart.

Launching App

To launch the app, open a terminal window in the project root directory and run “node generator.js”. In another terminal run “npm start”; you should see some start-up messages, including a successful connection to MongoDB. Navigate to localhost:3300 in the browser (the root route in app.js serves plotlyChart.html) and, lo and behold, you should have a chart rendering dynamically. The terminal running the app logs a socket identifier when the page connects. If the chart does not update, open the console in the browser dev tools to check that data is arriving at the frontend and the arrays are building.

Wrap up

This code can be adapted to different charting libraries. The way in which the data arrays are built is a little different in other libraries where a single array is usually defined with both X and Y data.
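
For a library that expects one array of points, the two arrays can be zipped together. The sketch below assumes the target library accepts objects of the form { x, y }; that shape and the name toPoints are assumptions, so check the format your library actually expects.

```javascript
// Sketch: combine parallel X and Y arrays into a single array of points.
// The { x, y } object shape is an assumption about the target library.
function toPoints (xs, ys) {
  const n = Math.min(xs.length, ys.length) // ignore any unpaired tail
  const points = []
  for (let i = 0; i < n; i++) {
    points.push({ x: xs[i], y: ys[i] })
  }
  return points
}

console.log(toPoints(['2020-01-01T00:00:00Z', '2020-01-01T00:00:03Z'], [10, 20]))
```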

Obviously, the chart type can be changed. For example, a chart defined with a single bar could represent a material quantity in a storage bin.
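
As a sketch of the storage bin idea, the function below builds the data and layout for a single-bar Plotly chart. binLevelFigure, the label, and the capacity value are all illustrative assumptions; the trace itself uses only the standard x, y, and type: 'bar' properties.

```javascript
// Sketch: data and layout for a one-bar chart showing a bin's fill level.
// binLevelFigure and its parameters are hypothetical names.
function binLevelFigure (label, quantity, capacity) {
  return {
    data: [{ x: [label], y: [quantity], type: 'bar' }],
    // Fixing the Y range keeps the bar from rescaling as the quantity changes
    layout: { yaxis: { range: [0, capacity] } }
  }
}

const fig = binLevelFigure('Bin A', 42, 100)
console.log(JSON.stringify(fig))
```

In the browser the result would be drawn with Plotly.newPlot('pchart', fig.data, fig.layout), and each incoming packet would replace the single Y value.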

I found the Plotly.js documentation a bit hard to get around because of the lack of search functionality. The docs are comprehensive, but you need to know the term you’re looking for in the indexes. Like many chart libraries, Plotly is built upon D3.js, and some useful D3 features such as d3.json leak through. Again, it takes some digging to find them.

Plotly is a comprehensive library serving many use cases. However, because R and Python are supported you can spend quite a bit of time on Stackoverflow, etc, tripping over non-javascript code examples.

I hope that this demo of integrating MongoDB change streams, socket.io and plotly.js provides a useful foundation for your projects.