summaryrefslogtreecommitdiff
path: root/xmpp
diff options
context:
space:
mode:
authorxengineering <me@xengineering.eu>2023-07-04 12:58:13 +0200
committerxengineering <me@xengineering.eu>2023-07-04 12:58:13 +0200
commitea98ee187477051444bbf548757af6336d333862 (patch)
tree6901908aed0bfcded1c73b230d170e51fd931b9f /xmpp
parent8e84ca8a4d340956bd0aaead59d5c79dcaede6a5 (diff)
downloadlimox-ea98ee187477051444bbf548757af6336d333862.tar
limox-ea98ee187477051444bbf548757af6336d333862.tar.zst
limox-ea98ee187477051444bbf548757af6336d333862.zip
Re-implement stream open and close
This is more suitable for the new RX concept.
Diffstat (limited to 'xmpp')
-rw-r--r--xmpp/session.go21
-rw-r--r--xmpp/streams.go53
-rw-r--r--xmpp/xml.go19
3 files changed, 45 insertions, 48 deletions
diff --git a/xmpp/session.go b/xmpp/session.go
index f18fd2a..4be3386 100644
--- a/xmpp/session.go
+++ b/xmpp/session.go
@@ -39,8 +39,6 @@ func StartSession(out chan<- any, jid string, pwd string) (in chan<- any) {
}
func (s *session) run() {
- defer func() { s.out <- SessionDisconnect{} }()
-
err := s.startTransport()
if err != nil {
return
@@ -55,8 +53,25 @@ func (s *session) run() {
lw := logger{"[TX] "}
w := io.MultiWriter(s.transport, lw)
s.tx = xml.NewEncoder(w)
+ defer s.tx.Close()
+
+ openStream(s.tx, s.jid)
+ defer closeStream(s.tx)
- s.out <- SessionConnect{}
+ s.out <- SessionConnect{} // TODO this should be sent after initial presence
+ defer func() { s.out <- SessionDisconnect{} }()
+
+ for {
+ select {
+ case e := <-s.rx:
+ log.Print(e)
+ case signal := <-s.in:
+ switch signal.(type) {
+ case SessionShouldDisconnect:
+ return
+ }
+ }
+ }
}
func (s *session) startTransport() error {
diff --git a/xmpp/streams.go b/xmpp/streams.go
index 388be4d..b9c0cb7 100644
--- a/xmpp/streams.go
+++ b/xmpp/streams.go
@@ -5,62 +5,39 @@ import (
"log"
)
-func openStream(s *session) xml.EndElement {
+func openStream(e *xml.Encoder, jid string) {
start := xml.StartElement{
xml.Name{"jabber:client", "stream:stream"},
[]xml.Attr{
- xml.Attr{xml.Name{"", "from"}, s.jid},
- xml.Attr{xml.Name{"", "to"}, domainpart(s.jid)},
+ xml.Attr{xml.Name{"", "from"}, jid},
+ xml.Attr{xml.Name{"", "to"}, domainpart(jid)},
xml.Attr{xml.Name{"", "version"}, "1.0"},
xml.Attr{xml.Name{"", "xml:lang"}, "en"},
xml.Attr{xml.Name{"", "xmlns:stream"}, "http://etherx.jabber.org/streams"},
},
}
- end := start.End()
- err := s.encodeToken(start)
+ err := e.EncodeToken(start)
if err != nil {
log.Println("Could not encode stream start!")
}
-
- syncStreams(s)
-
- return end
-}
-
-// syncStreams drops XML tokens from the receiving stream until an
-// xml.StartElement with the local name `stream` is received. If this function
-// is called after opening a new stream in the sending direction it is ensured
-// that both streams directions work on the same stream level and are in sync.
-// Tokens received which are not a stream StartElement are not handled but
-// logged since this should not happen.
-func syncStreams(s *session) {
- for {
- select {
- case data := <-s.in:
- switch data.(type) {
- case SessionShouldDisconnect:
- return
- default:
- log.Printf("Unhandled data '%d' during stream sync!\n", data)
- }
- case t := <-s.rx:
- switch token := t.(type) {
- case xml.StartElement:
- if token.Name.Local == "stream" {
- return
- }
- }
- log.Printf("Unhandled XML token '%v' during stream sync!\n", t)
- }
+ err = e.Flush()
+ if err != nil {
+ log.Println("Could not flush after stream start!")
}
}
-func closeStream(s *session, end xml.EndElement) {
- err := s.encodeToken(end)
+func closeStream(e *xml.Encoder) {
+ end := xml.EndElement{xml.Name{"jabber:client", "stream:stream"}}
+
+ err := e.EncodeToken(end)
if err != nil {
log.Println("Could not encode stream end!")
}
+ err = e.Flush()
+ if err != nil {
+ log.Println("Could not flush after stream end!")
+ }
}
func streamFeaturesHandler(s *session, e []xml.Token) {
diff --git a/xmpp/xml.go b/xmpp/xml.go
index 14c6637..36d7eb1 100644
--- a/xmpp/xml.go
+++ b/xmpp/xml.go
@@ -22,13 +22,18 @@ func runRx(ctx context.Context, chn chan xml.Token, conn *tls.Conn) {
default:
t, err := d.Token()
if t != nil && err == nil {
- switch t.(type) {
- case xml.ProcInst:
- case xml.Directive:
- case xml.Comment:
- default:
- c := xml.CopyToken(t)
- chn <- c
+ switch e := t.(type) {
+ case xml.StartElement:
+ if e.Name.Local == "stream" {
+ // new server-side stream TODO what to do with this info?
+ } else {
+// route(&e, &d, chn, getRoutingTable())
+ }
+ case xml.EndElement:
+ if e.Name.Local == "stream" {
+ // TODO end complete session
+ return
+ }
}
}
if err != nil {