summaryrefslogtreecommitdiff
path: root/xmpp/streams.go
diff options
context:
space:
mode:
Diffstat (limited to 'xmpp/streams.go')
-rw-r--r--xmpp/streams.go131
1 files changed, 131 insertions, 0 deletions
diff --git a/xmpp/streams.go b/xmpp/streams.go
new file mode 100644
index 0000000..87df86a
--- /dev/null
+++ b/xmpp/streams.go
@@ -0,0 +1,131 @@
+package xmpp
+
+import (
+ "encoding/xml"
+ "log"
+)
+
+func runStreamPair(s *session) {
+ end := openStream(s)
+ defer closeStream(s, end)
+
+ buf := newElementBuffer()
+
+ for {
+ select {
+ case data := <-s.in:
+ switch data.(type) {
+ case SessionShouldDisconnect:
+ return
+ default:
+ log.Printf("Unknown data '%d'!\n", data)
+ }
+ case t := <-s.rx:
+ err := buf.add(t)
+ if err != nil {
+ log.Printf("Could not add XML token to buffer: %v\n", err)
+ return
+ }
+ if buf.isComplete() {
+ element := buf.reset()
+ route(s, element, getRoutingTable())
+ }
+ }
+ }
+}
+
+func openStream(s *session) xml.EndElement {
+ start := xml.StartElement{
+ xml.Name{"jabber:client", "stream:stream"},
+ []xml.Attr{
+ xml.Attr{xml.Name{"", "from"}, s.jid},
+ xml.Attr{xml.Name{"", "to"}, domainpart(s.jid)},
+ xml.Attr{xml.Name{"", "version"}, "1.0"},
+ xml.Attr{xml.Name{"", "xml:lang"}, "en"},
+ xml.Attr{xml.Name{"", "xmlns:stream"}, "http://etherx.jabber.org/streams"},
+ },
+ }
+ end := start.End()
+
+ err := s.ed.encodeToken(start)
+ if err != nil {
+ log.Println("Could not encode stream start!")
+ }
+
+ syncStreams(s)
+
+ return end
+}
+
+// syncStreams drops XML tokens from the receiving stream until an
+// xml.StartElement with the local name `stream` is received. If this function
+// is called after opening a new stream in the sending direction it is ensured
+// that both streams directions work on the same stream level and are in sync.
+// Tokens received which are not a stream StartElement are not handled but
+// logged since this should not happen.
+func syncStreams(s *session) {
+ for {
+ select {
+ case data := <-s.in:
+ switch data.(type) {
+ case SessionShouldDisconnect:
+ return
+ default:
+ log.Printf("Unhandled data '%d' during stream sync!\n", data)
+ }
+ case t := <-s.rx:
+ switch token := t.(type) {
+ case xml.StartElement:
+ if token.Name.Local == "stream" {
+ return
+ }
+ }
+ log.Printf("Unhandled XML token '%v' during stream sync!\n", t)
+ }
+ }
+}
+
+func closeStream(s *session, end xml.EndElement) {
+ err := s.ed.encodeToken(end)
+ if err != nil {
+ log.Println("Could not encode stream end!")
+ }
+}
+
+func streamFeaturesHandler(s *session, e []xml.Token) {
+ if hasSaslPlain(e) {
+ s.sasl()
+ return
+ }
+
+ if hasBind(e) {
+ s.sendBind()
+ return
+ }
+
+ log.Println("Stream has no implemented features!")
+}
+
+func iqHandler(s *session, e []xml.Token) {
+ isResult := false
+ idMatches := false
+
+ result := xml.Attr{xml.Name{"", "type"}, "result"}
+ id := xml.Attr{xml.Name{"", "id"}, s.resourceReq}
+
+ switch start := e[0].(type) {
+ case xml.StartElement:
+ for _, v := range start.Attr {
+ if v == result {
+ isResult = true
+ }
+ if v == id {
+ idMatches = true
+ }
+ }
+
+ if isResult && idMatches {
+ s.sendPresence()
+ }
+ }
+}