Stream Join
It’s often necessary to join streams, and to keep joining them continuously as data flows into the different streams.
However, this is not as simple as joining two tables whose data is largely static.
Here, data arrives continuously, with different timestamps and in no particular order, and one stream may be producing data faster than the other.
Most of the time, the two streams won’t receive the same number of events.
Further, depending on the use case, we may wish to join two events based on some condition, or to choose the latest event in one or both streams, and so on. Therefore, it’s important that the database supports more than one kind of join.
Broadly, there are three ways to join:
Simple (once-only) join – both participating streams join actively; there is no active/passive distinction
Active join – one of the two participating streams is the active join stream; the other is passive
Passive join – one of the two participating streams is the passive join stream; the other is active
Several join types are defined, each suited to different use cases. The following join types are supported in BangDB:
join_type = 1 means a once-only join (no active/passive roles; both streams join actively)
join_type = 2 means passive join (not necessarily with the latest event)
join_type = 3 means passive join, joining with the latest event
join_type = 4 means active join (not necessarily with the latest event)
join_type = 5 means active join, joining with the latest event
join_type = 6 means a simple CEP join; it typically involves only one stream (hence the self-join case) and is used in CEP queries only
Types 2 and 4 go together, as do types 3 and 5: one is set for the passive stream and the other for the active stream.
The active/passive concept exists so that each use case can get the join behaviour it requires. The stream that joins actively is responsible for performing the actual join, while the stream that joins passively only participates in the process. In other words, when an event arrives in the passive stream, it simply checks whether the basic join condition is satisfied and, if so, places itself as a candidate for the next join.
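The pairing rule for join_type values can be captured in a small lookup table. This is a hypothetical helper for illustration only. `JOIN_TYPES`, `VALID_PAIRS` and `is_valid_pair` are not part of the BangDB API. The sketch encodes only the descriptions and the (2, 4) / (3, 5) pairing stated in the list above.

```python
# Hypothetical helper, NOT part of the BangDB API: it only encodes the
# join_type descriptions and the (2, 4) / (3, 5) pairing rule listed above.
JOIN_TYPES = {
    1: "once-only join (both streams join actively)",
    2: "passive join (not necessarily the latest)",
    3: "passive join with the latest event",
    4: "active join (not necessarily the latest)",
    5: "active join with the latest event",
    6: "simple CEP join (single stream, self-join; CEP queries only)",
}

# join_type of the passive stream -> join_type its active partner must use
VALID_PAIRS = {2: 4, 3: 5}


def is_valid_pair(passive_type: int, active_type: int) -> bool:
    """Return True if the passive/active join types may be combined."""
    return VALID_PAIRS.get(passive_type) == active_type


for passive, active in [(2, 4), (3, 5), (2, 5)]:
    print(passive, active, is_valid_pair(passive, active))
```

Keeping the rule in one mapping means a mismatched pair such as (2, 5) can be rejected before a join is configured.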
Simple join
Here the two streams join data based on the condition. The latest event from the slower stream is paired with the earliest non-joined event from the faster stream.
Suppose we have two streams, s1 and s2, with data arriving in the following order.
The join condition is that the point must be the same in both streams. In other words, we join the temperature and pressure streams for the same points.
s1            s2
t11 v11 p1    -
-             t21 v21 p2
-             t22 v22 p1
-             t23 v23 p1
-             t24 v24 p1
t12 v12 p2    -
-             t25 v25 p1
-             t26 v26 p2
t13 v13 p1    -
If we join the streams in the “simple” manner (join_type = 1), the joined stream s3 will contain the following events:
s3
t22 v11 v22 p1
t12 v12 v21 p2
t13 v13 v23 p1 …
As you can see, both streams join actively. Each arriving event joins with the earliest matching event of the other stream that has not been joined yet, and once joined, events are not used again.
Also note that the pointer into each stream advances as events are joined. Even if the streams run at different speeds, this ensures that events are not repeated across multiple redundant joins.
Once events (t11, v11) and (t22, v22) are joined, the arrival of (t23, v23) does not trigger a join. It waits until the next matching event in s1 is received, rather than joining with the older, already-joined event of s1. Therefore, once an event has been joined, it is not used for any subsequent join. The stream manager waits for the next event and then joins it with the earliest non-joined event of the other stream.
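The once-only behaviour can be simulated with a few lines of Python. This is a simplified in-memory model inferred from the example, not BangDB's implementation. It assumes each stream keeps a per-point FIFO queue of events that have not been joined yet. `Event`, `simple_join` and `EVENTS` are hypothetical names. Each output row is (timestamp of the arriving event, s1 value, s2 value, point).

```python
from collections import deque
from typing import NamedTuple


class Event(NamedTuple):
    stream: str  # "s1" or "s2"
    ts: str      # timestamp label, e.g. "t11"
    value: str   # value label, e.g. "v11"
    point: str   # join key: the point the reading belongs to


def simple_join(events):
    """Simplified model of a once-only join (join_type = 1), not BangDB code.

    Each arriving event joins with the earliest not-yet-joined event of the
    other stream that has the same point. Joined events are consumed and never
    reused; unmatched events wait in a per-point FIFO queue.
    """
    pending = {"s1": {}, "s2": {}}  # stream -> point -> deque of waiting events
    joined = []
    for ev in events:
        other = "s2" if ev.stream == "s1" else "s1"
        waiting = pending[other].get(ev.point)
        if waiting:
            partner = waiting.popleft()  # earliest non-joined matching event
            s1_ev, s2_ev = (ev, partner) if ev.stream == "s1" else (partner, ev)
            joined.append((ev.ts, s1_ev.value, s2_ev.value, ev.point))
        else:
            # No partner yet: wait for a future event from the other stream
            pending[ev.stream].setdefault(ev.point, deque()).append(ev)
    return joined


# Arrival order taken from the s1/s2 example above
EVENTS = [
    Event("s1", "t11", "v11", "p1"),
    Event("s2", "t21", "v21", "p2"),
    Event("s2", "t22", "v22", "p1"),
    Event("s2", "t23", "v23", "p1"),
    Event("s2", "t24", "v24", "p1"),
    Event("s1", "t12", "v12", "p2"),
    Event("s2", "t25", "v25", "p1"),
    Event("s2", "t26", "v26", "p2"),
    Event("s1", "t13", "v13", "p1"),
]

for row in simple_join(EVENTS):
    print(*row)  # t22 v11 v22 p1 / t12 v12 v21 p2 / t13 v13 v23 p1
```

Because `popleft()` removes the partner from its queue, (t23, v23) stays queued until (t13, v13) arrives. This matches the s3 rows listed above.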
Active passive join
Here, one stream joins actively and the other simply participates passively in the join process. There are two types of such joins. In one, the join happens only with the latest events whenever possible. In the other, the join happens with the available events, not necessarily the latest ones. Let’s look at an example of each for clarity.
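With join_type = 3 for the passive stream and join_type = 5 for the active stream, a passive event is invalidated only when newer data with the same join condition arrives in the passive stream. Using the same s1 and s2 events, with s2 as the active stream and s1 as the passive one, the joined stream will have the following data: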
s3
t22 v11 v22 p1
t23 v11 v23 p1
t24 v11 v24 p1
t25 v11 v25 p1
t26 v12 v26 p2
t24 v12 v24 …
As you can see, each time a new event arrived in the active stream, it was joined with the latest matching event from the other (passive) stream. This differs from the simple join (join_type = 1), or active-active join, where an event, once joined, was never used again for a join. In the active-passive case (join_type = 3 and 5), events of the active stream are never reused, but an event of the passive stream keeps being reused as more events arrive in the active stream.
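A small simulation helps show why (t11, v11) is reused four times. This is a simplified model, not BangDB's implementation. It assumes, as the example suggests, that the passive stream keeps only the latest event per point. That event is replaced only when a newer passive event with the same point arrives. `Event`, `latest_join` and `EVENTS` are hypothetical names. Rows are (active timestamp, passive value, active value, point), and the sketch reproduces the first five s3 rows listed above.

```python
from typing import NamedTuple


class Event(NamedTuple):
    stream: str  # "s1" or "s2"
    ts: str      # timestamp label
    value: str   # value label
    point: str   # join key


def latest_join(events, active="s2"):
    """Simplified model of join_type = 3 (passive) / 5 (active), not BangDB code.

    Assumption inferred from the example: the passive stream keeps the latest
    event per point; each active event joins with it, and the passive event is
    not consumed, so it can be reused by later active events.
    """
    latest_passive = {}  # point -> most recent passive event for that point
    joined = []
    for ev in events:
        if ev.stream != active:
            # Passive event: only registers itself as the join candidate
            latest_passive[ev.point] = ev
        elif ev.point in latest_passive:
            # Active event: joins once with the latest matching passive event
            p = latest_passive[ev.point]
            joined.append((ev.ts, p.value, ev.value, ev.point))
    return joined


EVENTS = [
    Event("s1", "t11", "v11", "p1"),
    Event("s2", "t21", "v21", "p2"),
    Event("s2", "t22", "v22", "p1"),
    Event("s2", "t23", "v23", "p1"),
    Event("s2", "t24", "v24", "p1"),
    Event("s1", "t12", "v12", "p2"),
    Event("s2", "t25", "v25", "p1"),
    Event("s2", "t26", "v26", "p2"),
    Event("s1", "t13", "v13", "p1"),
]

for row in latest_join(EVENTS):
    print(*row)
```

Note that (t21, v21) produces nothing in this model. No passive p2 event exists yet when it arrives, and active events never wait for a partner.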
With join_type = 2 for the passive stream and join_type = 4 for the active stream, the joined stream will have the following data:
s3
t22 v11 v22 p1
t23 v11 v23 p1
t24 v11 v24 p1
t26 v12 v26 p2
t24 v12 v24 …
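This is very similar to the previous output, except that the event (t25, v11, v25, p1) is missing from the joined stream. Here, an event of the passive stream keeps joining with new events of the active stream only until the passive stream receives new data; after that, the latest passive event is the one that joins. Because (t12, v12, p2) arrived in s1 before (t25, v25, p1) arrived in s2, (t11, v11) was no longer available for the join, and (t25, v25) found no matching passive event.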
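The same simulation approach shows why (t25, v11, v25, p1) is missing. This is a simplified model, not BangDB's implementation. Based on the description and example above, it assumes the passive stream exposes only its single most recent event, regardless of point. Any new passive event therefore replaces the previous candidate. `Event`, `available_join` and `EVENTS` are hypothetical names. Rows are (active timestamp, passive value, active value, point), and the sketch reproduces the first four rows listed above.

```python
from typing import NamedTuple


class Event(NamedTuple):
    stream: str  # "s1" or "s2"
    ts: str      # timestamp label
    value: str   # value label
    point: str   # join key


def available_join(events, active="s2"):
    """Simplified model of join_type = 2 (passive) / 4 (active), not BangDB code.

    Assumption inferred from the example: only the most recent passive event is
    a join candidate; it is reused by active events until ANY new passive event
    arrives and replaces it.
    """
    candidate = None  # most recent passive event, whatever its point
    joined = []
    for ev in events:
        if ev.stream != active:
            candidate = ev  # new passive data replaces the old candidate
        elif candidate is not None and candidate.point == ev.point:
            joined.append((ev.ts, candidate.value, ev.value, ev.point))
    return joined


EVENTS = [
    Event("s1", "t11", "v11", "p1"),
    Event("s2", "t21", "v21", "p2"),
    Event("s2", "t22", "v22", "p1"),
    Event("s2", "t23", "v23", "p1"),
    Event("s2", "t24", "v24", "p1"),
    Event("s1", "t12", "v12", "p2"),
    Event("s2", "t25", "v25", "p1"),
    Event("s2", "t26", "v26", "p2"),
    Event("s1", "t13", "v13", "p1"),
]

for row in available_join(EVENTS):
    print(*row)
```

When (t12, v12, p2) arrives it becomes the only candidate. (t25, v25, p1) then has nothing to match, while (t26, v26, p2) joins with it.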
Now, let’s look at some examples for more clarity: