diff options
| author | xengineering <me@xengineering.eu> | 2025-12-20 12:09:44 +0100 |
|---|---|---|
| committer | xengineering <me@xengineering.eu> | 2025-12-20 12:09:44 +0100 |
| commit | b0fcc3e7590b9f3486f1edf8c2b004d035e38652 (patch) | |
| tree | 81c123444d97f311e3b45a1881c9d2ff4ad2581b /vendor/github.com/eclipse/paho.mqtt.golang/messageids.go | |
| parent | f9fac85222892e45554a2cc49dd93455a374ef68 (diff) | |
| download | sia-server-b0fcc3e7590b9f3486f1edf8c2b004d035e38652.tar sia-server-b0fcc3e7590b9f3486f1edf8c2b004d035e38652.tar.zst sia-server-b0fcc3e7590b9f3486f1edf8c2b004d035e38652.zip | |
Add github.com/eclipse/paho.mqtt.golang
This dependency is required to use MQTT with Go.
Diffstat (limited to 'vendor/github.com/eclipse/paho.mqtt.golang/messageids.go')
| -rw-r--r-- | vendor/github.com/eclipse/paho.mqtt.golang/messageids.go | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/vendor/github.com/eclipse/paho.mqtt.golang/messageids.go b/vendor/github.com/eclipse/paho.mqtt.golang/messageids.go new file mode 100644 index 0000000..04c94bd --- /dev/null +++ b/vendor/github.com/eclipse/paho.mqtt.golang/messageids.go @@ -0,0 +1,200 @@ +/* + * Copyright (c) 2013 IBM Corp and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v2.0 + * and Eclipse Distribution License v1.0 which accompany this distribution. + * + * The Eclipse Public License is available at + * https://www.eclipse.org/legal/epl-2.0/ + * and the Eclipse Distribution License is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * Contributors: + * Seth Hoenig + * Allan Stockdill-Mander + * Mike Robertson + * Matt Brittan + */ + +package mqtt + +import ( + "fmt" + "sync" + "time" +) + +// MId is 16 bit message id as specified by the MQTT spec. +// In general, these values should not be depended upon by +// the client application. +type MId uint16 + +type messageIds struct { + mu sync.RWMutex // Named to prevent Mu from being accessible directly via client + index map[uint16]tokenCompletor + + lastIssuedID uint16 // The most recently issued ID. Used so we cycle through ids rather than immediately reusing them (can make debugging easier) +} + +const ( + midMin uint16 = 1 + midMax uint16 = 65535 +) + +// cleanup clears the message ID map; completes all token types and sets error on PUB, SUB and UNSUB tokens. +func (mids *messageIds) cleanUp() { + mids.mu.Lock() + for _, token := range mids.index { + switch token.(type) { + case *PublishToken: + token.setError(fmt.Errorf("connection lost before Publish completed")) + case *SubscribeToken: + token.setError(fmt.Errorf("connection lost before Subscribe completed")) + case *UnsubscribeToken: + token.setError(fmt.Errorf("connection lost before Unsubscribe completed")) + case nil: // should not be any nil entries + continue + } + token.flowComplete() + } + mids.index = make(map[uint16]tokenCompletor) + mids.mu.Unlock() + DEBUG.Println(MID, "cleaned up") +} + +// cleanUpSubscribe removes all SUBSCRIBE and UNSUBSCRIBE tokens (setting error) +// This may be called when the connection is lost, and we will not be resending SUB/UNSUB packets +func (mids *messageIds) cleanUpSubscribe() { + mids.mu.Lock() + for mid, token := range mids.index { + switch token.(type) { + case *SubscribeToken: + token.setError(fmt.Errorf("connection lost before Subscribe completed")) + delete(mids.index, mid) + case *UnsubscribeToken: + token.setError(fmt.Errorf("connection lost before Unsubscribe completed")) + delete(mids.index, mid) + } + } + mids.mu.Unlock() + DEBUG.Println(MID, "cleaned up subs") +} + +func (mids *messageIds) freeID(id uint16) { + mids.mu.Lock() + delete(mids.index, id) + mids.mu.Unlock() +} + +func (mids *messageIds) claimID(token tokenCompletor, id uint16) { + mids.mu.Lock() + defer mids.mu.Unlock() + if _, ok := mids.index[id]; !ok { + mids.index[id] = token + } else { + old := mids.index[id] + old.flowComplete() + mids.index[id] = token + } + if id > mids.lastIssuedID { + mids.lastIssuedID = id + } +} + +// getID will return an available id or 0 if none available +// The id will generally be the previous id + 1 (because this makes tracing messages a bit simpler) +func (mids *messageIds) getID(t tokenCompletor) uint16 { + mids.mu.Lock() + defer mids.mu.Unlock() + i := mids.lastIssuedID // note: the only situation where lastIssuedID is 0 the map will be empty + looped := false // uint16 will loop from 65535->0 + for { + i++ + if i == 0 { // skip 0 because its not a valid id (Control Packets MUST contain a non-zero 16-bit Packet Identifier [MQTT-2.3.1-1]) + i++ + looped = true + } + if _, ok := mids.index[i]; !ok { + mids.index[i] = t + mids.lastIssuedID = i + return i + } + if (looped && i == mids.lastIssuedID) || (mids.lastIssuedID == 0 && i == midMax) { // lastIssuedID will be 0 at startup + return 0 // no free ids + } + } +} + +func (mids *messageIds) getToken(id uint16) tokenCompletor { + mids.mu.RLock() + defer mids.mu.RUnlock() + if token, ok := mids.index[id]; ok { + return token + } + return &DummyToken{id: id} +} + +type DummyToken struct { + id uint16 +} + +// Wait implements the Token Wait method. +func (d *DummyToken) Wait() bool { + return true +} + +// WaitTimeout implements the Token WaitTimeout method. +func (d *DummyToken) WaitTimeout(t time.Duration) bool { + return true +} + +// Done implements the Token Done method. +func (d *DummyToken) Done() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +} + +func (d *DummyToken) flowComplete() { + ERROR.Printf("A lookup for token %d returned nil\n", d.id) +} + +func (d *DummyToken) Error() error { + return nil +} + +func (d *DummyToken) setError(e error) {} + +// PlaceHolderToken does nothing and was implemented to allow a messageid to be reserved +// it differs from DummyToken in that calling flowComplete does not generate an error (it +// is expected that flowComplete will be called when the token is overwritten with a real token) +type PlaceHolderToken struct { + id uint16 +} + +// Wait implements the Token Wait method. +func (p *PlaceHolderToken) Wait() bool { + return true +} + +// WaitTimeout implements the Token WaitTimeout method. +func (p *PlaceHolderToken) WaitTimeout(t time.Duration) bool { + return true +} + +// Done implements the Token Done method. +func (p *PlaceHolderToken) Done() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +} + +func (p *PlaceHolderToken) flowComplete() { +} + +func (p *PlaceHolderToken) Error() error { + return nil +} + +func (p *PlaceHolderToken) setError(e error) {} |
