From 2fade1039c1842f08b30da5c95b5542b57e38ec6 Mon Sep 17 00:00:00 2001 From: xengineering Date: Mon, 3 Jul 2023 22:17:15 +0200 Subject: Move xml.Encoder to session struct The encoderDecoder sub-struct of the session struct should be removed in little steps. This is the first one. --- xmpp/session.go | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'xmpp/session.go') diff --git a/xmpp/session.go b/xmpp/session.go index a43e4f4..b4a8fab 100644 --- a/xmpp/session.go +++ b/xmpp/session.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/xml" + "io" "log" ) @@ -18,6 +19,7 @@ type session struct { out chan<- any transport *tls.Conn ed encoderDecoder + tx *xml.Encoder rx chan xml.Token resourceReq string } @@ -49,6 +51,10 @@ func (s *session) run() { go s.ed.run() defer func() { s.ed.terminator <- true }() + lw := logger{"[TX] "} + w := io.MultiWriter(s.transport, lw) + s.tx = xml.NewEncoder(w) + s.out <- SessionConnect{} runStreamPair(s) -- cgit v1.2.3-70-g09d2 From 3efcd60f8ebdc962d5be85003cc8c59a2b43e610 Mon Sep 17 00:00:00 2001 From: xengineering Date: Mon, 3 Jul 2023 22:28:01 +0200 Subject: Remove encoderDecoder struct completely This was not really necessary because it was all related to the xmpp.session and should thus be implemented there. Using the context package further reduced the complexity for cancelation. --- xmpp/session.go | 9 ++++---- xmpp/xml.go | 69 ++++++++++++++++++++++++--------------------------------- 2 files changed, 34 insertions(+), 44 deletions(-) (limited to 'xmpp/session.go') diff --git a/xmpp/session.go b/xmpp/session.go index b4a8fab..6a5e646 100644 --- a/xmpp/session.go +++ b/xmpp/session.go @@ -1,6 +1,7 @@ package xmpp import ( + "context" "crypto/tls" "crypto/x509" "encoding/xml" @@ -18,7 +19,6 @@ type session struct { in chan any out chan<- any transport *tls.Conn - ed encoderDecoder tx *xml.Encoder rx chan xml.Token resourceReq string @@ -47,9 +47,10 @@ func (s *session) run() { } defer s.transport.Close() - s.ed = newEncoderDecoder(s) - go s.ed.run() - defer func() { s.ed.terminator <- true }() + ctx, cancel := context.WithCancel(context.Background()) + cpy := s.rx + go runRx(ctx, cpy, s.transport) + defer cancel() lw := logger{"[TX] "} w := io.MultiWriter(s.transport, lw) diff --git a/xmpp/xml.go b/xmpp/xml.go index f547210..14c6637 100644 --- a/xmpp/xml.go +++ b/xmpp/xml.go @@ -1,57 +1,26 @@ package xmpp import ( + "context" + "crypto/tls" "encoding/xml" "errors" "io" "log" ) -type encoderDecoder struct { - session *session - rx *xml.Decoder - terminator chan bool -} - -func newEncoderDecoder(s *session) encoderDecoder { - ed := encoderDecoder{} - - ed.session = s +func runRx(ctx context.Context, chn chan xml.Token, conn *tls.Conn) { - lr := logger{"[RX] "} - r := io.TeeReader(s.transport, lr) - ed.rx = xml.NewDecoder(r) + l := logger{"[RX] "} + r := io.TeeReader(conn, l) + d := xml.NewDecoder(r) - return ed -} - -func (s *session) encodeToken(t xml.Token) error { - var err error - defer func() { - if err != nil { - log.Println(err) - } - }() - - err = s.tx.EncodeToken(t) - if err != nil { - return err - } - err = s.tx.Flush() - if err != nil { - return err - } - - return nil -} - -func (ed *encoderDecoder) run() { for { select { - case <-ed.terminator: + case <-ctx.Done(): return default: - t, err := ed.rx.Token() + t, err := d.Token() if t != nil && err == nil { switch t.(type) { case xml.ProcInst: @@ -59,7 +28,7 @@ func (ed *encoderDecoder) run() { case xml.Comment: default: c := xml.CopyToken(t) - ed.session.rx <- c + chn <- c } } if err != nil { @@ -72,3 +41,23 @@ func (ed *encoderDecoder) run() { } } } + +func (s *session) encodeToken(t xml.Token) error { + var err error + defer func() { + if err != nil { + log.Println(err) + } + }() + + err = s.tx.EncodeToken(t) + if err != nil { + return err + } + err = s.tx.Flush() + if err != nil { + return err + } + + return nil +} -- cgit v1.2.3-70-g09d2 From 9f87ef34c589825d65824a8c9210fed5bf92f94d Mon Sep 17 00:00:00 2001 From: xengineering Date: Tue, 4 Jul 2023 12:33:12 +0200 Subject: Remove runStreamPair() This will not be used in the new RX concept. --- xmpp/sasl.go | 1 - xmpp/session.go | 2 -- xmpp/streams.go | 29 ----------------------------- 3 files changed, 32 deletions(-) (limited to 'xmpp/session.go') diff --git a/xmpp/sasl.go b/xmpp/sasl.go index 0c13f36..a20ae56 100644 --- a/xmpp/sasl.go +++ b/xmpp/sasl.go @@ -60,7 +60,6 @@ func hasSaslPlain(e []xml.Token) bool { } func saslSuccessHandler(s *session, e []xml.Token) { - runStreamPair(s) } func saslFailureHandler(s *session, e []xml.Token) { diff --git a/xmpp/session.go b/xmpp/session.go index 6a5e646..f18fd2a 100644 --- a/xmpp/session.go +++ b/xmpp/session.go @@ -57,8 +57,6 @@ func (s *session) run() { s.tx = xml.NewEncoder(w) s.out <- SessionConnect{} - - runStreamPair(s) } func (s *session) startTransport() error { diff --git a/xmpp/streams.go b/xmpp/streams.go index 5ba4c1d..388be4d 100644 --- a/xmpp/streams.go +++ b/xmpp/streams.go @@ -5,35 +5,6 @@ import ( "log" ) -func runStreamPair(s *session) { - end := openStream(s) - defer closeStream(s, end) - - buf := newElementBuffer() - - for { - select { - case data := <-s.in: - switch data.(type) { - case SessionShouldDisconnect: - return - default: - log.Printf("Unknown data '%d'!\n", data) - } - case t := <-s.rx: - err := buf.add(t) - if err != nil { - log.Printf("Could not add XML token to buffer: %v\n", err) - return - } - if buf.isComplete() { - element := buf.reset() - route(s, element, getRoutingTable()) - } - } - } -} - func openStream(s *session) xml.EndElement { start := xml.StartElement{ xml.Name{"jabber:client", "stream:stream"}, -- cgit v1.2.3-70-g09d2 From ea98ee187477051444bbf548757af6336d333862 Mon Sep 17 00:00:00 2001 From: xengineering Date: Tue, 4 Jul 2023 12:58:13 +0200 Subject: Re-implement stream open and close This is more suitable for the new RX concept. --- xmpp/session.go | 21 ++++++++++++++++++--- xmpp/streams.go | 53 +++++++++++++++-------------------------------------- xmpp/xml.go | 19 ++++++++++++------- 3 files changed, 45 insertions(+), 48 deletions(-) (limited to 'xmpp/session.go') diff --git a/xmpp/session.go b/xmpp/session.go index f18fd2a..4be3386 100644 --- a/xmpp/session.go +++ b/xmpp/session.go @@ -39,8 +39,6 @@ func StartSession(out chan<- any, jid string, pwd string) (in chan<- any) { } func (s *session) run() { - defer func() { s.out <- SessionDisconnect{} }() - err := s.startTransport() if err != nil { return @@ -55,8 +53,25 @@ func (s *session) run() { lw := logger{"[TX] "} w := io.MultiWriter(s.transport, lw) s.tx = xml.NewEncoder(w) + defer s.tx.Close() + + openStream(s.tx, s.jid) + defer closeStream(s.tx) - s.out <- SessionConnect{} + s.out <- SessionConnect{} // TODO this should be sent after initial presence + defer func() { s.out <- SessionDisconnect{} }() + + for { + select { + case e := <-s.rx: + log.Print(e) + case signal := <-s.in: + switch signal.(type) { + case SessionShouldDisconnect: + return + } + } + } } func (s *session) startTransport() error { diff --git a/xmpp/streams.go b/xmpp/streams.go index 388be4d..b9c0cb7 100644 --- a/xmpp/streams.go +++ b/xmpp/streams.go @@ -5,62 +5,39 @@ import ( "log" ) -func openStream(s *session) xml.EndElement { +func openStream(e *xml.Encoder, jid string) { start := xml.StartElement{ xml.Name{"jabber:client", "stream:stream"}, []xml.Attr{ - xml.Attr{xml.Name{"", "from"}, s.jid}, - xml.Attr{xml.Name{"", "to"}, domainpart(s.jid)}, + xml.Attr{xml.Name{"", "from"}, jid}, + xml.Attr{xml.Name{"", "to"}, domainpart(jid)}, xml.Attr{xml.Name{"", "version"}, "1.0"}, xml.Attr{xml.Name{"", "xml:lang"}, "en"}, xml.Attr{xml.Name{"", "xmlns:stream"}, "http://etherx.jabber.org/streams"}, }, } - end := start.End() - err := s.encodeToken(start) + err := e.EncodeToken(start) if err != nil { log.Println("Could not encode stream start!") } - - syncStreams(s) - - return end -} - -// syncStreams drops XML tokens from the receiving stream until an -// xml.StartElement with the local name `stream` is received. If this function -// is called after opening a new stream in the sending direction it is ensured -// that both streams directions work on the same stream level and are in sync. -// Tokens received which are not a stream StartElement are not handled but -// logged since this should not happen. -func syncStreams(s *session) { - for { - select { - case data := <-s.in: - switch data.(type) { - case SessionShouldDisconnect: - return - default: - log.Printf("Unhandled data '%d' during stream sync!\n", data) - } - case t := <-s.rx: - switch token := t.(type) { - case xml.StartElement: - if token.Name.Local == "stream" { - return - } - } - log.Printf("Unhandled XML token '%v' during stream sync!\n", t) - } + err = e.Flush() + if err != nil { + log.Println("Could not flush after stream start!") } } -func closeStream(s *session, end xml.EndElement) { - err := s.encodeToken(end) +func closeStream(e *xml.Encoder) { + end := xml.EndElement{xml.Name{"jabber:client", "stream:stream"}} + + err := e.EncodeToken(end) if err != nil { log.Println("Could not encode stream end!") } + err = e.Flush() + if err != nil { + log.Println("Could not flush after stream end!") + } } func streamFeaturesHandler(s *session, e []xml.Token) { diff --git a/xmpp/xml.go b/xmpp/xml.go index 14c6637..36d7eb1 100644 --- a/xmpp/xml.go +++ b/xmpp/xml.go @@ -22,13 +22,18 @@ func runRx(ctx context.Context, chn chan xml.Token, conn *tls.Conn) { default: t, err := d.Token() if t != nil && err == nil { - switch t.(type) { - case xml.ProcInst: - case xml.Directive: - case xml.Comment: - default: - c := xml.CopyToken(t) - chn <- c + switch e := t.(type) { + case xml.StartElement: + if e.Name.Local == "stream" { + // new server-side stream TODO what to do with this info? + } else { +// route(&e, &d, chn, getRoutingTable()) + } + case xml.EndElement: + if e.Name.Local == "stream" { + // TODO end complete session + return + } } } if err != nil { -- cgit v1.2.3-70-g09d2 From 92534f5af88b42665ad44f2495fe5dfb116d3406 Mon Sep 17 00:00:00 2001 From: xengineering Date: Tue, 4 Jul 2023 13:48:34 +0200 Subject: First working version of new RX concept This uses xml.Decoder.DecodeElement() which makes parsing way easier. This first step is just able to parse stream features partially. --- xmpp/router.go | 38 ++++++++------------------------------ xmpp/session.go | 4 ++-- xmpp/streams.go | 33 +++++++++++++++++++-------------- xmpp/xml.go | 4 ++-- 4 files changed, 31 insertions(+), 48 deletions(-) (limited to 'xmpp/session.go') diff --git a/xmpp/router.go b/xmpp/router.go index 1e21c9b..d437b28 100644 --- a/xmpp/router.go +++ b/xmpp/router.go @@ -2,7 +2,6 @@ package xmpp import ( "encoding/xml" - "log" ) // routingTable is a data structure which contains routing information for XML @@ -12,7 +11,7 @@ import ( // entry of the routingTable. type routingTable []struct { name xml.Name - handler func(*session, []xml.Token) + handler func(s *xml.StartElement, d *xml.Decoder, c chan<- any) } // getRoutingTable returns the routing table used in @@ -23,40 +22,19 @@ type routingTable []struct { func getRoutingTable() routingTable { return routingTable{ {xml.Name{`http://etherx.jabber.org/streams`, `features`}, streamFeaturesHandler}, - {xml.Name{`urn:ietf:params:xml:ns:xmpp-sasl`, `success`}, saslSuccessHandler}, - {xml.Name{`urn:ietf:params:xml:ns:xmpp-sasl`, `failure`}, saslFailureHandler}, - {xml.Name{`jabber:client`, `iq`}, iqHandler}, +// {xml.Name{`urn:ietf:params:xml:ns:xmpp-sasl`, `success`}, saslSuccessHandler}, +// {xml.Name{`urn:ietf:params:xml:ns:xmpp-sasl`, `failure`}, saslFailureHandler}, +// {xml.Name{`jabber:client`, `iq`}, iqHandler}, } } // route determines the correct handler function for the given XML element by a // given routingTable. In addition it executes the determined handler function. // If no handler function is found an error message is send via the log module. -func route(s *session, e []xml.Token, t routingTable) { - var name xml.Name - - // TODO a stronger definition of an XML element (as here - // https://www.w3schools.com/xml/xml_elements.asp) would define that the - // first Token of an element is a StartElement token. This would make this - // code easier. - escape := false - for _, token := range e { - switch s := token.(type) { - case xml.StartElement: - name = s.Name - escape = true - } - if escape { - break - } - } - - for _, r := range t { - if name == r.name { - r.handler(s, e) - return +func route(s *xml.StartElement, d *xml.Decoder, c chan<- any, t routingTable) { + for _, v := range t { + if v.name == (*s).Name { + v.handler(s, d, c) } } - - log.Println("Could not route XML element") } diff --git a/xmpp/session.go b/xmpp/session.go index 4be3386..6abc343 100644 --- a/xmpp/session.go +++ b/xmpp/session.go @@ -20,7 +20,7 @@ type session struct { out chan<- any transport *tls.Conn tx *xml.Encoder - rx chan xml.Token + rx chan any resourceReq string } @@ -31,7 +31,7 @@ func StartSession(out chan<- any, jid string, pwd string) (in chan<- any) { s.pwd = pwd s.in = make(chan any) s.out = out - s.rx = make(chan xml.Token, 0) + s.rx = make(chan any, 0) go s.run() diff --git a/xmpp/streams.go b/xmpp/streams.go index b9c0cb7..9c90554 100644 --- a/xmpp/streams.go +++ b/xmpp/streams.go @@ -40,20 +40,6 @@ func closeStream(e *xml.Encoder) { } } -func streamFeaturesHandler(s *session, e []xml.Token) { - if hasSaslPlain(e) { - s.sasl() - return - } - - if hasBind(e) { - s.sendBind() - return - } - - log.Println("Stream has no implemented features!") -} - func iqHandler(s *session, e []xml.Token) { isResult := false idMatches := false @@ -77,3 +63,22 @@ func iqHandler(s *session, e []xml.Token) { } } } + +type streamFeatures struct { + Mechanisms struct { + Items []struct { + Type string `xml:",innerxml"` + } `xml:"mechanism"` + } `xml:"mechanisms"` +} + +func streamFeaturesHandler(s *xml.StartElement, d *xml.Decoder, c chan<- any) { + e := streamFeatures{} + + err := d.DecodeElement(&e, s) + if err != nil { + log.Printf("Could not decode stream features: %v\n", err) + } + + c <- e +} diff --git a/xmpp/xml.go b/xmpp/xml.go index 36d7eb1..470a2ef 100644 --- a/xmpp/xml.go +++ b/xmpp/xml.go @@ -9,7 +9,7 @@ import ( "log" ) -func runRx(ctx context.Context, chn chan xml.Token, conn *tls.Conn) { +func runRx(ctx context.Context, chn chan<- any, conn *tls.Conn) { l := logger{"[RX] "} r := io.TeeReader(conn, l) @@ -27,7 +27,7 @@ func runRx(ctx context.Context, chn chan xml.Token, conn *tls.Conn) { if e.Name.Local == "stream" { // new server-side stream TODO what to do with this info? } else { -// route(&e, &d, chn, getRoutingTable()) + route(&e, d, chn, getRoutingTable()) } case xml.EndElement: if e.Name.Local == "stream" { -- cgit v1.2.3-70-g09d2 From e529bab2e5df93ff8e9fd415b9d65e9bb6d17695 Mon Sep 17 00:00:00 2001 From: xengineering Date: Tue, 4 Jul 2023 16:37:27 +0200 Subject: Introduce handle() as dummy --- xmpp/routing.go | 9 +++++++++ xmpp/session.go | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) (limited to 'xmpp/session.go') diff --git a/xmpp/routing.go b/xmpp/routing.go index 2b1680f..b184b1c 100644 --- a/xmpp/routing.go +++ b/xmpp/routing.go @@ -22,3 +22,12 @@ func parse[T any](data T, s *xml.StartElement, d *xml.Decoder, c chan<- any) { c <- data } } + +func handle(element any) { + switch t := element.(type) { + case streamFeatures: + log.Println("Handling stream features ...") + default: + log.Printf("Unknown parsed element: %v", t) + } +} diff --git a/xmpp/session.go b/xmpp/session.go index 6abc343..a4120e9 100644 --- a/xmpp/session.go +++ b/xmpp/session.go @@ -64,7 +64,7 @@ func (s *session) run() { for { select { case e := <-s.rx: - log.Print(e) + handle(e) case signal := <-s.in: switch signal.(type) { case SessionShouldDisconnect: -- cgit v1.2.3-70-g09d2 From 2c71877e392da6c2691827160142e95142f7bea6 Mon Sep 17 00:00:00 2001 From: xengineering Date: Tue, 4 Jul 2023 21:24:04 +0200 Subject: Re-implement SASL Was broken because of switch to new RX concept. --- xmpp/routing.go | 4 ++-- xmpp/session.go | 2 +- xmpp/streams.go | 21 +++++++++++++++++---- 3 files changed, 20 insertions(+), 7 deletions(-) (limited to 'xmpp/session.go') diff --git a/xmpp/routing.go b/xmpp/routing.go index b184b1c..a9dd8b6 100644 --- a/xmpp/routing.go +++ b/xmpp/routing.go @@ -23,10 +23,10 @@ func parse[T any](data T, s *xml.StartElement, d *xml.Decoder, c chan<- any) { } } -func handle(element any) { +func handle(s *session, element any) { switch t := element.(type) { case streamFeatures: - log.Println("Handling stream features ...") + handleStreamFeatures(s, t) default: log.Printf("Unknown parsed element: %v", t) } diff --git a/xmpp/session.go b/xmpp/session.go index a4120e9..7a07280 100644 --- a/xmpp/session.go +++ b/xmpp/session.go @@ -64,7 +64,7 @@ func (s *session) run() { for { select { case e := <-s.rx: - handle(e) + handle(s, e) case signal := <-s.in: switch signal.(type) { case SessionShouldDisconnect: diff --git a/xmpp/streams.go b/xmpp/streams.go index 3aca8a2..8f6fd03 100644 --- a/xmpp/streams.go +++ b/xmpp/streams.go @@ -5,6 +5,23 @@ import ( "log" ) +type streamFeatures struct { + SaslMechanisms []string `xml:"mechanisms>mechanism"` +} + +func handleStreamFeatures(s *session, f streamFeatures) { + if len(f.SaslMechanisms) > 0 { + for _, v := range f.SaslMechanisms { + if v == "PLAIN" { + s.sasl() + return + } + } + log.Println("No compatible SASL mechanism found!") + return + } +} + func openStream(e *xml.Encoder, jid string) { start := xml.StartElement{ xml.Name{"jabber:client", "stream:stream"}, @@ -63,7 +80,3 @@ func iqHandler(s *session, e []xml.Token) { } } } - -type streamFeatures struct { - SaslMechanisms []string `xml:"mechanisms>mechanism"` -} -- cgit v1.2.3-70-g09d2 From d9fe0a4360770b1e4b6b4fb3686c3275ad1b6e6e Mon Sep 17 00:00:00 2001 From: xengineering Date: Tue, 4 Jul 2023 22:09:36 +0200 Subject: Apply go fmt --- xmpp/jid.go | 2 +- xmpp/sasl.go | 2 +- xmpp/session.go | 2 +- xmpp/streams.go | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) (limited to 'xmpp/session.go') diff --git a/xmpp/jid.go b/xmpp/jid.go index c732027..9580ad5 100644 --- a/xmpp/jid.go +++ b/xmpp/jid.go @@ -43,7 +43,7 @@ func username(jid string) string { type bindRequest struct { Bind struct { - Xmlns string `xml:"xmlns,attr"` + Xmlns string `xml:"xmlns,attr"` Resource struct { Content string `xml:",chardata"` } `xml:"resource"` diff --git a/xmpp/sasl.go b/xmpp/sasl.go index ae3be4a..69b536d 100644 --- a/xmpp/sasl.go +++ b/xmpp/sasl.go @@ -29,7 +29,7 @@ func (s *session) sasl() { } } -type saslSuccess struct {} +type saslSuccess struct{} func handleSaslSuccess(s *session) { openStream(s.tx, s.jid) diff --git a/xmpp/session.go b/xmpp/session.go index 7a07280..4dfd76f 100644 --- a/xmpp/session.go +++ b/xmpp/session.go @@ -58,7 +58,7 @@ func (s *session) run() { openStream(s.tx, s.jid) defer closeStream(s.tx) - s.out <- SessionConnect{} // TODO this should be sent after initial presence + s.out <- SessionConnect{} // TODO this should be sent after initial presence defer func() { s.out <- SessionDisconnect{} }() for { diff --git a/xmpp/streams.go b/xmpp/streams.go index ec16a02..9f6ffe8 100644 --- a/xmpp/streams.go +++ b/xmpp/streams.go @@ -7,7 +7,7 @@ import ( type streamFeatures struct { SaslMechanisms []string `xml:"urn:ietf:params:xml:ns:xmpp-sasl mechanisms>mechanism"` - Bind *bool `xml:"urn:ietf:params:xml:ns:xmpp-bind bind,omitempty"` + Bind *bool `xml:"urn:ietf:params:xml:ns:xmpp-bind bind,omitempty"` } func handleStreamFeatures(s *session, f streamFeatures) { -- cgit v1.2.3-70-g09d2