From ea98ee187477051444bbf548757af6336d333862 Mon Sep 17 00:00:00 2001 From: xengineering Date: Tue, 4 Jul 2023 12:58:13 +0200 Subject: Re-implement stream open and close This is more suitable for the new RX concept. --- xmpp/session.go | 21 ++++++++++++++++++--- xmpp/streams.go | 53 +++++++++++++++-------------------------------------- xmpp/xml.go | 19 ++++++++++++------- 3 files changed, 45 insertions(+), 48 deletions(-) diff --git a/xmpp/session.go b/xmpp/session.go index f18fd2a..4be3386 100644 --- a/xmpp/session.go +++ b/xmpp/session.go @@ -39,8 +39,6 @@ func StartSession(out chan<- any, jid string, pwd string) (in chan<- any) { } func (s *session) run() { - defer func() { s.out <- SessionDisconnect{} }() - err := s.startTransport() if err != nil { return @@ -55,8 +53,25 @@ func (s *session) run() { lw := logger{"[TX] "} w := io.MultiWriter(s.transport, lw) s.tx = xml.NewEncoder(w) + defer s.tx.Close() + + openStream(s.tx, s.jid) + defer closeStream(s.tx) - s.out <- SessionConnect{} + s.out <- SessionConnect{} // TODO this should be sent after initial presence + defer func() { s.out <- SessionDisconnect{} }() + + for { + select { + case e := <-s.rx: + log.Print(e) + case signal := <-s.in: + switch signal.(type) { + case SessionShouldDisconnect: + return + } + } + } } func (s *session) startTransport() error { diff --git a/xmpp/streams.go b/xmpp/streams.go index 388be4d..b9c0cb7 100644 --- a/xmpp/streams.go +++ b/xmpp/streams.go @@ -5,62 +5,39 @@ import ( "log" ) -func openStream(s *session) xml.EndElement { +func openStream(e *xml.Encoder, jid string) { start := xml.StartElement{ xml.Name{"jabber:client", "stream:stream"}, []xml.Attr{ - xml.Attr{xml.Name{"", "from"}, s.jid}, - xml.Attr{xml.Name{"", "to"}, domainpart(s.jid)}, + xml.Attr{xml.Name{"", "from"}, jid}, + xml.Attr{xml.Name{"", "to"}, domainpart(jid)}, xml.Attr{xml.Name{"", "version"}, "1.0"}, xml.Attr{xml.Name{"", "xml:lang"}, "en"}, xml.Attr{xml.Name{"", "xmlns:stream"}, "http://etherx.jabber.org/streams"}, }, } - end := start.End() - err := s.encodeToken(start) + err := e.EncodeToken(start) if err != nil { log.Println("Could not encode stream start!") } - - syncStreams(s) - - return end -} - -// syncStreams drops XML tokens from the receiving stream until an -// xml.StartElement with the local name `stream` is received. If this function -// is called after opening a new stream in the sending direction it is ensured -// that both streams directions work on the same stream level and are in sync. -// Tokens received which are not a stream StartElement are not handled but -// logged since this should not happen. -func syncStreams(s *session) { - for { - select { - case data := <-s.in: - switch data.(type) { - case SessionShouldDisconnect: - return - default: - log.Printf("Unhandled data '%d' during stream sync!\n", data) - } - case t := <-s.rx: - switch token := t.(type) { - case xml.StartElement: - if token.Name.Local == "stream" { - return - } - } - log.Printf("Unhandled XML token '%v' during stream sync!\n", t) - } + err = e.Flush() + if err != nil { + log.Println("Could not flush after stream start!") } } -func closeStream(s *session, end xml.EndElement) { - err := s.encodeToken(end) +func closeStream(e *xml.Encoder) { + end := xml.EndElement{xml.Name{"jabber:client", "stream:stream"}} + + err := e.EncodeToken(end) if err != nil { log.Println("Could not encode stream end!") } + err = e.Flush() + if err != nil { + log.Println("Could not flush after stream end!") + } } func streamFeaturesHandler(s *session, e []xml.Token) { diff --git a/xmpp/xml.go b/xmpp/xml.go index 14c6637..36d7eb1 100644 --- a/xmpp/xml.go +++ b/xmpp/xml.go @@ -22,13 +22,18 @@ func runRx(ctx context.Context, chn chan xml.Token, conn *tls.Conn) { default: t, err := d.Token() if t != nil && err == nil { - switch t.(type) { - case xml.ProcInst: - case xml.Directive: - case xml.Comment: - default: - c := xml.CopyToken(t) - chn <- c + switch e := t.(type) { + case xml.StartElement: + if e.Name.Local == "stream" { + // new server-side stream TODO what to do with this info? + } else { +// route(&e, &d, chn, getRoutingTable()) + } + case xml.EndElement: + if e.Name.Local == "stream" { + // TODO end complete session + return + } } } if err != nil { -- cgit v1.2.3-70-g09d2