Strategies for processing industrial IoT data streams

IoT · Data Science · Big Data
CHRISTIAN EGEBERG
11.02.2020

I’ve been asked to provide the contents of my hour-long video presentation about processing of industrial IoT data streams as an article that can be read and browsed at your own pace. Well, I’m happy to oblige…

Introduction

Streaming industrial IoT data can be complicated to process, because it often involves a large amount of data, received in continuous streams that never end. There’s never one single machine to monitor. Always a number of them. Multiple buildings, one or more factories, a fleet of ships, all of an operator’s subsea oil pipelines. That kind of scale.

Industrial IoT is never implemented just for fun. There needs to be real, reasonably short term, economic value involved. Alternatively it needs to be about fulfilling new environmental requirements.

 

 
Water filled basin at an industrial pump test facility
Industrial pump test facility photo by Christian Egeberg

 

Data sources and data rates

You would need a data rate of at least one measurement per minute from multiple sensors in order to detect the need for maintenance of pumps and generators for the oil industry. You might have to increase that to every second or so if you’re working with engines or production equipment with higher rotational speeds.

One of the storage solutions I’ve been working on is currently receiving more than 20.000 sensor measurements per hour. 500.000 values per day. From a single smart building equipped with 5400 sensors. It used to be 125.000 sensor measurements per hour, that is 3.000.000 values per day, but we tweaked the sample rates, based on customer needs and requirements. We also worked on local buffering mechanisms with micro batching. That storage mechanism is almost idling now, ready for scale out, enabling thousands of smart buildings to be connected in the near future.

A single surface mounted microphone will deliver between 40.000 and 200.000 samples per second to those who want to explore sound. At that point you might have to fast Fourier transform the audio, and run machine learning on the dominant frequencies and their amplitudes. A trained machine learning model could potentially run on an edge device close to the microphone, instead of in the cloud.

Video streams are also interesting, given that you’re able to separate anomalies from what’s normal, preferably before you waste lots of bandwidth.

 
Man sitting in front of his microphone equipped laptop, in front of a large pump, doing experimental analysis of audio data
Pump audio experimentation, courtesy of Arundo and Sebastian Gjertsen

Processing on edge devices

IoT data may need to be buffered for multiple weeks, if your sensors and equipment are at sea. They may need to be transferred to the cloud when the ship is in a harbour, or the data may be brought to shore on removable disks. IoT data streams don’t have to be transmitted in real time to provide value. Though, sometimes you’ll want to perform at least parts of the processing on site. For example optimization of fuel consumption for tankers and tug boats, or detection of anomalies or maintenance prediction for subsea oil pumps.

It’s perfectly possible to run trained machine learning algorithms on quite primitive equipment, like relatives of the Raspberry Pi, or a small rugged PC. Maybe directly on your IoT gateway, which in some cases can be as powerful as any server blade. You also won’t need a lot of machines to be able to deploy a reasonably redundant Kubernetes cluster, even if your architectural choices become much more limited than they would have been with a cloud provider.

You may actually find a local cloud, like Azure Stack, on an oil rig. But I wish you luck with getting certifications and permission to deploy your own software to one of those.

 

 
Two large tank ships side by side in a harbour
Edge processing tank ships, Courtesy of Arundo and Heitor Sousa


Trivial stream processing

Your stream processing gets really simple if all you need to do is call an already trained machine learning algorithm.

You arrange to receive all sensor measurements relevant to the algorithm, at data rates that are sensible for the purpose, and feed them to the model as the data arrives.

You may also want to store all the data as cheaply as possible in HDFS, a data lake, or in BLOB storage, in a way that makes it easy to separate both equipment and days, for the next time you require data to retrain the algorithm.

This type of data stream filtering is easy to implement using Kafka, or Azure Stream Analytics. And you can perfectly well do this in addition to other stream processing.

But most customers of industrial IoT require something more tangible, in addition to just a callable algorithm that detects something fancy.

They often require dashboards that show how production is doing in real time, with fancy graphs, multiple dimensions, color coded performance and downtime.

These dashboards may have low practical utility beyond showing them off at board meetings, or talking about them to people you want to impress. But they can on the other hand be extremely hard to implement. Particularly when your customer also wants parameterizable reports that can compare current and historic periods of time.

The rest of this article will be about architectural choices geared towards solving these kinds of tasks.

 

 
Snapshot of a streaming IoT dashboard showing graphs and color coded statuses
Example of a streaming IoT dashboard, courtesy of Arundo and Jon Alm Eriksen

Aggregation

The key to creating fast updating charts and reports is to store the underlying data in a format that can easily be combined into the desired result, with a resolution that doesn’t require too much work per recalculation. These precalculations are typically aggregations. Grafana and Plotly Dash are two very good tools for implementing these kinds of user interfaces.

The aggregated precalculations must be stored somewhere. This storage space is often termed Warm Storage, because it provides high availability access to preprocessed data, with granularity suited for presentation purposes. This is opposed to cold storage, which typically stores all the data, in original form, which usually requires a certain amount of time and effort to retrieve and process before use.

If you store the count of values, the minimum value, the maximum value, the sum of values, and the sum of squares of each value, for a set of time intervals, then you can easily calculate the average, standard deviation, variance, sum, count, minimum and maximum for any combination of the time intervals, using simple formulas. The original example used PostgreSQL / TimescaleDB syntax.


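The original code image is not reproduced here, so what follows is only a minimal sketch of that idea, with illustrative table and column names that are my own, not the article’s. It stores the partial aggregates per time bucket, and combines hourly buckets into daily statistics using the formulas described above:

```sql
-- Hypothetical aggregate table; names are illustrative, not from the original example.
CREATE TABLE aggregates (
    series_id    uuid             NOT NULL,
    bucket       timestamptz      NOT NULL,
    sample_count bigint           NOT NULL,
    min_value    double precision NOT NULL,
    max_value    double precision NOT NULL,
    sum_value    double precision NOT NULL,
    sum_squares  double precision NOT NULL,
    PRIMARY KEY (series_id, bucket)
);
SELECT create_hypertable('aggregates', 'bucket');

-- Combine one hour buckets into daily statistics, deriving average and
-- standard deviation from the stored partial aggregates.
SELECT series_id,
       time_bucket('1 day', bucket)       AS day,
       sum(sample_count)                  AS sample_count,
       min(min_value)                     AS min_value,
       max(max_value)                     AS max_value,
       sum(sum_value)                     AS sum_value,
       sum(sum_value) / sum(sample_count) AS average,
       sqrt(sum(sum_squares) / sum(sample_count)
            - (sum(sum_value) / sum(sample_count)) ^ 2) AS stddev
FROM aggregates
WHERE series_id = $1
  AND bucket >= now() - interval '30 days'
GROUP BY series_id, day
ORDER BY day;
```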

Average and standard deviation can be used to display normal distribution bell curves for values within the combined intervals. Thus the aggregates still retain quite a bit of information about the original raw values.

 
 
 
Normal distribution illustration by M. W. Toews, based on figure by Jeremy Kemp, on 2005–02–09, CC BY 2.5


Warm storage

As a Data Scientist, you’ll often find that just observing the values that stream by doesn’t give you enough context to perform complex calculations. You may need a cache of recent data, or a place to store partial precalculations, aggregates, and potentially the output of machine learning models, typically for use in dashboards and interactive parameterized reports. I’ll call this data cache warm storage.

Warm storage is often used by Data Science applications, and algorithms that call machine learning models, in order to quickly retrieve sufficient sensor values and state changes from streaming data. Warm Storage data may typically be used to detect anomalies, predict maintenance, generate warnings, or calculate Key Performance Indicators that add insight and value to streaming data for customers.

Those KPIs are often aggregated to minute, ten minute, hour or day averages, and stored back into warm storage, with a longer retention period, to facilitate reuse by further calculations. The stored partial aggregates may also be used for presentation purposes, typically by dashboards, or in some cases reports with selectable parameters. Thus optimal retention periods may vary quite a bit between tenants and use cases.

 

Properties of a good Warm Storage

  • Has a flexible way of querying for filtered sets (multiple series) of data
  • Can match and filter hierarchical tag IDs in a flexible manner
  • Has low latency, and is able to retrieve data quickly on demand
  • Supports storage and querying of KPIs and aggregates calculated by data science algorithms that potentially call machine learning models
  • Exposes ways of building upon previously calculated data
  • Contains only data that has a potential to be actively queried
  • Has a Python wrapper for easy method invocation
  • Behaves similarly when running on an Edge device

An often overlooked topic is that Warm Storage should be opt in, not opt out: by default, data should stay out of it. Every insert into Warm Storage will cost you a little bit of performance, storage space, and memory space for indexes, before the data is removed as obsolete. All data that is scanned through, as part of index scans during queries, will also cost you a little bit of I/O, memory and CPU, every time you run a query. Tenants that have no intention to use Warm Storage should not have their data stored there. Neither should tags that will never be retrieved from Warm Storage. The first rule of warm storage is: Don’t store data you won’t need. Excess data affects the performance of retrieving the data you want, costs more money, and brings you closer to a need for scaling out.

A good Warm Storage should have a Python wrapper, to make it easier to use from Data Science projects. If you’re using Warm Storage in a resource restricted environment, like on an edge device, the API should behave the same as in a cloud environment.

 

Existing stream processing products

Most of the existing stream processors, like Apache Spark, Storm and Flink, are very focused on performing MapReduce in real time on never ending data streams. That’s obviously immensely powerful, and you can do lots of interesting things to data streams that way. But, it’s overkill, and unnecessarily complicated, if all you’re looking for is some aggregations, calculations of a few Key Performance Indicators, and a bunch of simplistic alarm triggers.

The exception seems to be Kafka Streams, KSQL for Kafka, and Apache Samza. All of those let you consume Kafka topics, in order to send derived streams back into Kafka, in the shape of new topics or tables. Kafka, which primarily is sort of a cloud sized message bus on steroids, is interesting in many ways, but also quite large and complex. It actually feels a bit daunting to install, configure and use Kafka for the first time. You just know there’s a huge learning curve ahead, before you’ll be able to achieve a mature solution. Additionally, Kafka wouldn’t be my first choice for storing large amounts of sensor data for a long time, even though RocksDB, the database engine behind Kafka’s KTables, is a good one. Confluent provides a managed Kafka cloud service, which works with Google Cloud, Azure, and Amazon Web Services.

Let’s keep Kafka, and particularly KSQL with KTables, in the back of our minds. In the meantime, let’s have a look at what time series databases can offer us.

 

InfluxDB highlights

InfluxDB is open source, and probably the fastest time series database on the market. It’s capable of running on reasonably powered edge devices. InfluxDB uses a text based format called line protocol for adding data, but has a SQL like query language.

InfluxDB utilizes a completely static data structure, centered around the timestamp, with attached value fields and metadata strings called a tag set. The metadata are indexed, and you can search using both metadata and time ranges within each time series.

InfluxDB doesn’t support updates, but you can delete specific data between two timestamps, or drop an entire time series. You can also overwrite the value fields, if you take care to use exactly the same timestamp and tag set. That’s usually almost as good as being able to update.

InfluxDB performance is closely coupled to cardinality, which is all about how many unique metadata tag combinations need to be indexed, and scanned during queries. All queries must be bounded within a time range, to avoid performance crises. InfluxDB supports automatic aggregation and calculation of derived values, by means of so called continuous queries. Deletion of old data can be handled automatically by retention policies. Both of these mechanisms cost a little bit of performance when they are used.

InfluxData have implemented their own database engine, written in Go. What scares me a bit, is the number of stories on support forums, about people who wonder how they lost their InfluxDB data. There is a managed InfluxDB service available on cloud.influxdata.com.

 

TimescaleDB highlights

TimescaleDB is an extension to the PostgreSQL relational database. Both are open source. TimescaleDB splits tables into partitions called chunks, based on a predefined time interval, or automatically, as time series data are written to each hyper table. These chunks are really tables themselves, and have individual indexes. Only a few of these smaller indexes need to stay in memory most of the time. This as good as eliminates the major problem with utilizing relational databases for time series data, provided that all your queries confine themselves to reading from a low number of chunks, and that each chunk contains no more than 25 to 50 million rows.

Queries, inserts, updates and deletes across chunks are handled transparently, with standard PostgreSQL syntax. Dropping entire chunks at a time is a very effective way of implementing retention limits, to get rid of expired data.
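As an illustration only, not from the article: with TimescaleDB 1.x, that kind of retention can be a small periodic job that drops expired chunks from a hypothetical short_term_buffer hypertable. The exact drop_chunks signature varies between TimescaleDB versions:

```sql
-- Drop every chunk whose data is older than 72 hours (TimescaleDB 1.x style call;
-- later versions changed the argument order).
SELECT drop_chunks(interval '72 hours', 'short_term_buffer');
```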

It’s necessary to use connection pooling if you want to use TimescaleDB from a REST API. You also need to control the connection timeout and the statement timeouts. You can write only to the master node when using a high availability PostgreSQL cluster such as Patroni, but you can scale reading by sending queries to all the replicated slave nodes.

TimescaleDB exists as a managed service on timescale.com/cloud, and as managed PostgreSQL for Azure. You can scale both TimescaleDB and InfluxDB by sending unrelated time series data, for instance belonging to different customers, to separate database clusters. This kind of scale out is commonly called sharding.

TimescaleDB and InfluxDB could both handle long term storage and aggregate storage responsibilities in low resource environments, like when your customer doesn’t want to use cloud services, or when you want to analyze sensor data close to the relevant equipment, for connectivity or reduced latency reasons. This is commonly called edge computing or running on edge devices.

 

Cassandra highlights

Cassandra is a distributed database, with a data model based on Google BigTable technology. It needs to run on multiple machines, and isn’t capable of running on single node edge devices in a sensible way. Cassandra has many features that make it particularly well suited for handling time series data.

Cassandra utilizes a masterless architecture with no single point of failure, and scales almost linearly to a large number of machines, VMs or Pods, as the need for storage space grows. Data is stored compressed and redundantly, with multiple copies across several nodes. You should avoid using Cassandra on top of redundant disk volumes. That would simply increase the storage requirements, and lower the performance. Cassandra is optimized for write performance over query speed, updatability and deletes.

The query language is a lot like SQL, and supports complex data structures like lists, sets, maps, json objects, and combinations of these as column types. Cassandra is not a relational database, and doesn’t use joins. All Cassandra queries and commands should be performed asynchronously. Data that belongs together is partitioned on the same node, to improve query performance. It might be a good idea to store multiple copies of data, instead of creating secondary indexes, whenever you need to retrieve that same data using different access patterns.
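As a sketch of that partitioning idea, not the article’s actual schema: a CQL table for raw measurements could bucket each time series per day, so that all reads for one series and day hit a single partition, and no partition grows without bound:

```sql
-- Hypothetical CQL table; names are illustrative.
-- The composite partition key (series_id, day) keeps data that belongs together
-- on the same node, while the clustering column orders rows by time.
CREATE TABLE long_term_measurements (
    series_id uuid,
    day       date,
    ts        timestamp,
    value     double,
    PRIMARY KEY ((series_id, day), ts)
) WITH CLUSTERING ORDER BY (ts DESC);
```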

On paper, HBase and OpenTSDB, a dedicated time series database built on top of HBase, seem like good alternatives to Cassandra. HBase is however more complex to configure, maintain, and code for than Cassandra. Another interesting alternative is ScyllaDB, which claims to be a self optimizing Cassandra, reimplemented in C++, in order to achieve performance gains. ScyllaDB is rather new, and doesn’t yet have the huge community support that Cassandra enjoys.

 

Storage strategies

Storage is cheap. Store IoT data the way you want to read it. Feel free to store multiple copies in alternate ways. That’s called query driven design. Don’t be afraid to utilize multiple storage services for different purposes. Martin Fowler eloquently describes this as polyglot persistence.

 
Polyglot persistence
Polyglot persistence illustration by Martin Fowler

Joins and transactions are as good as irrelevant for IoT data streams. The measurements don’t change after they’ve been sent, and joins cost performance. You may also need multiple aggregation intervals for different purposes. Preferably aggregate each granularity lazily, after you’re comfortable that the underlying data have arrived, and not every single time relevant data arrives.

Utilizing mechanisms like time to live indexes, or time to live tombstoning, can affect performance surprisingly easily. Try to find a way to utilize deletion of chunks or other partitions, whenever your storage mechanism supports that. It can be very inefficient to delete expired data by searching through or recompressing huge amounts of data. Sometimes it’s far better to simply leave expired data where they are, rather than spending time and resources cleaning up. It may in some cases also give you cheaper cloud provider bills.

You can implement your own partitioning, by writing data to separate tables, depending on timestamps within predefined intervals, and then drop the oldest tables when all their data have expired. The disadvantage is that you must create and drop tables dynamically, and may need to read from more than one of the tables at a time. This could be a way to implement a short term buffer efficiently in Cassandra, which is able to handle several millions of inserts per second, when scaled out.

 

Native types and index sizes

Always strive to use native database types for your index columns. Native types are often much more compact than string representations, and are scanned using pure binary comparisons. Definitely avoid case insensitive or culture dependent collations, which can sneak in if you’re not careful about how you define string columns. A native UUID is stored in 16 bytes, and can be compared as two 64 bit integers. A UUID string is 36 bytes long, plus the length marker, and is hopefully compared in a case sensitive ordinal way. A native timestamp with time zone is stored in 8 bytes, and can be compared as one 64 bit integer.

 

 
Native type storage size illustration
Native type storage size illustration by Christian Egeberg


This seems like a small thing, and I know we’re not supposed to optimize prematurely, but it’s really about the innermost of inner loops. An industrial IoT storage solution surely performs billions of index comparisons every day. The indexes are read from disk into a memory cache, often called the buffer pool, where the index blocks are traversed, and hopefully remain until the next time each index block needs to be checked, unless another index block needed the same memory space more urgently. Eventually you can get buffer pool thrashing, where index blocks are read from storage into memory over and over, because the buffer pool is too small for the most frequently used index blocks.

You can expand the size of the buffer pool, until it’s constrained by the memory installed in the machine, or maybe more typically allocated to the virtual machine. But another thing that really helps is when the indexed column values are compact, and simple to compare, for every row. This maps directly to less index data to read from storage, and room for more index data in the same buffer pool. You should also strive to avoid large index range scans, by writing your queries carefully, and by storing data as you want to access it, optionally utilizing multiple copies.

 

Micro batching

Database systems like Cassandra support batching of commands on an API and protocol level, and can execute batches asynchronously on multiple nodes. That’s particularly effective when the entire batch uses the same partition key. But batching can be supported even using plain SQL, when you know the syntax. Utilize this to write multiple values using the same command, so that the number of network roundtrips is reduced. You’ll spend much less time waiting for network latency. The original example used PostgreSQL / TimescaleDB syntax.

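The original code image is not preserved, so here is only a minimal sketch of such a micro batched insert, with illustrative table, column names and values. Many rows travel in one statement, so the whole batch costs a single network round trip, and duplicates with identical timestamps are simply ignored:

```sql
INSERT INTO short_term_buffer (series_id, ts, value)
VALUES
    ('11111111-1111-1111-1111-111111111111', '2020-02-11T12:00:00Z', 42.1),
    ('11111111-1111-1111-1111-111111111111', '2020-02-11T12:00:10Z', 42.3),
    ('22222222-2222-2222-2222-222222222222', '2020-02-11T12:00:00Z', 17.8)
ON CONFLICT (series_id, ts) DO NOTHING;  -- identical timestamps are treated as duplicates
```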

 

I encourage the use of arrays when writing your own REST APIs. Use arrays actively for inserting multiple values, and for retrieving sets of series in the same method call. This makes life easier for data scientists, and reduces the aggregated network latency when using the API. Reducing latency for things you do often improves performance.

You should strive to send and receive multiple elements per call, when you’re using queues for IoT data. Most queue implementations support this, and utilizing it can reduce the need for scale out significantly. Handling each sensor measurement typically represents very little processing. Thus you might as well receive a number of them at a time, to avoid the aggregated network latency of fetching them one by one.

 

Micro batching performance

Utilizing micro batching all the way through, including synchronizing sample timestamps, or buffering upstream for a few seconds, can have a profound impact on performance. I’ve seen 10.000 sample values written to the short term buffer, with a full copy to the long term storage, 4.000 new scheduled aggregations registered, and 12.000 statistics log entries updated, all completing in 3 seconds, as a single micro batch.

It’s likely that handling each sample value would take around 100 milliseconds, if those same 10.000 sample values were to be processed individually, on the same storage system. Some of it would definitely be handled asynchronously, but we could easily end up at around 500 seconds total. More than 8 minutes of processing time, for those same 10.000 sample values that micro batching handled in 3 seconds.

You could certainly scale out as an alternative, and process more in parallel. But scale out costs money, in the shape of larger bills from your cloud provider. Not utilizing micro batching may at one point limit the total throughput you’ll be able to achieve, and the difference between using it and not can be dramatic. Besides, you can scale out while using micro batching too.

 
 
 
The effect of enabling micro batching on the same size stream, for a resource unit limited CosmosDB

 

Micro batching exceptions

The biggest problem with micro batching is the fact that your micro batch may fail as a unit, even when it contains many perfectly ok sampled values. You need to have good exception handling and logging upstream, including the ability to queue or buffer data for a while, in order to counteract this. Kafka or an IoT Hub that supports MQTT or AMQP may be close to perfect for this kind of queuing.

Be prepared to handle asynchronous retries with exponential backoff, to avoid retransmission congestion, if your storage solution can respond with “server is not available” or “server is overloaded” type exceptions. This is particularly common with storage services that bill you for resource units, for instance CosmosDB. You might have to smooth resource usage spikes using a FIFO queue, in order to reduce resource unit costs. It all depends on selecting your storage services carefully.

Validation issues with any specific sample value and its metadata should ideally never fail the entire micro batch. Instead, a detailed description of what went wrong, including the relevant series id, timestamp and sample value, should be logged to a log that is constantly monitored. Then the problematic sample data may either be sanitized or discarded, depending on the severity of the problem. Processing of the rest of the micro batch should continue as if nothing extraordinary happened.

We had an issue where some sensors started sending the string NaN, not a number, instead of a floating point voltage. That caused the POST body to be incompatible with the JSON type schema. Our validation code never even ran, and we had to look for the problem in the failed JSON body of the upstream site service. We found it on line 86.442 of a 107.636 line long seemingly valid JSON snippet. It took us around 15 minutes to spot that. Detailed error messages are pure gold. That micro batch was over 2.5 megabytes, and completed in two seconds when the NaNs were replaced with null. We’ve since changed that JSON type to string, and do our own validation to check for invalid numbers. But there can easily be more problems similar to that.

 

Warm storage architecture

This is a quite typical architecture for a warm storage. A small number of persistent stores, with different access patterns. A few processes that run continuously. One or more APIs that implement REST methods that can be invoked using https.

 
Warm storage architecture
Warm storage architecture diagram by Christian Egeberg


You authenticate to receive time limited access tokens that provide a set of basic permissions. You utilize https keep alive to achieve persistent connections, and reduce the effort spent performing cryptographic handshakes.

Communication with the storage services should utilize SSL, and you’ll need to provide certificates for both the database servers and the API implementations. Whether you’ll need to configure encryption of storage service data files or not depends on how paranoid you or your customers are. It might cost you quite a bit of performance.

 

Managed storage services

It is advantageous to choose managed storage services, whenever they are available. That is because someone else takes responsibility for actively managing the service, like setting it up with a sensible initial configuration, upgrading it with security patches, providing redundancy and resilience against failure, scaling the service without losing data, providing reliable backup options, and tedious stuff like that. This obviously costs money, and you might choose to take the responsibility yourself, in exchange for lower monthly cloud provider expenses. You should probably choose a fully tested Operator Helm chart to manage your Pods, if you choose to deploy your storage services on top of Kubernetes. Google “awesome kubernetes operators in the wild” for inspiration.

Example of storage service complexity

We’ve been using a high availability PostgreSQL cluster called Patroni, deployed on top of Kubernetes on OpenStack, using the Zalando Postgres Operator. We have a pod with one master PostgreSQL node, which does both writes and reads, and three pods with slave PostgreSQL nodes, which replicate the master, but can be used for reads. These pods are separated between two data centers, after an enlightening incident a while back, where an entire data center lost power. Most of our services went down with it, even though they were supposed to be redundant. Did you realize that you might need to use a georedundant docker image repository for Kubernetes to be able to start new pods?

Anyway. These Patroni PostgreSQL nodes utilize etcd and the Raft protocol for leader election and failover, whenever the Patroni health monitor decides that the current leader isn’t sufficiently healthy. Then one of the slaves will be elected as the new leader, and the old leader will be demoted to a slave. This happens once in a while. The PostgreSQL client libraries don’t really know about leaders and failovers, so whenever we see an exception that “the current PostgreSQL session is read only”, we terminate the process immediately, so that Kubernetes restarts the pod, and we get new connections to the current leader, utilizing Kubernetes service selectors. That works.

The old leader, however, isn’t demoted until a few tenths of a second after the new leader is elected, and for those tenths of a second, the old leader happily keeps receiving streaming IoT data through the existing open connections. This sometimes creates an alternate timeline, which prevents the old leader from rejoining the new leader as a slave. One solution is to copy the additional data to the new leader, and reinit the old leader. Handling stuff like this transparently is sort of what you’re paying for when you utilize a managed cloud service. When you host your own storage solution, it may become your, or rather my, responsibility to fix it.

 

The short term buffer

The short term buffer is used to calculate aggregates from raw data, and as a data source for any Key Performance Indicators or derived data calculations that may be needed. The short term buffer will contain every single measure inserted through the storage API, but truncated to a retention period typically between 48 hours and 72 hours, depending on how many aggregations are scheduled but not yet processed.

Measures with identical timestamps down to the microsecond level, belonging to the same time series, will be treated as duplicates by the short term buffer. Thus only one of the simultaneous measures will be stored and aggregated. This duplicate removal may be handy when working with the quite common “at least once” delivery guarantees of some IoT systems. Measures arriving outside of the current retention period might be written to long term storage if required, but will be too late for the short term buffer, will not be aggregated, and not be included in any performance calculations.

 

Access patterns and polyglot persistence

The access pattern for the short term buffer is very write heavy, with quite a few sequential reads limited by time ranges. All short term buffer data expires, and potentially needs to be truncated on at least a daily basis. This access pattern doesn’t fit Cassandra or HBase very well, because of the transient nature of the data. Queues are commonly described as a Cassandra antipattern, unless you can delete data by truncating entire tables. The short term buffer access pattern fits much better with TimescaleDB, which extends the PostgreSQL relational database with automatic time partitioning of tables. The retention policies of InfluxDB would also match the short term buffer well.

There might be other good choices too, but regular relational SQL databases, and quite a few NoSQL databases like MongoDB, would soon experience performance problems because of indexes that grow to unmanageable sizes, and would need to be swapped in and out of memory a lot.

We could utilize multiple side by side instances of TimescaleDB or InfluxDB, to handle the short term buffers and aggregation schedule queues for different independent customers, or for nonrelated groups of equipment. This kind of scale out is commonly called sharding.

 

The long term storage

The long term storage database is for storing raw data that can’t or shouldn’t be aggregated, for example state changes, or alarms and their contexts, or structured json documents. It’s a good idea not to store metadata like time series properties, or asset hierarchy information, in the long term storage directly. Mostly because it grows huge, and may become cumbersome to maintain over time. You want as few updates as possible.

Thus, there is an advantage to using GUIDs for identifying time series in the long term storage. Then you let the time series properties store handle asset hierarchy searches and metadata versus GUID mapping. This also helps with anonymizing data in the long term storage, which is a good thing, and helps separate unrelated data.

 

Access patterns and polyglot persistence

The long term storage database will see a huge amount of writes, and some sequential reads, typically caused by dashboards, user interfaces, or reporting. There might not be a need for updates, except for deleting expired data, which might represent a significant performance overhead for some database systems. This access pattern fits very well with large scale distributed storage services like Cassandra, HBase or OpenTSDB. TimescaleDB or InfluxDB might be better choices if you are restricted to an edge device or a low resource environment, and know that the amount of data that needs to be processed and stored will remain manageable.

The long term storage database should not be used to store high frequency time series, as this over time will consume a lot of storage space, and represents data that might be difficult to process in a close to real time scenario, which is what the storage architecture is designed for.

If you want batch processing, or want to train models on huge amounts of raw data, it would be better to filter the streaming data in a sensible way, creating a retrieval path that lets you easily get to the right block of data, and then append it into a cheap redundant distributed file system, or in blob storage, rather than clogging up a long term storage database that simultaneously attends to real time responsibilities. This type of stream filtering is very easy to implement with Kafka or Azure Stream Analytics, and can easily be performed in addition to warm storage.

 

The aggregate storage

The aggregate storage is for storing numeric aggregations. Each interval stores the count, minimum, maximum, sum, and sum of squares, for each measure in the interval. From these we can additionally derive the average, standard deviation, and variance, for every multiple of the base interval duration. Just like with long term storage, it’s a good idea not to store metadata like time series properties, or asset hierarchy information, in the aggregate storage directly. GUIDs for time series IDs keep the number of updates low, and separate unrelated data in a good way.

Sensible interval durations are typically between 1 minute and 24 hours. Interval durations must be selected with care for each type of time series, as they represent a compromise between precision and storage size, which is basically what decides how fast you will be able to retrieve or process a result, or display a relevant graph.

 

Aggregate intervals

For example aggregating room temperatures in office buildings to 24 hour intervals is too coarse, because climate control systems usually shut down at night, and commonly during evenings. One hour intervals are probably just right, because that gives you 8 or 9 separate aggregates during working hours, and each interval includes average temperature, maximums, standard deviation and variance. One minute intervals would on the other hand represent wasted storage space, and unnecessary processing overhead, because you constantly need to retrieve a lot of them, just to average them after retrieval.

The main disadvantage of going above 24 hours, is that your intervals will start to mix workdays and holidays, and in some cases, a few of your intervals may become differently sized, for instance around new year’s day, or at the end of months. Inconsistencies like that will increase the variance of the aggregates, and you lose precision. It’s usually a better solution to keep each 24 hour interval separate, and rather average them each time you need statistics for a larger interval.

Sometimes it makes sense to store two base intervals separately. Say you aggregate the power consumption of a climate control system to 10 minute intervals, in order to facilitate day to day comparison graphs. That translates to 144 combinable aggregates per day, 1.008 per week, and 52.560 per year. You would have to retrieve and combine 262.800 aggregates for each climate control system, in order to show a graph that compares power consumption for the last five years. But if you store aggregates for an additional 24 hour base interval, you would only need to retrieve and combine 1.825 aggregates for those five years. That makes a huge performance improvement, and your end user won’t have to wait so long to get his results.

Access patterns and polyglot persistence

The aggregate storage database will have an access pattern quite similar to the long term storage database. Dominated by writes, but with more sequential reads, within time ranges. It should probably be stored in the same data storage solution as the long term storage database. We’ll probably need to use client code in the API in order to combine aggregates, because not all database systems are able to group sums over time intervals.

 

 
Aggregate storage usage example
Aggregate storage usage example, courtesy of PowerTech Engineering


The aggregation schedule

The aggregation schedule helps to implement delayed, or lazy, computation of aggregates. It appears to be part of the Internet’s nature that IoT data can arrive delayed, in the wrong order, sometimes even with duplicate blocks of data, if a TCP/IP packet containing a confirmation happens to never return to the original sender. This seems surprisingly common when sending IoT data out of China, even when using SSL. Although common messaging protocols such as MQTT and AMQP both define exactly once reception, not all implementations allow you to enable this. Possibly because of performance considerations.

 

Lazy aggregation

Anyway. The gist is that every time a time series value arrives, with a timestamp within the aggregation interval, it’s likely that a similar value may arrive soon. This is reinforced by the fact that TCP/IP itself is packet based, and that protocols such as MQTT and AMQP, and we ourselves, prefer micro batching. Therefore, we calculate a new delayed aggregation time, which also contains a random factor, so that the recalculations are spread out slightly, within the near future. The aggregation is recalculated from the values stored in the short term buffer, after the aggregation time has passed. This eliminates any duplicates. The purpose of this is to minimize the number of times we recalculate each aggregate interval. We could easily burn a lot of resources by doing this too often.

It’s a conscious decision to never calculate the aggregates until a little after the aggregate intervals have ended. You can easily retrieve the raw data from the short term buffer whenever you want, to access fresh data before the lazy aggregation intervals have passed.

 
 
 
Lazy aggregation chart
Lazy aggregation delay example by Christian Egeberg


Access patterns and polyglot persistence

The access pattern for the aggregation schedule queue is very update heavy. There are a few inserts and deletes as well. The queue might grow uncomfortably large, if the aggregator process isn’t able to keep up with the number of scheduled aggregations. The queue length should be monitored over time, as a measure of the storage system’s health. Consider reducing sample rates rather than relying on lots of aggregations.

The aggregation schedule queue should work well with any reasonably fast relational or NoSQL database. It doesn’t fit Cassandra or HBase, which don’t get along with queues, because of all the updates and deletes. An interesting thought experiment is to implement the aggregation schedule as a sorted set in Redis. That would require multiple Redis instances with failover, because Redis isn’t all that good at continuously persisting state to disk.

Aggregation monitoring

I love being able to query queue lengths, worker task execution durations, information messages and exceptions from a database as part of automated health monitoring, which otherwise behaves much like a frequently running small, but full featured integration test. But beware of database connectivity exceptions, and other critical server states like full disks. You might need a persistent queue just to be able to get exceptions like that into the database, when it reaches operational status again. It might be a good idea to use a fully independent log analytics service instead of your regular production database.

Aggregation alternatives

It would be possible to update the aggregates directly, using upsert statements, without reading from the short term buffer, if we were certain that we would never receive duplicates. With PostgreSQL / TimescaleDB that might look like this:

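The original example image is not preserved; a minimal sketch of such an upsert, reusing the hypothetical aggregate table from earlier, could increment the stored partial aggregates like this:

```sql
INSERT INTO aggregates (series_id, bucket, sample_count, min_value, max_value, sum_value, sum_squares)
VALUES ('11111111-1111-1111-1111-111111111111', '2020-02-11T12:00:00Z',
        1, 42.1, 42.1, 42.1, 42.1 * 42.1)
ON CONFLICT (series_id, bucket) DO UPDATE SET
    sample_count = aggregates.sample_count + EXCLUDED.sample_count,
    min_value    = least(aggregates.min_value, EXCLUDED.min_value),
    max_value    = greatest(aggregates.max_value, EXCLUDED.max_value),
    sum_value    = aggregates.sum_value + EXCLUDED.sum_value,
    sum_squares  = aggregates.sum_squares + EXCLUDED.sum_squares;
```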

Cassandra and HBase don’t allow referring to previous values when performing upserts. Thus the aggregates would need to be calculated in full, and not incremented, on those platforms. We could have used continuous queries and retention policies to implement aggregation with InfluxDB. We could have implemented aggregations using the Kafka Streams API or KSQL, if we were using Kafka. This would have meant creating a huge number of tables, would have used a lot of computing power, and would have required a lot of memory to handle memtables. It would have worked, could have become slow as the aggregate tables continue to grow, and we might have needed to scale Kafka out quite a bit more than a solution based on Cassandra.

 

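For comparison, and strictly as my own sketch rather than anything from the original presentation, a KSQL variant of that kind of windowed aggregation could look roughly like this, with illustrative stream and column names:

```sql
-- Derive a stream that also carries the squared value, then aggregate per series
-- over tumbling one hour windows. Each materialized table is backed by RocksDB.
CREATE STREAM measurements_squared AS
    SELECT series_id, value, value * value AS value_squared
    FROM measurements;

CREATE TABLE measurements_1h_aggregates AS
    SELECT series_id,
           COUNT(*)           AS sample_count,
           MIN(value)         AS min_value,
           MAX(value)         AS max_value,
           SUM(value)         AS sum_value,
           SUM(value_squared) AS sum_squares
    FROM measurements_squared
    WINDOW TUMBLING (SIZE 1 HOURS)
    GROUP BY series_id;
```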

 

The time series properties

The property store is responsible for organizing the equipment properties, which are usually a hierarchy, and typically include, for example, physical location, factory, building, vessel, floor, room, owner, department, equipment type, equipment ID, sensor type, sensor ID, or other similar properties. Time series properties such as ID used by the PLC, warm storage GUID, aggregation intervals, and storage lifetimes also belong here.

The property store must support search and navigation, such as finding all warm storage GUIDs that belong to rotary speed sensors of spinning machines in a specific factory, so that you as a next step can retrieve historic data from aggregate storage. Also the other way around, such as looking up the building, floor and room for these warm storage GUIDs.

Some customers have a very strong relationship with this information, and want to implement GUID mapping and searching on top of their own existing microservices, which already organize equipment properties. Other customers may be happy to transfer a set of properties they feel is adequate, which you can import into a simple property store, which may offer API methods for importing additional data.

What is entirely certain is that all customers have different equipment property hierarchies, and that these are never fully defined at the time the data starts flowing. Then it feels very good to have a central place to maintain that metadata, so you won’t have to update the primary key of 101 million values already sitting in the long term storage and in the aggregate storage. You may want to send aggregation intervals with the data, or have rules for detecting them, so that the data is stored correctly, even if the property store does not contain information about the time series in advance. Then you can define the missing properties in hindsight, without losing data.

 
 
Time series properties illustration
Time series properties illustration, courtesy of PowerTech Engineering


API components

The API methods listed here are not required for all warm storage instances. You won’t need Data Scientist oriented retrieval methods, if your storage needs all revolve around aggregation, and displaying graphs of historical data. So consider this a mixed wishlist of API methods that might go well with varying types of warm storage solutions.

Write API

  • Insert/Update micro batch
  • Set/Delete lookup values
  • Truncate short term buffer
  • Inquire health status

Search API

  • Find GUIDs from properties
  • Find properties from GUIDs
  • Register properties
  • Modify properties

Read API

  • Get short term buffer values for GUIDs / last seen
  • Head/Tail of short term buffer
  • Align buffer GUIDs values to interval / forward fill
  • Aggregate functions and percentiles for buffer GUIDs
  • Histogram/Buckets for buffer GUIDs
  • Get long term storage values for GUIDs / last seen
  • Durations between long term storage GUIDs values
  • Get aggregates for GUIDs
  • Combine aggregates for GUIDs
  • Tumbling/Hopping combined aggregate windows
  • Graph data retrieval helpers
  • Get lookup values (from key value store)
  • Authentication helper
  • Token renewal helper
  • Should have a Python wrapper library

The insert/update method is meant to be used both for ingesting sensor data into warm storage, and for adding calculated values like Key Performance Indicators from code. In general, the larger the micro batch you give the insert method, the better it performs, until the memory requirements for encoding and decoding the JSON body start to give you problems. That shouldn’t happen before you’re processing tens of thousands of tag items per call. It might make sense to use a queue, Kafka, or an internet facing IoT Hub, MQTT server or AMQP server as a buffer in front of the insert API method.
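As a purely hypothetical illustration of such a call, not the actual API: a micro batch POST body could carry an array of tag items, where every field name below is made up, and adding more items to the array makes the batch cheaper per value:

```json
{
  "items": [
    {
      "seriesGuid": "11111111-1111-1111-1111-111111111111",
      "timestamp": "2020-02-11T12:00:00Z",
      "value": 42.1
    },
    {
      "seriesGuid": "22222222-2222-2222-2222-222222222222",
      "timestamp": "2020-02-11T12:00:00Z",
      "value": 17.8,
      "valueType": "kpi.pumpEfficiency"
    }
  ]
}
```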

The lookup values related methods refer to a largely optional key value store.

The search API methods are all about mapping metadata properties to time series GUIDs, and back again, plus methods for making sure the metadata can stay current.

The main purpose of warm storage is to be able to quickly retrieve the data you need, at close to the minimum resolution you need them. Note that all the query related API endpoints are set oriented, using an array of time series GUIDs as one of the parameters. I guess some data scientists might have a near infinite list of magic API methods they would like to see available here. Unfortunately, this is data engineering, and we’ll leave the magic for the data scientists to implement.

 

The aggregator process

The aggregator process is idling, while waiting to reserve the first free aggregation time from the aggregation schedule. Any other aggregator processes running simultaneously will stay away, until they acquire their own reservation, which can be when older reservations expire, much like when you try to purchase a concert ticket at Ticketmaster. The aggregation is performed by reading time series values from the short term buffer, writing the sums to aggregate storage, and then deleting the aggregation time, together with the reservation. Then rinse and repeat.

However. Once you have a timer driven process capable of performing aggregations, you can relatively easily get it to do more. For example perform simple calculations, or detect and notify about exceeded limit values. All you need is a math expression parser, and a place to store details about the expressions to be calculated. Then you’ll have a rule engine.

 

The rule engine

We have chosen to use json objects to define rules, and store them in the same database as the short term buffer. There is an ID, a name, a description, and an enable / disable flag stored with each rule. There will be API methods for defining, modifying and deleting rules, and eventually a web frontend to edit them interactively.

Our rules consist of five main sections for: common time intervals, inputs, mathematical expressions, outputs and alert recipients.

 

Rule engine time intervals

A rule must as a bare minimum have a trigger interval that tells how often the rule should be executed:

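The original rule example image is not reproduced. A hypothetical rule skeleton could look like the sketch below, where every field name is illustrative rather than the actual schema. A rule triggered by receiving a specific valueType would set the trigger interval to 0 minutes:

```json
{
  "id": "pump-power-ratio-rule",
  "name": "Pump power ratio",
  "description": "Hypothetical example rule, field names are illustrative",
  "enabled": true,
  "triggerIntervalMinutes": 10,
  "inputs": [],
  "expressions": [],
  "outputs": [],
  "alertRecipients": []
}
```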

It may be appropriate to trigger a rule when receiving a specific valueType. In this case, the trigger interval should be 0 minutes.

 

Rule engine inputs

The rule must also have a list of inputs, where each input element acts as a named parameter for the mathematical expressions. Each input must have: a set of time series GUIDs, one or more time interval windows, and an aggregate function:

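Again as a hypothetical sketch with made up field names: an inputs section for a pump and its generators could define each named input as a property selector, a time window, and an aggregate function. The {serviceLocation} placeholder is a connector link, explained below:

```json
{
  "inputs": [
    {
      "name": "pumpPower",
      "selector": { "equipmentType": "pump", "sensorType": "power", "serviceLocation": "{serviceLocation}" },
      "window": { "durationMinutes": 10 },
      "aggregate": "average"
    },
    {
      "name": "generatorPower",
      "selector": { "equipmentType": "generator", "sensorType": "power", "serviceLocation": "{serviceLocation}" },
      "window": { "durationMinutes": 10 },
      "aggregate": "sum"
    }
  ]
}
```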

 

The simplest time window is a single interval, with a specified duration, which ends at the time the rule is specified to run. Time series are defined by a selector, which specifies filters for a number of properties in the property store. The property store queries execute when the rules are read, as the aggregator process starts. The queries result in sets of time series GUIDs and connector links that are retained until the rules are reloaded.

 

Rule engine connector links

A time series selector can contain connector links. These are named wildcards, which are used to connect different inputs and outputs that belong together. The example uses the {serviceLocation} link to connect the sensors belonging to a pump and two generators, which make up a system. The measurement values within this system are complementary, and are included in joint calculations and comparisons.

The connector link value must exist for all time series selectors it is used with. If, for example, there is a {serviceLocation} with only a few generators but no pump, this {serviceLocation} will be excluded from the calculations.

 

Rule engine expressions

The mathematical expressions are calculated in the same order in which they are defined, and can by name refer to the time the rule is specified to run, each individual input, and already calculated expressions: 

 

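A hypothetical expressions section, continuing the sketch above with illustrative names, might chain named expressions that refer to the inputs and to each other. The alert and alertMessage names have special meaning, as described below:

```json
{
  "expressions": [
    { "name": "powerRatio", "expression": "pumpPower / generatorPower" },
    { "name": "alert", "expression": "powerRatio > 0.85" },
    { "name": "alertMessage", "expression": "\"Power ratio \" + powerRatio + \" at {serviceLocation}\"" }
  ]
}
```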

The warm storage solution I’m working on now runs on .NET Core and Kubernetes. The selected mathematical expression parser, NReco LambdaParser, utilizes .NET, supports all native data types and expressions, can create and use .NET standard library objects, and supports LINQ to objects. Incredibly powerful. Still, it’s safe.

The rule expressions to be parsed, are defined in json objects stored in a local database, and are never submitted as part of IoT sensor data. All metrics are parsed as json, and treated as variables and named parameters, never as code. Therefore, no C# IoT value injection attack can occur. It’s possible to run fancy code from the rules, but they require privileged access to change, and the rules themselves are run utilizing an unprivileged local user in a container on Kubernetes.

Some expression names have special significance. For instance alert, which will trigger the sending of notifications when the expression evaluates to true. alertMessage allows you to specify the message sent to the alert recipients. Alert messages can refer to connector links, which will be replaced with relevant values before the messages are sent.

Rule engine outputs

The output section consists of a list of outputs. Each output can store a single named mathematical expression to short term storage via a time series selector, which can refer to connection links. This means that the time series GUIDs must exist, and must be retrieved from the property store in advance, if anything is to be output. valueType must also be defined, in order to write to short term storage.

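A hypothetical outputs section, with illustrative field names and units, could write the powerRatio expression back to short term storage as a new time series via a selector, together with an aggregation interval and a retention period:

```json
{
  "outputs": [
    {
      "expression": "powerRatio",
      "valueType": "kpi.powerRatio",
      "selector": { "equipmentType": "pump", "serviceLocation": "{serviceLocation}" },
      "interval": 60,
      "timeToLive": 90
    }
  ]
}
```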

If you specify expression to be “all”, then the specified run time, all the inputs, and all the calculated mathematical expressions will be written to short term storage as a json object.

You can specify the interval and timeToLive, to aggregate a numeric output in the usual way. If interval is specified as the number 0, the output will be written to long term storage in addition to short term storage.

This can be combined with specifying a condition, which is a mathematical expression that must be evaluated to true for a value to be output.

Rule engine alert recipients

The alert recipients section consists of a list of recipients, and the type of notification they should receive. Usually email, but other integrations should be relatively easy to implement. I would like to see If This Then That and Node-RED.

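A hypothetical alert recipients section, again with made up field names, could look like this:

```json
{
  "alertRecipients": [
    { "type": "email", "address": "maintenance@example.com", "muteIntervalMinutes": 60 },
    { "type": "email", "address": "operations@example.com", "muteIntervalMinutes": 240 }
  ]
}
```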

Notifications are sent to a message queue, with a unique topic for each type. Thus message broadcasts will never block the aggregator process.

It is also possible to specify a mute interval, which hopefully allows a minimum of peace and quiet between notifications.

KPI calculation inputs

An input can have multiple time interval windows:

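A hypothetical sketch of such a configuration, where only windowLag comes from the description and the remaining field names are made up, could place the common window parameters in the shared time interval section and let each input override them if needed:

```json
{
  "windows": {
    "windowCount": 10,
    "windowDurationMinutes": 1,
    "windowSpacingMinutes": 1,
    "windowLagMinutes": 1
  },
  "inputs": [
    {
      "name": "pumpPower",
      "selector": { "equipmentType": "pump", "sensorType": "power", "serviceLocation": "{serviceLocation}" },
      "aggregate": "average"
    },
    {
      "name": "generatorPower",
      "selector": { "equipmentType": "generator", "sensorType": "power", "serviceLocation": "{serviceLocation}" },
      "aggregate": "average"
    }
  ]
}
```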

In this example, all inputs have 10 separate 1 minute window intervals, with 1 minute between each. This represents 10 tumbling windows. The first starts 11 minutes ago. The last 1 minute ago, offset by windowLag, which is set to 1 minute. The inputs are thus retrieved as arrays with 10 aggregated values in each array. It’s possible to override all of these window parameters for each input, and get different sized arrays, but in this example the window parameters are the same for all inputs.

KPI calculation input access patterns

Queries used to obtain values from short term storage are combined below the surface into a single larger query, using union all, which reduces the number of network round trips, and thus the latency. The queries are performed asynchronously.

If your short term buffer is on a mirrored TimescaleDB cluster, retrieving rule inputs will benefit from utilizing all the slave nodes, which are otherwise just replicating the master. You can also scale the short term buffer and aggregation schedule horizontally by sharding. Dedicating multiple instances to independent customers or equipment. You can use the same long term storage and aggregate storage for all of them, as Cassandra will continue to scale linearly up to a large number of nodes.

It would be feasible to retrieve rule inputs from long term storage and aggregate storage as well, at least when you’re not running the rule engine on an edge device, or in a limited resource environment. Even then, you might have access to a few months worth of carefully selected events, state changes and aggregates.

KPI calculation expressions

It’s almost magic to have a parser that supports LINQ and C# syntax, when your mathematical expressions have arrays of floating point numbers as input, like in this example:

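As a sketch only: with array valued inputs like the ten tumbling window averages above, the expressions could use LINQ style methods directly, for example:

```json
{
  "expressions": [
    { "name": "recentAverage", "expression": "pumpPower.Average()" },
    { "name": "peakPower", "expression": "pumpPower.Max()" },
    { "name": "trendingUp", "expression": "pumpPower.Last() > pumpPower.First() * 1.1" },
    { "name": "alert", "expression": "trendingUp && peakPower > 1500.0" }
  ]
}
```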

If your inputs fetch raw data instead of aggregates, the relevant inputs will be arrays of objects. Be sure to have a short time window interval, and control the number of objects, by specifying limit for these inputs.

KPI calculation machine learning models

Here, we call a trained machine learning algorithm, rather than comparing to limit values. We feed the model time aligned historic values and tumbling window averages, sourced from a highly scalable data stream. That’s pretty cool. The machine learning model is deployed to a Kubernetes pod, in the form of a container, and receives HTTP POST requests on port 5337. We utilize a Kubernetes service selector to find the correct host address. The rule engine compounds the specified run time, all inputs, and all rule expressions calculated so far, then transmits them asynchronously as a json post body. When the model responds, the response is parsed as json, and stored as if it were the result of any mathematical expression. The structure of the response object is entirely up to the model.

KPI calculation monitoring

If an exception occurs during the rule calculations, this will be logged, both to the error output, and to a statistics table in the short term storage. This statistics table will contain information about the number of runs, the number of error situations, the number of timeouts, the last error message, as well as the amount of time it took to retrieve inputs, and calculate expressions, with min, max, average and sum per day. The rule statistics table says a lot about the health status of the warm storage solution, and should be monitored. It’s important to detect and change rules that take too long, or run too often. The rule engine will have a configurable timeout, for how long a rule can be allowed to run. Rules can be deactivated automatically, as a result of violating this.

KPI calculation outputs

Output works as previously described. Here we see an example of writing all inputs, intermediate calculations, and the result from the machine learning model to long term storage, as json, each time the model detects a deviation. Hopefully it won’t be very often:

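A hypothetical version of that output, where modelResult stands for the name under which the machine learning response was stored, and the field names remain illustrative, might be:

```json
{
  "outputs": [
    {
      "expression": "all",
      "valueType": "anomaly.pumpModel",
      "selector": { "equipmentType": "pump", "serviceLocation": "{serviceLocation}" },
      "interval": 0,
      "condition": "modelResult.isAnomaly"
    }
  ]
}
```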


Rule engine conclusion

You may achieve better control, and improve performance, by writing dedicated Python applications to calculate value-added information, and detect data stream discrepancies. But rule engines are clearly capable of performing many of the tasks. Even quite effortlessly, provided you are careful about the performance aspects of what you ask for. Using a rule engine like this, sort of becomes the script kiddie variant of data science for streaming.

Thanks for reading

It makes me very happy, if you found this article sufficiently interesting to read through it all. Consider clicking the clap button a couple of times to let me know that you did.

Don’t hesitate to contact me using the response section, if you have questions. Please contact Knowit Objectnet in Oslo, Norway if you know of any related project leads, or want to hire me as a developer or consultant.

If you by any chance enjoy low light photography, please have a look at my photos at 500px.com/nerdcissus.
