Browse Source

Initial commit

master
Ryan Joseph 4 months ago
commit
70a0bc8931
3 changed files with 240 additions and 0 deletions
  1. +9
    -0
      go.mod
  2. +33
    -0
      go.sum
  3. +198
    -0
      main.go

+ 9
- 0
go.mod View File

@@ -0,0 +1,9 @@
module github.com/rpj/rhp

go 1.14

require (
github.com/go-redis/redis/v7 v7.2.0
github.com/google/uuid v1.1.1
github.com/gorilla/websocket v1.4.2
)

+ 33
- 0
go.sum View File

@@ -0,0 +1,33 @@
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/go-redis/redis v6.15.7+incompatible h1:3skhDh95XQMpnqeqNftPkQD9jL9e5e36z/1SUm6dy1U=
github.com/go-redis/redis/v7 v7.2.0 h1:CrCexy/jYWZjW0AyVoHlcJUeZN19VWlbepTh1Vq6dJs=
github.com/go-redis/redis/v7 v7.2.0/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

+ 198
- 0
main.go View File

@@ -0,0 +1,198 @@
package main

import (
"flag"
"fmt"
"log"
"net"
"net/http"
"os"
"path/filepath"
"strings"
"sync"

"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/gorilla/websocket"
)

const defaultListenPort = 56545

var redisDefaultClient *redis.Client = nil
var redisOptions = redis.Options{
Addr: "localhost:6379",
DB: 0,
}

type subscriber struct {
Channel string
Addr string
}

type subscribeHandler struct {
Lock sync.Mutex
Pending map[uuid.UUID]subscriber
}

var gSubscribeHandler *subscribeHandler = nil

func allowLocalhostOnly(r *http.Request) bool {
addrSplit := strings.Split(r.RemoteAddr, ":")
return addrSplit[0] == "127.0.0.1" || addrSplit[0] == "localhost"
}

var wsUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: allowLocalhostOnly,
}

type wsClient struct {
Conn *websocket.Conn
Lock *sync.Mutex
Sub *redis.PubSub
}

var wsClients = map[net.Addr]wsClient{}
var wsClientsLock = sync.Mutex{}

func checkAuth(req *http.Request) (string, error) {
if authHeader, ok := req.Header["Authorization"]; ok {
return authHeader[0], nil
}

return "", fmt.Errorf("bad auth")
}

func (sh *subscribeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
dir, file := filepath.Split(req.URL.Path)

if dir == "/sub/" {
newSubID := uuid.New()

sh.Lock.Lock()

if sh.Pending == nil {
sh.Pending = map[uuid.UUID]subscriber{}
}

sh.Pending[newSubID] = subscriber{file, req.RemoteAddr}

sh.Lock.Unlock()

fmt.Printf("Sub req! %s\n", file)
fmt.Println(sh.Pending)

fmt.Fprintln(w, newSubID.String())
}
}

func readUntilClose(c *websocket.Conn) {
for {
if _, _, err := c.NextReader(); err != nil {
fmt.Printf("ws client %v disconnected\n", c.RemoteAddr())

c.Close()

wsClientsLock.Lock()
defer wsClientsLock.Unlock()

delete(wsClients, c.RemoteAddr())

break
}
}
}

func forwardAllOnto(subChan <-chan *redis.Message, c *websocket.Conn) {
for fwd := range subChan {
c.WriteJSON(fwd.Payload)
}
}

func registerNewClient(wsConn *websocket.Conn, channel string) {
clientAddr := wsConn.RemoteAddr()

wsClientsLock.Lock()
defer wsClientsLock.Unlock()

if curClient, ok := wsClients[clientAddr]; ok {
fmt.Printf("already have conn for %v! closing it\n", clientAddr)
curClient.Lock.Lock()
curClient.Conn.Close()
curClient.Lock.Unlock()
}

wsClients[clientAddr] = wsClient{wsConn, new(sync.Mutex), redisDefaultClient.Subscribe(channel)}
fmt.Printf("ws client %v connected\n", clientAddr)

go readUntilClose(wsConn)
go forwardAllOnto(wsClients[clientAddr].Sub.Channel(), wsConn)
}

func websocketHandler(w http.ResponseWriter, req *http.Request) {
wsConn, err := wsUpgrader.Upgrade(w, req, nil)

if err != nil {
fmt.Printf("websocketHandler upgrade failed: %v\n", err)
return
}

okReqUUID, err := uuid.Parse(req.URL.RawQuery)

if err != nil {
fmt.Printf("bad ws query '%s'\n", req.URL.RawQuery)
return
}

gSubscribeHandler.Lock.Lock()
defer gSubscribeHandler.Lock.Unlock()

if pendingConn, ok := gSubscribeHandler.Pending[okReqUUID]; ok {
if strings.Split(wsConn.RemoteAddr().String(), ":")[0] == strings.Split(pendingConn.Addr, ":")[0] {
go registerNewClient(wsConn, pendingConn.Channel)
delete(gSubscribeHandler.Pending, okReqUUID)
} else {
fmt.Printf("bad addr match %s vs %s\n", wsConn.RemoteAddr(), pendingConn.Addr)
}
} else {
fmt.Printf("bad pending connection '%v'\n", okReqUUID)
}
}

func main() {
listenPort := flag.Uint("p", defaultListenPort, "listen port")

flag.Parse()

if listenPort == nil || *listenPort < 1024 || *listenPort > 65535 {
log.Panic("listen port")
}

redisAuth := os.Getenv("REDIS_LOCAL_PWD")

if len(redisAuth) == 0 {
log.Panic("Need auth")
}

redisOptions.Password = redisAuth
rc := redis.NewClient(&redisOptions)

_, err := rc.Ping().Result()

if err != nil {
log.Panic("Ping")
}

fmt.Println(rc)
redisDefaultClient = rc

gSubscribeHandler = new(subscribeHandler)
http.Handle("/sub/", gSubscribeHandler)
http.HandleFunc("/ws/sub", websocketHandler)

listenSpec := fmt.Sprintf("localhost:%d", *listenPort)
fmt.Printf("listening on %s\n", listenSpec)

http.ListenAndServe(listenSpec, nil)
}

Loading…
Cancel
Save