Time Series (A Collection of Observations)
A time series is an ordered sequence of values (data points) recorded over a time interval. Time series can be used in any domain that requires temporal measurements.
Examples of time series include:
- Usage of specific words or terms in a newspaper over time
- Minimum wage year-by-year
- Daily changes in stock prices
- Product purchases month-by-month
- Climate change
Many time series systems face storage challenges, since a dataset can grow very large very quickly. Another characteristic of time series is that, as time passes, the finest granularities lose their value.
In this chapter, we will implement a time series in Redis using the String, Hash, Sorted Set, and HyperLogLog data types.
Building the Foundation
In this section, we will demonstrate how to create a simple time series library using Redis Strings.
The solution will be able to save an event that happened at a given timestamp with a method called `insert`. It will also provide a method called `fetch` to retrieve values within a range of timestamps.
The solution supports multiple granularities: day, hour, minute, and second. For instance, if an event happens on 01/01/2015 at 00:00:00 (represented by the timestamp 1420070400), the following Redis keys will be incremented (one key per granularity):
- events:1sec:1420070400
- events:1min:1420070400
- events:1hour:1420070400
- events:1day:1420070400
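In terms of raw commands, a single insert at that timestamp boils down to one INCR per granularity key. A quick sketch (assuming a connected Jedis client named `jedis`; the full `insert` method is defined below):

```java
// Equivalent Jedis calls for one event at timestamp 1420070400
jedis.incr("events:1sec:1420070400");
jedis.incr("events:1min:1420070400");
jedis.incr("events:1hour:1420070400");
jedis.incr("events:1day:1420070400");
```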
All events are grouped by granularity, which means that an event that happened at 02:04:01 will be saved together with an event that happened at 02:04:02, since both happened within the same minute. The same grouping rule applies to the hour and day granularities. First, define the constants used in the example:
```java
import lombok.RequiredArgsConstructor;

/**
 * Define time units
 */
@RequiredArgsConstructor
public enum Unit {
    second(1),
    minute(60),
    hour(60 * 60),
    day(24 * 60 * 60),
    ;

    private final long duration; // in seconds
}

/**
 * Define the granularities and their properties
 */
@RequiredArgsConstructor
public enum Granularity {
    perSecond("1sec", Unit.hour.duration * 2, Unit.second.duration),
    perMinute("1min", Unit.day.duration * 7, Unit.minute.duration),
    perHour("1hour", Unit.day.duration * 60, Unit.hour.duration),
    perDay("1day", null, Unit.day.duration),
    ;

    private final String name;
    private final Long ttl;      // key TTL in seconds (null means the key never expires)
    private final long duration; // bucket size in seconds
}
```
Define the class `TimeSeries` with the following methods:
- `insert`: registers an event that happened at a given timestamp in every granularity
- `fetch`: gets the results for a given granularity within a given time range
- supporting methods to calculate key names
```java
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import redis.clients.jedis.Jedis;

/**
 * Fetch result
 */
@RequiredArgsConstructor
public static class Result {
    private final long timestamp;
    private final long value;
}

@RequiredArgsConstructor
public static class TimeSeries {
    private final Jedis client;
    private final String namespace;

    /**
     * Register an event that happened at given timestamp in multiple granularities
     */
    public void insert(long timestampInSec) {
        for (Granularity granularity : Granularity.values()) {
            String key = this.getKey(granularity, timestampInSec);
            this.client.incr(key);
            if (granularity.ttl != null) {
                this.client.expire(key, granularity.ttl);
            }
        }
    }

    /**
     * Get results of given {@code granularity} within given time range
     */
    public List<Result> fetch(Granularity granularity, long beginTimestamp, long endTimestamp) {
        var begin = this.getRoundedTimestamp(beginTimestamp, granularity.duration);
        var end = this.getRoundedTimestamp(endTimestamp, granularity.duration);
        List<String> keys = new ArrayList<>();
        for (var timestamp = begin; timestamp <= end; timestamp += granularity.duration) {
            keys.add(this.getKey(granularity, timestamp));
        }

        List<String> values = this.client.mget(keys.toArray(new String[0]));
        List<Result> results = new ArrayList<>();
        for (var i = 0; i < values.size(); i++) {
            var timestamp = begin + i * granularity.duration;
            // MGET returns null for keys that do not exist, so treat those as zero
            var value = values.get(i) == null ? 0L : Long.parseLong(values.get(i));
            results.add(new Result(timestamp, value));
        }
        return results;
    }

    /**
     * @return a key name in the format {@code namespace:granularity:timestamp}
     */
    private String getKey(Granularity granularity, long timestampInSec) {
        var roundedTimestamp = this.getRoundedTimestamp(timestampInSec, granularity.duration);
        return String.join(":", this.namespace, granularity.name, String.valueOf(roundedTimestamp));
    }

    /**
     * If the {@code precision} is 60, any timestamp from 0 to 59 results in 0,
     * any timestamp from 60 to 119 results in 60, and so on.
     *
     * @return a normalized timestamp based on given precision
     */
    private long getRoundedTimestamp(long timestampInSec, long precision) {
        return (long) (Math.floor((double) timestampInSec / precision) * precision);
    }
}
```
Let’s try it out!
```java
TimeSeries item1Purchases = new TimeSeries(jedis, "purchases:item1");
var beginTimestamp = 0L;
item1Purchases.insert(beginTimestamp);
item1Purchases.insert(beginTimestamp + 1);
item1Purchases.insert(beginTimestamp + 1);
item1Purchases.insert(beginTimestamp + 3);
item1Purchases.insert(beginTimestamp + 61);

List<Result> results4 = item1Purchases.fetch(Granularity.perSecond, beginTimestamp, beginTimestamp + 4); // [ "0:1", "1:2", "2:0", "3:1", "4:0" ]
List<Result> results120 = item1Purchases.fetch(Granularity.perMinute, beginTimestamp, beginTimestamp + 120); // [ "0:4", "60:1", "120:0" ]
```
Optimizing with Hashes
The previous time series implementation uses one Redis key per second, minute, hour, and day. In a scenario where an event is inserted every second, there will be 87,865 keys in Redis for a full day:
- 86,400 keys for the `1sec` granularity (24 * 60 * 60)
- 1,440 keys for the `1min` granularity (24 * 60)
- 24 keys for the `1hour` granularity (24 * 1)
- 1 key for the `1day` granularity
This is an enormous number of keys per day, and the count grows linearly over time. In the scenario where events are inserted every second for 24 hours, Redis needs to allocate about 11 MB of memory for these keys, as per the benchmark.
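If you want to reproduce a rough version of that measurement, here is a minimal sketch (our own addition, not the original benchmark; the helper `usedMemory` and the `bench` namespace are made up for the example, and exact numbers vary by Redis version and allocator):

```java
import redis.clients.jedis.Jedis;

public class MemoryBenchmarkSketch {
    // Hypothetical helper: extracts the used_memory field from INFO memory
    private static long usedMemory(Jedis jedis) {
        for (String line : jedis.info("memory").split("\r\n")) {
            if (line.startsWith("used_memory:")) {
                return Long.parseLong(line.substring("used_memory:".length()));
            }
        }
        throw new IllegalStateException("used_memory not found in INFO output");
    }

    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.flushDB(); // start from an empty database
            long before = usedMemory(jedis);

            TimeSeries series = new TimeSeries(jedis, "bench");
            // One event per second for 24 hours (pipelining would make this much
            // faster, but a plain loop keeps the sketch close to the insert() API)
            for (long ts = 0; ts < 24 * 60 * 60; ts++) {
                series.insert(ts);
            }

            long after = usedMemory(jedis);
            System.out.printf("keys: %d, memory: %d bytes%n", jedis.dbSize(), after - before);
        }
    }
}
```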
We can optimize this solution by using Hashes instead of Strings. Small Hashes are encoded in a different, memory-optimized data structure called a ziplist (renamed to listpack in Redis 7.0). There are two conditions for a Hash to be encoded as a ziplist, and both must hold (a sketch for inspecting them follows this list):
- It must have no more fields than the threshold set in the configuration `hash-max-ziplist-entries` (default: 512)
- No field value can be larger than `hash-max-ziplist-value` (default: 64 bytes)
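A minimal sketch for checking the thresholds and a key's actual encoding (assuming a local Redis; the throwaway key name `encoding:demo` is made up for the example, and on Redis 7.0+ the reported encoding is listpack rather than ziplist):

```java
import redis.clients.jedis.Jedis;

try (Jedis jedis = new Jedis("localhost", 6379)) {
    // Read the current thresholds (CONFIG GET returns name/value pairs)
    System.out.println(jedis.configGet("hash-max-ziplist-entries"));
    System.out.println(jedis.configGet("hash-max-ziplist-value"));

    // A small Hash stays in the compact encoding...
    jedis.hset("encoding:demo", "0", "10");
    System.out.println(jedis.objectEncoding("encoding:demo")); // ziplist (or listpack)

    // ...until one of the two conditions is violated
    jedis.hset("encoding:demo", "big", "x".repeat(100)); // value larger than 64 bytes
    System.out.println(jedis.objectEncoding("encoding:demo")); // hashtable
}
```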
In order to use Hashes and save memory space, the next solution will group multiple keys into a single Hash.
In a scenario where there is only the `1sec` granularity and there are data points at six different timestamps, the String solution creates the following keys:
| Key Name | Key Value |
|---|---|
| namespace:1sec:0 | 10 |
| namespace:1sec:1 | 15 |
| namespace:1sec:2 | 25 |
| namespace:1sec:3 | 100 |
| namespace:1sec:4 | 200 |
| namespace:1sec:5 | 300 |
And here is the same data if we use Hashes instead, grouping three timestamps per key:
| Key Name | Field Name | Field Value |
|---|---|---|
| namespace:1sec:0 | 0 | 10 |
| | 1 | 15 |
| | 2 | 25 |

| Key Name | Field Name | Field Value |
|---|---|---|
| namespace:1sec:3 | 3 | 100 |
| | 4 | 200 |
| | 5 | 300 |
The Hash implementation has the same methods; only the modified parts are shown below.
A field `quantity` is added to the granularity and used to determine the Hash distribution:
- 1sec granularity: stores a maximum of 300 timestamps of 1 second each (5 minutes of data points)
- 1min granularity: stores a maximum of 480 timestamps of 1 minute each (8 hours of data points)
- 1hour granularity: stores a maximum of 240 timestamps of 1 hour each (10 days of data points)
- 1day granularity: stores a maximum of 30 timestamps of 1 day each (30 days of data points)
These numbers are chosen based on the default Redis configuration (`hash-max-ziplist-entries` is 512), so that every Hash stays below the 512-field threshold; a quick sanity check follows the enum below.
```java
/**
 * Define the granularities and their properties
 */
@RequiredArgsConstructor
public enum Granularity {
    perSecond("1sec", Unit.hour.duration * 2, Unit.second.duration, Unit.minute.duration * 5),
    perMinute("1min", Unit.day.duration * 7, Unit.minute.duration, Unit.hour.duration * 8),
    perHour("1hour", Unit.day.duration * 60, Unit.hour.duration, Unit.day.duration * 10),
    perDay("1day", null, Unit.day.duration, Unit.day.duration * 30),
    ;

    private final String name;
    private final Long ttl;
    private final long duration;
    private final long quantity; // time span covered by one Hash key, in seconds
}
```
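As a sanity check (a small sketch of our own, not part of the implementation), each Hash holds `quantity / duration` fields, and each count must stay at or below `hash-max-ziplist-entries`:

```java
// Fields per Hash key must not exceed the 512-entry threshold
for (Granularity granularity : Granularity.values()) {
    long fieldsPerKey = granularity.quantity / granularity.duration;
    System.out.printf("%s: %d fields per key%n", granularity.name, fieldsPerKey);
}
// Output: 1sec: 300, 1min: 480, 1hour: 240, 1day: 30
```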
Update the `insert` method to use HINCRBY:
```java
/**
 * Register an event that happened at given timestamp in multiple granularities
 */
public void insert(long timestampInSec) {
    for (Granularity granularity : Granularity.values()) {
        String key = this.getKey(granularity, timestampInSec);
        String fieldName = String.valueOf(this.getRoundedTimestamp(timestampInSec, granularity.duration));
        this.client.hincrBy(key, fieldName, 1);
        if (granularity.ttl != null) {
            this.client.expire(key, granularity.ttl);
        }
    }
}
```
Update `getKey` to round the key's timestamp by `Granularity.quantity` instead of `Granularity.duration`. For example, with the `1sec` granularity (`quantity` of 300 seconds), an event at timestamp 1420070461 is stored in the key namespace:1sec:1420070400 under the field 1420070461.
```java
/**
 * @return a key name in the format {@code namespace:granularity:timestamp}
 */
private String getKey(Granularity granularity, long timestampInSec) {
    var roundedTimestamp = this.getRoundedTimestamp(timestampInSec, granularity.quantity);
    return String.join(":", this.namespace, granularity.name, String.valueOf(roundedTimestamp));
}
```
Since we need to read multiple keys and fields in one round trip, the `fetch` method wraps the HGET calls in a MULTI/EXEC transaction.
```java
/**
 * Get results of given {@code granularity} within given time range
 */
public List<Result> fetch(Granularity granularity, long beginTimestamp, long endTimestamp) {
    var begin = this.getRoundedTimestamp(beginTimestamp, granularity.duration);
    var end = this.getRoundedTimestamp(endTimestamp, granularity.duration);

    Transaction multi = this.client.multi();
    for (var timestamp = begin; timestamp <= end; timestamp += granularity.duration) {
        String key = this.getKey(granularity, timestamp);
        String fieldName = String.valueOf(this.getRoundedTimestamp(timestamp, granularity.duration));
        multi.hget(key, fieldName);
    }

    List<Object> values = multi.exec();

    List<Result> results = new ArrayList<>();
    for (var i = 0; i < values.size(); i++) {
        var timestamp = begin + i * granularity.duration;
        // HGET returns null for missing fields, so treat those as zero
        var value = values.get(i) == null ? 0L : Long.parseLong((String) values.get(i));
        results.add(new Result(timestamp, value));
    }
    return results;
}
```
Let’s run the same script again!
```java
TimeSeries item1Purchases = new TimeSeries(jedis, "purchases:item1");
var beginTimestamp = 0L;
item1Purchases.insert(beginTimestamp);
item1Purchases.insert(beginTimestamp + 1);
item1Purchases.insert(beginTimestamp + 1);
item1Purchases.insert(beginTimestamp + 3);
item1Purchases.insert(beginTimestamp + 61);

List<Result> results4 = item1Purchases.fetch(Granularity.perSecond, beginTimestamp, beginTimestamp + 4); // [ "0:1", "1:2", "2:0", "3:1", "4:0" ]
List<Result> results120 = item1Purchases.fetch(Granularity.perMinute, beginTimestamp, beginTimestamp + 120); // [ "0:4", "60:1", "120:0" ]
```
Adding Uniqueness with Sorted Sets and HyperLogLog
This section presents two different time series implementations that support unique insertions. The first uses Sorted Sets and is based on the previous Hash implementation. The second uses HyperLogLog and is based on the previous String implementation.
Each solution has pros and cons, and the right one should be chosen based on how much data needs to be stored and how accurate it needs to be:
- The Sorted Set solution works well and is 100% accurate
- The HyperLogLog solution uses less memory than the Sorted Set solution, but it is only about 99.19% accurate (HyperLogLog counts have a standard error of 0.81%); the sketch after this list illustrates the approximation
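A minimal sketch of that approximation (assuming a local Redis; the key name `hll:demo` is made up for the example), adding 100,000 unique members and comparing PFCOUNT against the true cardinality:

```java
import redis.clients.jedis.Jedis;

try (Jedis jedis = new Jedis("localhost", 6379)) {
    jedis.del("hll:demo");
    for (int i = 0; i < 100_000; i++) {
        jedis.pfadd("hll:demo", "user:" + i);
    }
    long estimate = jedis.pfcount("hll:demo");
    // The estimate is typically within about 0.81% of the exact count
    System.out.printf("estimated: %d, exact: %d, error: %.2f%%%n",
            estimate, 100_000, Math.abs(estimate - 100_000) / 1000.0);
}
```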
Sorted Set Implementation
Similar to the Hash implementation, we need to define the quantity of each granularity.
```java
/**
 * Define the granularities and their properties
 */
@RequiredArgsConstructor
public enum Granularity {
    perSecond("1sec", Unit.hour.duration * 2, Unit.second.duration, Unit.minute.duration * 2),
    perMinute("1min", Unit.day.duration * 7, Unit.minute.duration, Unit.hour.duration * 2),
    perHour("1hour", Unit.day.duration * 60, Unit.hour.duration, Unit.day.duration * 5),
    perDay("1day", null, Unit.day.duration, Unit.day.duration * 30),
    ;

    private final String name;
    private final Long ttl;
    private final long duration;
    private final long quantity;
}
```
The field `quantity` was changed based on the Sorted Set configuration `zset-max-ziplist-entries`, which defaults to 128:
- 1sec granularity: stores a maximum of 120 timestamps of 1 second each (2 minutes of data points)
- 1min granularity: stores a maximum of 120 timestamps of 1 minute each (2 hours of data points)
- 1hour granularity: stores a maximum of 120 timestamps of 1 hour each (5 days of data points)
- 1day granularity: stores a maximum of 30 timestamps of 1 day each (30 days of data points)
When inserting into the Sorted Set, we add a new parameter, `thing`, as the unique value to be stored. Together with the timestamp, it forms a unique member `timestamp:thing` that represents the event that `thing` happened at `timestamp`. We use the timestamp as the member's score so that we can query by time range later. Because ZADD simply updates the score of an existing member rather than adding a duplicate, inserting the same event twice has no effect, which is what makes the counts unique.
```java
/**
 * Register an event that happened at given timestamp in multiple granularities
 */
public void insert(long timestampInSec, String thing) {
    for (Granularity granularity : Granularity.values()) {
        String key = this.getKey(granularity, timestampInSec);
        long timestampScore = this.getRoundedTimestamp(timestampInSec, granularity.duration);
        String member = String.join(":", String.valueOf(timestampScore), thing);
        this.client.zadd(key, timestampScore, member);
        if (granularity.ttl != null) {
            this.client.expire(key, granularity.ttl);
        }
    }
}
```
When fetching, we use the command ZCOUNT to count the unique members at each timestamp in the given time range.
```java
/**
 * Get results of given {@code granularity} within given time range
 */
public List<Result> fetch(Granularity granularity, long beginTimestamp, long endTimestamp) {
    var begin = this.getRoundedTimestamp(beginTimestamp, granularity.duration);
    var end = this.getRoundedTimestamp(endTimestamp, granularity.duration);

    Transaction multi = this.client.multi();
    for (var timestamp = begin; timestamp <= end; timestamp += granularity.duration) {
        String key = this.getKey(granularity, timestamp);
        // All members of one bucket share the same score, so counting
        // scores equal to the timestamp yields the bucket's unique count
        multi.zcount(key, timestamp, timestamp);
    }

    List<Object> values = multi.exec();

    List<Result> results = new ArrayList<>();
    for (var i = 0; i < values.size(); i++) {
        var timestamp = begin + i * granularity.duration;
        var value = values.get(i) == null ? 0L : (Long) values.get(i);
        results.add(new Result(timestamp, value));
    }
    return results;
}
```
Let’s try it with some duplicate events!
```java
TimeSeries item1Purchases = new TimeSeries(jedis, "purchases:item1");
var beginTimestamp = 0L;
item1Purchases.insert(beginTimestamp, "user:max");
item1Purchases.insert(beginTimestamp, "user:max"); // inserting a duplicate event
item1Purchases.insert(beginTimestamp + 1, "user:hugo");
item1Purchases.insert(beginTimestamp + 1, "user:renata");
item1Purchases.insert(beginTimestamp + 3, "user:hugo");
item1Purchases.insert(beginTimestamp + 61, "user:kc");

List<Result> results4 = item1Purchases.fetch(Granularity.perSecond, beginTimestamp, beginTimestamp + 4); // [ "0:1", "1:2", "2:0", "3:1", "4:0" ]
List<Result> results120 = item1Purchases.fetch(Granularity.perMinute, beginTimestamp, beginTimestamp + 120); // [ "0:3", "60:1", "120:0" ]
```
HyperLogLog Implementation
The HyperLogLog implementation does not perform any key grouping. It uses one key per timestamp.
Compared to the String implementation, it changes the `insert` method to use PFADD instead of INCR, and changes the `fetch` method to make multiple calls to PFCOUNT instead of a single MGET.
```java
/**
 * Register an event that happened at given timestamp in multiple granularities
 */
public void insert(long timestampInSec, String thing) {
    for (Granularity granularity : Granularity.values()) {
        String key = this.getKey(granularity, timestampInSec);
        this.client.pfadd(key, thing);
        if (granularity.ttl != null) {
            this.client.expire(key, granularity.ttl);
        }
    }
}
```
```java
/**
 * Get results of given {@code granularity} within given time range
 */
public List<Result> fetch(Granularity granularity, long beginTimestamp, long endTimestamp) {
    var begin = this.getRoundedTimestamp(beginTimestamp, granularity.duration);
    var end = this.getRoundedTimestamp(endTimestamp, granularity.duration);

    Transaction multi = this.client.multi();
    for (var timestamp = begin; timestamp <= end; timestamp += granularity.duration) {
        multi.pfcount(this.getKey(granularity, timestamp));
    }

    List<Object> values = multi.exec();
    List<Result> results = new ArrayList<>();
    for (var i = 0; i < values.size(); i++) {
        var timestamp = begin + i * granularity.duration;
        var value = values.get(i) == null ? 0L : (Long) values.get(i);
        results.add(new Result(timestamp, value));
    }
    return results;
}
```
Time for one last run!
```java
TimeSeries item1Purchases = new TimeSeries(jedis, "purchases:item1");
var beginTimestamp = 0L;
item1Purchases.insert(beginTimestamp, "user:max");
item1Purchases.insert(beginTimestamp, "user:max"); // inserting a duplicate event
item1Purchases.insert(beginTimestamp + 1, "user:hugo");
item1Purchases.insert(beginTimestamp + 1, "user:renata");
item1Purchases.insert(beginTimestamp + 3, "user:hugo");
item1Purchases.insert(beginTimestamp + 61, "user:kc");

List<Result> results4 = item1Purchases.fetch(Granularity.perSecond, beginTimestamp, beginTimestamp + 4); // [ "0:1", "1:2", "2:0", "3:1", "4:0" ]
List<Result> results120 = item1Purchases.fetch(Granularity.perMinute, beginTimestamp, beginTimestamp + 120); // [ "0:3", "60:1", "120:0" ]
```
