summaryrefslogtreecommitdiff
path: root/vendor/github.com/eclipse/paho.mqtt.golang/router.go
blob: 5cfc5e6ca72699c014e5efea4160c0488bc8adb5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
/*
 * Copyright (c) 2021 IBM Corp and others.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v2.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    https://www.eclipse.org/legal/epl-2.0/
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Seth Hoenig
 *    Allan Stockdill-Mander
 *    Mike Robertson
 */

package mqtt

import (
	"container/list"
	"strings"
	"sync"

	"github.com/eclipse/paho.mqtt.golang/packets"
)

// route is a type which associates MQTT Topic strings with a
// callback to be executed upon the arrival of a message associated
// with a subscription to that topic.
type route struct {
	topic    string
	callback MessageHandler
}

// match takes a slice of strings which represent the route being tested having been split on '/'
// separators, and a slice of strings representing the topic string in the published message, similarly
// split.
// The function determines if the topic string matches the route according to the MQTT topic rules
// and returns a boolean of the outcome
func match(route []string, topic []string) bool {
	if len(route) == 0 {
		return len(topic) == 0
	}

	if len(topic) == 0 {
		return route[0] == "#"
	}

	if route[0] == "#" {
		return true
	}

	if (route[0] == "+") || (route[0] == topic[0]) {
		return match(route[1:], topic[1:])
	}
	return false
}

func routeIncludesTopic(route, topic string) bool {
	return match(routeSplit(route), strings.Split(topic, "/"))
}

// removes $share and sharename when splitting the route to allow
// shared subscription routes to correctly match the topic
func routeSplit(route string) []string {
	var result []string
	if strings.HasPrefix(route, "$share") {
		result = strings.Split(route, "/")[2:]
	} else {
		result = strings.Split(route, "/")
	}
	return result
}

// match takes the topic string of the published message and does a basic compare to the
// string of the current Route, if they match it returns true
func (r *route) match(topic string) bool {
	return r.topic == topic || routeIncludesTopic(r.topic, topic)
}

type router struct {
	sync.RWMutex
	routes         *list.List
	defaultHandler MessageHandler
	messages       chan *packets.PublishPacket
}

// newRouter returns a new instance of a Router and channel which can be used to tell the Router
// to stop
func newRouter() *router {
	router := &router{routes: list.New(), messages: make(chan *packets.PublishPacket)}
	return router
}

// addRoute takes a topic string and MessageHandler callback. It looks in the current list of
// routes to see if there is already a matching Route. If there is it replaces the current
// callback with the new one. If not it add a new entry to the list of Routes.
func (r *router) addRoute(topic string, callback MessageHandler) {
	r.Lock()
	defer r.Unlock()
	for e := r.routes.Front(); e != nil; e = e.Next() {
		if e.Value.(*route).topic == topic {
			r := e.Value.(*route)
			r.callback = callback
			return
		}
	}
	r.routes.PushBack(&route{topic: topic, callback: callback})
}

// deleteRoute takes a route string, looks for a matching Route in the list of Routes. If
// found it removes the Route from the list.
func (r *router) deleteRoute(topic string) {
	r.Lock()
	defer r.Unlock()
	for e := r.routes.Front(); e != nil; e = e.Next() {
		if e.Value.(*route).topic == topic {
			r.routes.Remove(e)
			return
		}
	}
}

// setDefaultHandler assigns a default callback that will be called if no matching Route
// is found for an incoming Publish.
func (r *router) setDefaultHandler(handler MessageHandler) {
	r.Lock()
	defer r.Unlock()
	r.defaultHandler = handler
}

// matchAndDispatch takes a channel of Message pointers as input and starts a go routine that
// takes messages off the channel, matches them against the internal route list and calls the
// associated callback (or the defaultHandler, if one exists and no other route matched). If
// anything is sent down the stop channel the function will end.
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
	ackChan := make(chan *PacketAndToken) // Channel returned to caller; closed when goroutine terminates

	// In some cases message acknowledgments may come through after shutdown (connection is down etc). Where this is the
	// case we need to accept any such requests and then ignore them. Note that this is not a perfect solution, if we
	// have reconnected, and the session is still live, then the Ack really should be sent (see Issus #726)
	var ackMutex sync.RWMutex
	sendAckChan := ackChan // This will be set to nil before ackChan is closed
	sendAck := func(ack *PacketAndToken) {
		ackMutex.RLock()
		defer ackMutex.RUnlock()
		if sendAckChan != nil {
			sendAckChan <- ack
		} else {
			DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
		}
	}

	go func() { // Main go routine handling inbound messages
		var handlers []MessageHandler
		for message := range messages {
			// DEBUG.Println(ROU, "matchAndDispatch received message")
			sent := false
			r.RLock()
			m := messageFromPublish(message, ackFunc(sendAck, client.persist, message))
			for e := r.routes.Front(); e != nil; e = e.Next() {
				if e.Value.(*route).match(message.TopicName) {
					if order {
						handlers = append(handlers, e.Value.(*route).callback)
					} else {
						hd := e.Value.(*route).callback
						go func() {
							hd(client, m)
							if !client.options.AutoAckDisabled {
								m.Ack()
							}
						}()
					}
					sent = true
				}
			}
			if !sent {
				if r.defaultHandler != nil {
					if order {
						handlers = append(handlers, r.defaultHandler)
					} else {
						go func() {
							r.defaultHandler(client, m)
							if !client.options.AutoAckDisabled {
								m.Ack()
							}
						}()
					}
				} else {
					DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.")
				}
			}
			r.RUnlock()
			if order {
				for _, handler := range handlers {
					handler(client, m)
					if !client.options.AutoAckDisabled {
						m.Ack()
					}
				}
				handlers = handlers[:0]
			}
			// DEBUG.Println(ROU, "matchAndDispatch handled message")
		}
		ackMutex.Lock()
		sendAckChan = nil
		ackMutex.Unlock()
		close(ackChan) // as sendAckChan is now nil nothing further will be sent on this
		DEBUG.Println(ROU, "matchAndDispatch exiting")
	}()
	return ackChan
}