summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorxengineering <me@xengineering.eu>2023-06-28 10:28:44 +0200
committerxengineering <me@xengineering.eu>2023-06-28 10:28:44 +0200
commit9afa580f3f3207ac449be86c1a305cb716b77f76 (patch)
tree93429d634b17291fe970dc4ae32e84ec18f7bef1
parentc600e10faf59e96be6b0ad440bd4c5d8cc13d38a (diff)
parent4b4fe72fdb100df492b1df1960775ff8b98d6dfc (diff)
downloadlimox-9afa580f3f3207ac449be86c1a305cb716b77f76.tar
limox-9afa580f3f3207ac449be86c1a305cb716b77f76.tar.zst
limox-9afa580f3f3207ac449be86c1a305cb716b77f76.zip
Merge branch 'stream-pair'
This adds the source file xmpp/stream_pair.go with the central function runStreamPair(). This function is called once by a session and could call itself. That way an initial stream and nested streams are implemented and closed via return and defer.
-rw-r--r--xmpp/sasl.go2
-rw-r--r--xmpp/session.go46
-rw-r--r--xmpp/stream_pair.go82
3 files changed, 84 insertions, 46 deletions
diff --git a/xmpp/sasl.go b/xmpp/sasl.go
index 1cb3670..8687782 100644
--- a/xmpp/sasl.go
+++ b/xmpp/sasl.go
@@ -1,8 +1,8 @@
package xmpp
import (
- "encoding/xml"
"encoding/base64"
+ "encoding/xml"
"log"
)
diff --git a/xmpp/session.go b/xmpp/session.go
index e0819da..f14cd34 100644
--- a/xmpp/session.go
+++ b/xmpp/session.go
@@ -49,26 +49,9 @@ func (s *session) run() {
go s.ed.run()
defer func() { s.ed.terminator <- true }()
- s.openStream()
- defer s.closeStreams()
-
- s.sasl()
-
s.out <- SessionConnect{}
- for {
- select {
- case data := <-s.in:
- switch data.(type) {
- case SessionShouldDisconnect:
- return
- default:
- log.Printf("Unknown data '%d'!\n", data)
- }
- case _ = <-s.rx:
- // TODO route received XML token here
- }
- }
+ runStreamPair(s)
}
func (s *session) startTransport() error {
@@ -88,30 +71,3 @@ func (s *session) startTransport() error {
return nil
}
-
-func (s *session) openStream() {
- start := xml.StartElement{
- xml.Name{"jabber:client", "stream:stream"},
- []xml.Attr{
- xml.Attr{xml.Name{"", "from"}, s.jid},
- xml.Attr{xml.Name{"", "to"}, domainpart(s.jid)},
- xml.Attr{xml.Name{"", "version"}, "1.0"},
- xml.Attr{xml.Name{"", "xml:lang"}, "en"},
- xml.Attr{xml.Name{"", "xmlns:stream"}, "http://etherx.jabber.org/streams"},
- },
- }
-
- s.streamEnd = start.End()
-
- err := s.ed.encodeToken(start)
- if err != nil {
- log.Println("Could not encode stream start!")
- }
-}
-
-func (s *session) closeStreams() {
- err := s.ed.encodeToken(s.streamEnd)
- if err != nil {
- log.Println("Could not encode stream end!")
- }
-}
diff --git a/xmpp/stream_pair.go b/xmpp/stream_pair.go
new file mode 100644
index 0000000..e02cca6
--- /dev/null
+++ b/xmpp/stream_pair.go
@@ -0,0 +1,82 @@
+package xmpp
+
+import (
+ "encoding/xml"
+ "log"
+)
+
+func runStreamPair(s *session) {
+ openStream(s)
+ defer closeStream(s)
+
+ for {
+ select {
+ case data := <-s.in:
+ switch data.(type) {
+ case SessionShouldDisconnect:
+ return
+ default:
+ log.Printf("Unknown data '%d'!\n", data)
+ }
+ case _ = <-s.rx:
+ // TODO route received XML token here
+ }
+ }
+}
+
+func openStream(s *session) {
+ start := xml.StartElement{
+ xml.Name{"jabber:client", "stream:stream"},
+ []xml.Attr{
+ xml.Attr{xml.Name{"", "from"}, s.jid},
+ xml.Attr{xml.Name{"", "to"}, domainpart(s.jid)},
+ xml.Attr{xml.Name{"", "version"}, "1.0"},
+ xml.Attr{xml.Name{"", "xml:lang"}, "en"},
+ xml.Attr{xml.Name{"", "xmlns:stream"}, "http://etherx.jabber.org/streams"},
+ },
+ }
+
+ s.streamEnd = start.End()
+
+ err := s.ed.encodeToken(start)
+ if err != nil {
+ log.Println("Could not encode stream start!")
+ }
+
+ syncStreams(s)
+}
+
+// syncStreams drops XML tokens from the receiving stream until an
+// xml.StartElement with the local name `stream` is received. If this function
+// is called after opening a new stream in the sending direction it is ensured
+// that both streams directions work on the same stream level and are in sync.
+// Tokens received which are not a stream StartElement are not handled but
+// logged since this should not happen.
+func syncStreams(s *session) {
+ for {
+ select {
+ case data := <-s.in:
+ switch data.(type) {
+ case SessionShouldDisconnect:
+ return
+ default:
+ log.Printf("Unhandled data '%d' during stream sync!\n", data)
+ }
+ case t := <-s.rx:
+ switch token := t.(type) {
+ case xml.StartElement:
+ if token.Name.Local == "stream" {
+ return
+ }
+ }
+ log.Printf("Unhandled XML token '%v' during stream sync!\n", t)
+ }
+ }
+}
+
+func closeStream(s *session) {
+ err := s.ed.encodeToken(s.streamEnd)
+ if err != nil {
+ log.Println("Could not encode stream end!")
+ }
+}