Commit 651b5725 authored by Orne Brocaar's avatar Orne Brocaar
Browse files

Use Redis Stream ID for de-duplication of device events.

The Redis Stream is incremented for each event (by Redis). Using the
PublishedAt timestamp has the risk of race-conditions as the timestamp
is set on constructing the message, not on publishing it. Therefore it
is possible that the latest message has a timestamp before the message
that was published before, in which case the web-interface would ignore
it.
parent 4db18250
......@@ -5,7 +5,7 @@ go 1.16
require (
github.com/NickBall/go-aes-key-wrap v0.0.0-20170929221519-1c3aa3e4dfc5
github.com/aws/aws-sdk-go v1.26.3
github.com/brocaar/chirpstack-api/go/v3 v3.11.1
github.com/brocaar/chirpstack-api/go/v3 v3.12.3
github.com/brocaar/lorawan v0.0.0-20210809075358-95fc1667572e
github.com/coreos/go-oidc v2.2.1+incompatible
github.com/eclipse/paho.mqtt.golang v1.3.1
......
......@@ -72,8 +72,8 @@ github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwj
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2 h1:oMCHnXa6CCCafdPDbMh/lWRhRByN0VFLvv+g+ayx1SI=
github.com/blakesmith/ar v0.0.0-20150311145944-8bd4349a67f2/go.mod h1:PkYb9DJNAwrSvRx5DYA+gUcOIgTGVMNkfSCbZM8cWpI=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/brocaar/chirpstack-api/go/v3 v3.11.1 h1:/CpPFxvaNcF0yEE+Y0t2BJF529sciIMTsK/Wx565Z7c=
github.com/brocaar/chirpstack-api/go/v3 v3.11.1/go.mod h1:v8AWP19nOJK4rwJsr1+weDfpUc4UNLbRh8Eygn4Oh00=
github.com/brocaar/chirpstack-api/go/v3 v3.12.3 h1:/sj8cIpoWrlJWwWznF2lwOxPefLamnKPyHglAFfll6s=
github.com/brocaar/chirpstack-api/go/v3 v3.12.3/go.mod h1:v8AWP19nOJK4rwJsr1+weDfpUc4UNLbRh8Eygn4Oh00=
github.com/brocaar/lorawan v0.0.0-20210809075358-95fc1667572e h1:htxGGoTtAoy4p3qnq42qb0GfupCLe2AXJkSqzLYEPnA=
github.com/brocaar/lorawan v0.0.0-20210809075358-95fc1667572e/go.mod h1:Vlf3gOwizqX4y3snWe/i2EqRT83HvYuwBjRu39PevW0=
github.com/caarlos0/ctrlc v1.0.0 h1:2DtF8GSIcajgffDFJzyG15vO+1PuBWOMUdFut7NnXhw=
......@@ -351,7 +351,6 @@ github.com/ktrysmt/go-bitbucket v0.6.4/go.mod h1:9u0v3hsd2rqCHRIpbir1oP7F58uo5dq
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.8.0 h1:9xohqzkUwzR4Ga4ivdTcawVS89YSDVxXMa3xJX3cGzg=
github.com/lib/pq v1.8.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
......
......@@ -848,6 +848,7 @@ func (a *DeviceAPI) StreamEventLogs(req *pb.StreamDeviceEventLogsRequest, srv pb
PublishedAt: el.PublishedAt,
Type: el.Type,
PayloadJson: string(b),
StreamId: el.StreamID,
}
err = srv.Send(&resp)
......
......@@ -44,6 +44,7 @@ type EventLog struct {
Type string
PublishedAt *timestamp.Timestamp
Payload json.RawMessage
StreamID string
}
// LogEventForDevice logs an event for the given device.
......@@ -152,6 +153,7 @@ func GetEventLogForDevice(ctx context.Context, devEUI lorawan.EUI64, eventsChan
Type: event,
Payload: json.RawMessage(jsonB),
PublishedAt: pl.GetPublishedAt(),
StreamID: msg.ID,
}
}
}
......
......@@ -61,6 +61,8 @@ func (ts *LoggerTestSuite) TestHandleUplinkEvent() {
}
assert.NoError(ts.integration.HandleUplinkEvent(context.Background(), nil, nil, pl))
el := <-ts.logChannel
assert.NotEqual("", el.StreamID)
el.StreamID = ""
assert.Equal(toEventLog("up", &pl), el)
}
......@@ -71,6 +73,8 @@ func (ts *LoggerTestSuite) TestJoinEvent() {
}
assert.NoError(ts.integration.HandleJoinEvent(context.Background(), nil, nil, pl))
el := <-ts.logChannel
assert.NotEqual("", el.StreamID)
el.StreamID = ""
assert.Equal(toEventLog("join", &pl), el)
}
......@@ -81,6 +85,8 @@ func (ts *LoggerTestSuite) TestAckEvent() {
}
assert.NoError(ts.integration.HandleAckEvent(context.Background(), nil, nil, pl))
el := <-ts.logChannel
assert.NotEqual("", el.StreamID)
el.StreamID = ""
assert.Equal(toEventLog("ack", &pl), el)
}
......@@ -91,6 +97,8 @@ func (ts *LoggerTestSuite) TestErrorEvent() {
}
assert.NoError(ts.integration.HandleErrorEvent(context.Background(), nil, nil, pl))
el := <-ts.logChannel
assert.NotEqual("", el.StreamID)
el.StreamID = ""
assert.Equal(toEventLog("error", &pl), el)
}
......@@ -101,6 +109,8 @@ func (ts *LoggerTestSuite) TestStatusEvent() {
}
assert.NoError(ts.integration.HandleStatusEvent(context.Background(), nil, nil, pl))
el := <-ts.logChannel
assert.NotEqual("", el.StreamID)
el.StreamID = ""
assert.Equal(toEventLog("status", &pl), el)
}
......@@ -111,6 +121,8 @@ func (ts *LoggerTestSuite) TestLocationEvent() {
}
assert.NoError(ts.integration.HandleLocationEvent(context.Background(), nil, nil, pl))
el := <-ts.logChannel
assert.NotEqual("", el.StreamID)
el.StreamID = ""
assert.Equal(toEventLog("location", &pl), el)
}
......@@ -121,6 +133,8 @@ func (ts *LoggerTestSuite) TestTxAckEvent() {
}
assert.NoError(ts.integration.HandleTxAckEvent(context.Background(), nil, nil, pl))
el := <-ts.logChannel
assert.NotEqual("", el.StreamID)
el.StreamID = ""
assert.Equal(toEventLog("txack", &pl), el)
}
......@@ -131,6 +145,8 @@ func (ts *LoggerTestSuite) TestIntegrationEvent() {
}
assert.NoError(ts.integration.HandleIntegrationEvent(context.Background(), nil, nil, pl))
el := <-ts.logChannel
assert.NotEqual("", el.StreamID)
el.StreamID = ""
assert.Equal(toEventLog("integration", &pl), el)
}
......
......@@ -224,11 +224,11 @@ class DeviceData extends Component {
}
let data = this.state.data;
const now = new Date();
if (data.length === 0 || moment(d.publishedAt).isAfter(data[0].publishedAt)) {
// only append when stream id > last item.
if (data.length === 0 || parseInt(d.streamID.replace("-", "")) > parseInt(data[0].id.replace("-", ""))) {
data.unshift({
id: now.getTime(),
id: d.streamID,
publishedAt: d.publishedAt,
type: d.type,
payload: JSON.parse(d.payloadJSON),
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment