Browse Source

Refactor plugin interface & loading; add build infra

master
Ryan Joseph 3 months ago
parent
commit
b492ad39e3
5 changed files with 358 additions and 135 deletions
  1. +27
    -0
      Makefile
  2. +130
    -134
      main.go
  3. +93
    -1
      plugins/rpjios/main.go
  4. +71
    -0
      scripts/build-plugin.sh
  5. +37
    -0
      types.go

+ 27
- 0
Makefile View File

@@ -0,0 +1,27 @@
BUILDDIR=build
PLUGINBUILDDIR=$(BUILDDIR)/plugins
PLUGINBUILDER=./scripts/build-plugin.sh
BINARY=$(BUILDDIR)/rhp
GO=/usr/local/go/bin/go
GOOPTS=
SRCS=main.go types.go

all: clean $(BINARY) rpjios-plugin

debug: GOOPTS += -race
debug: all

rpjios-plugin:
BUILD_PLUGIN_OUTDIR="$(PLUGINBUILDDIR)" GOOPTS=$(GOOPTS) $(PLUGINBUILDER) plugins/rpjios

$(BINARY): $(SRCS)
$(GO) fmt
$(GO) build -o $(BINARY) $(GOOPTS) $(SRCS)

run: debug rpjios-plugin
./$(BINARY) -listen 0.0.0.0

.PHONY: clean
clean:
rm -fr $(BUILDDIR)
$(GO) clean

+ 130
- 134
main.go View File

@@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"plugin"
"reflect"
"strconv"
"strings"
"sync"
@@ -26,10 +27,9 @@ const defaultRedisPort = 6379
const defaultListenHost = "localhost"
const defaultListenPort = 56545
const defaultUsersFile = "./users.json"
const defaultPluginsPath = "./plugins"
const pluginEntrySymbolName = "RhpPlugin"
const defaultPluginsPath = "./build/plugins"

var loadedPlugins = map[string]func(interface{}) (interface{}, error){}
var loadedPlugins rhpPluginsT
var g_usersFile string
var usersMap map[string]string = nil
var usersMapLock sync.Mutex = sync.Mutex{}
@@ -128,145 +128,83 @@ func (sh *subscribeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)

dir, file := filepath.Split(req.URL.Path)

if dir == "/sub/" {
var respStr string = ""

if dir == "/sub/" {
newSubID := uuid.New()
newSub := subscriber{file, req.RemoteAddr, authedUser}

sh.Lock.Lock()

if sh.Pending == nil {
sh.Pending = map[uuid.UUID]subscriber{}
}

sh.Pending[newSubID] = newSub

sh.Lock.Unlock()

w.WriteHeader(http.StatusOK)
fmt.Fprintln(w, newSubID.String())

log.Printf("new sub %v\n", newSub)

return
}

if dir == "/list/" && file != "" {
start := int64(0)
end := int64(10)
respStr = newSubID.String()
} else if dir == "/list/" && file != "" {
query := req.URL.Query()
var err error = nil
log.Printf("LIST -- %v -- %v\n", file, query)

if startSpec, ok := query["start"]; ok {
start, err = strconv.ParseInt(startSpec[0], 10, 64)
listLookup := func(start int64, end int64) ([]string, error) {
return redisDefaultClient.LRange(file, start, end).Result()
}

if err == nil && start >= 0 {
if endSpec, ok := query["end"]; ok {
end, err = strconv.ParseInt(endSpec[0], 10, 64)
}

if timeSpec, ok := query["back"]; ok {
back, err := strconv.ParseInt(timeSpec[0], 10, 64)

if err == nil {
start = 0
end = 100
_ux := time.Now()
limUnix := _ux.Unix() - (back * 60)
log.Printf("NOW %v (%d)\n", _ux.String(), _ux.Unix())
log.Printf("LIM %v (%d)\n", time.Unix(limUnix, 0), limUnix)
retList := make([][2]float64, 0, end-start)
stillGoing := true

for stillGoing {
lRes := redisDefaultClient.LRange(file, start, end)
list, err := lRes.Result()

if err != nil {
stillGoing = false
log.Printf("broke! %v", err)
break
}

for _, toDec := range list {
var newVal [2]float64
err = json.Unmarshal([]byte(toDec), &newVal)

if err != nil {
log.Printf("bad rpjiosListVal! %v\n", err)
stillGoing = false
break
}

if int64(newVal[0]) >= limUnix {
retList = append(retList, newVal)
} else {
stillGoing = false
}
}

if stillGoing {
start = end
end = end + end
}
}

log.Printf("1 RETLIST LEN: %v (%v)\n", len(retList), cap(retList))
// allow plugins the opprotunity to handle the request before using the default handler
// only one can handle any given request, so the first to do so affirmatively ends the request
loadedPlugins.Lock.Lock()
for name, plugin := range loadedPlugins.List {
strResp, err := plugin.HandleListReq(dir, file, query, listLookup)

if cadSpec, ok := query["cad"]; ok {
cad, err := strconv.ParseInt(cadSpec[0], 10, 64)

if err == nil && cad < (back*60) {
newList := make([][2]float64, 0, int64(cap(retList))/cad)
lastMark := int64(-1)

// TODO: detect natural cadence and then jump over N elements
// in retList as optimization
for _, chkVal := range retList {
curMark := int64(chkVal[0])

if lastMark == int64(-1) || lastMark-curMark > cad {
newList = append(newList, chkVal)
lastMark = curMark
}
}

retList = newList
log.Printf("2 RETLIST LEN: %v (%v)\n", len(retList), cap(retList))
}
}

rlStr, err := json.Marshal(retList)
if err == nil && len(strResp) > 0 {
respStr = strResp
log.Printf("using response for '%v%v' produced by plugin '%v'\n", dir, file, name)
break
}
}
loadedPlugins.Lock.Unlock()

// default handler
if respStr == "" {
start := int64(0)
end := int64(10)
query := req.URL.Query()
var err error = nil
log.Printf("LIST -- %v -- %v\n", file, query)

if startSpec, ok := query["start"]; ok {
start, err = strconv.ParseInt(startSpec[0], 10, 64)
}

if err == nil {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, string(rlStr))
return
}
if err == nil && start >= 0 {
if endSpec, ok := query["end"]; ok {
end, err = strconv.ParseInt(endSpec[0], 10, 64)
}
}

if err == nil && end > start {
lRes := redisDefaultClient.LRange(file, start, end)
if err == nil && end > start {
lRes := redisDefaultClient.LRange(file, start, end)

listRes, err := lRes.Result()
listRes, err := lRes.Result()

if err == nil {
listStr, err := json.Marshal(listRes)
if err == nil {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, string(listStr))
return
listStr, err := json.Marshal(listRes)
if err == nil {
respStr = string(listStr)
}
}
}
}
}
}

log.Printf("BAD REQ: %v\n", req)
w.WriteHeader(http.StatusBadRequest)
if respStr != "" {
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, respStr)
} else {
log.Printf("BAD REQ: %v\n", req)
w.WriteHeader(http.StatusBadRequest)
}
}

func readUntilClose(c *websocket.Conn) {
@@ -293,12 +231,14 @@ func forwardAllOnto(wsc wsClient) {
payload := interface{}(fwd.Payload)
var err error

for pluginName, pluginFunc := range loadedPlugins {
payload, err = pluginFunc(payload)
loadedPlugins.Lock.Lock()
for pluginName, plugin := range loadedPlugins.List {
payload, err = plugin.HandleMsg(payload)
if err != nil {
log.Panic(pluginName)
}
}
loadedPlugins.Lock.Unlock()

go func() {
wsc.Lock.Lock()
@@ -354,6 +294,7 @@ func websocketHandler(w http.ResponseWriter, req *http.Request) {

if err != nil {
log.Printf("bad ws query '%s'\n", req.URL.RawQuery)
log.Println(req)
return
}

@@ -393,33 +334,82 @@ func parseJSON(path string, intoObj interface{}) error {
return nil
}

func loadPlugins(fromPath string) {
err := filepath.Walk(filepath.ToSlash(fromPath), func(path string, info os.FileInfo, err error) error {
if filepath.Ext(path) == ".so" {
pluginLoad, err := plugin.Open(path)
func loadPlugin(path string) (*rhpPluginImpl, error) {
ifaceType := reflect.TypeOf((*RhpPlugin)(nil)).Elem()
pluginLoad, err := plugin.Open(path)

if err != nil {
return err
}
if err != nil {
return nil, err
}

newPlugin, err := pluginLoad.Lookup(pluginEntrySymbolName)
// without stubbing the fields, reflect.ValueOf(...).Elem().FieldByName(...) below will return nil
newPlugin := newRhpPluginImpl()

if err != nil {
return err
}
// for each method declared in the interface, look for the same-named concrete defintion
// in the loaded plugin. if that exists, find the field in the concrete implementation
// instance (rhpPluginImpl) and set each function pointer accordingly
for i := 0; i < ifaceType.NumMethod(); i++ {
methodName := ifaceType.Method(i).Name
pluginMethod, err := pluginLoad.Lookup(methodName)

log.Printf("loaded %s\n", path)
loadedPlugins[path] = newPlugin.(func(interface{}) (interface{}, error))
if err != nil {
return nil, err
}

return nil
})
implValue := reflect.ValueOf(&newPlugin).Elem()

if err != nil {
log.Panic(err.Error())
if implValue.IsZero() {
return nil, fmt.Errorf("unable to get value of concrete impl")
}

implElem := implValue.FieldByName(methodName)

if implElem.IsZero() {
return nil, fmt.Errorf("unable to set value on concrete impl")
}

// must .Convert to the target type (implElem.Interface()), else will panic with a strangely-worded error:
// "reflect.Set: value of type T is not assignable to type T"
// (not a typo: the 'from' and 'to' types in the error message will be exactly the same, because
// indeed if we've made it this far the types will match, hence why .Convert() succeeds!)
implElem.Set(reflect.ValueOf(pluginMethod).Convert(reflect.TypeOf(implElem.Interface())))
}

log.Printf("loaded %d plugins\n", len(loadedPlugins))
return &newPlugin, nil
}

func loadPlugins(fromPath string) (rhpPluginMapT, error) {
retVal := rhpPluginMapT{}

// we never return a non-nil error from within the walk function so as to allow .Walk() to continue;
// there is the special return filepath.SkipDir, but it will cause Walk to skip remaining files,
// which isn't what we want either. only in the case that `err` is already non-nil do we return non-nil.
err := filepath.Walk(filepath.ToSlash(fromPath), func(path string, info os.FileInfo, err error) error {
if err != nil {
log.Printf("filepath.Walk errored on entry: '%s' -> %v", path, err)
return err
}

if filepath.Ext(path) != ".so" {
return nil
}

log.Printf("found %s, checking for compatibility...", filepath.Base(path))
newPlugin, err := loadPlugin(path)

if err != nil {
log.Printf("failed to load %s: %v", path, err)
return nil
}

pBaseName := strings.Replace(filepath.Base(path), ".so", "", 1)
log.Printf("loaded compatible plugin %s@%s", pBaseName, newPlugin.Version())
retVal[pBaseName] = newPlugin

return nil
})

return retVal, err
}

func loadUsers() {
@@ -453,8 +443,6 @@ func main() {
log.Panic("redis spec")
}

loadPlugins(*pluginsPath)

redisAuth := os.Getenv("REDIS_LOCAL_PWD")

if len(redisAuth) == 0 {
@@ -477,6 +465,14 @@ func main() {
g_usersFile = *usersFile
loadUsers()

loadedPlugins.Lock.Lock()
loadedPlugins.List, err = loadPlugins(*pluginsPath)
loadedPlugins.Lock.Unlock()

if err != nil {
log.Fatalf("plugin load failed: %v", err)
}

gSubscribeHandler = new(subscribeHandler)
http.Handle("/sub/", gSubscribeHandler)
http.Handle("/list/", gSubscribeHandler)


+ 93
- 1
plugins/rpjios/main.go View File

@@ -4,13 +4,19 @@ import (
"encoding/json"
"fmt"
"log"
"net/url"
"os"
"strconv"
"time"
)

var hostname string

func RhpPlugin(payload interface{}) (interface{}, error) {
func Version() string {
return "0.0.1"
}

func HandleMsg(payload interface{}) (interface{}, error) {
rxTime := float64(time.Now().UnixNano()) / 1e9

if len(hostname) == 0 {
@@ -51,3 +57,89 @@ func RhpPlugin(payload interface{}) (interface{}, error) {

return payload, nil
}

func HandleListReq(dir string, file string, query url.Values, listLookup RhpHandleListReqLookupFunc) (string, error) {
if dir != "/list/" || file == "" {
return "", nil
}

if timeSpec, ok := query["back"]; ok {
back, err := strconv.ParseInt(timeSpec[0], 10, 64)

if err == nil {
start := int64(0)
end := int64(100)
_ux := time.Now()
limUnix := _ux.Unix() - (back * 60)
log.Printf("NOW %v (%d)\n", _ux.String(), _ux.Unix())
log.Printf("LIM %v (%d)\n", time.Unix(limUnix, 0), limUnix)
retList := make([][2]float64, 0, end-start)
stillGoing := true

for stillGoing {
list, err := listLookup(start, end)

if err != nil {
stillGoing = false
log.Printf("broke! %v", err)
break
}

for _, toDec := range list {
var newVal [2]float64
err = json.Unmarshal([]byte(toDec), &newVal)

if err != nil {
log.Printf("bad rpjiosListVal! %v\n", err)
stillGoing = false
break
}

if int64(newVal[0]) >= limUnix {
retList = append(retList, newVal)
} else {
stillGoing = false
}
}

if stillGoing {
start = end
end = end + end
}
}

log.Printf("1 RETLIST LEN: %v (%v)\n", len(retList), cap(retList))

if cadSpec, ok := query["cad"]; ok {
cad, err := strconv.ParseInt(cadSpec[0], 10, 64)

if err == nil && cad < (back*60) {
newList := make([][2]float64, 0, int64(cap(retList))/cad)
lastMark := int64(-1)

// TODO: detect natural cadence and then jump over N elements
// in retList as optimization
for _, chkVal := range retList {
curMark := int64(chkVal[0])

if lastMark == int64(-1) || lastMark-curMark > cad {
newList = append(newList, chkVal)
lastMark = curMark
}
}

retList = newList
log.Printf("2 RETLIST LEN: %v (%v)\n", len(retList), cap(retList))
}
}

rlStr, err := json.Marshal(retList)

if err == nil {
return string(rlStr), nil
}
}
}

return "", nil
}

+ 71
- 0
scripts/build-plugin.sh View File

@@ -0,0 +1,71 @@
#!/bin/bash
# wrapper script for `go build -buildmode=plugin` that symbolically
# links in the required source file(s) and then cleans up when finished

# trick to silence pushd/popd from here:
# https://stackoverflow.com/questions/25288194/dont-display-pushd-popd-stack-across-several-bash-scripts-quiet-pushd-popd
pushd () {
command pushd "$@" > /dev/null
}
popd () {
command popd "$@" > /dev/null
}

PLUGIN_PATH=$1
REQ_SRC_FILES=(types.go)

# get absolute path in a portable way
WD="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
PROJECT_ROOT="${WD}/.."
pushd $PROJECT_ROOT
PROJECT_ROOT=$PWD
popd

BUILD_DIR="${PROJECT_ROOT}/build/plugins"

if [ ! -z $BUILD_PLUGIN_OUTDIR ]; then
BUILD_DIR="${PROJECT_ROOT}/$BUILD_PLUGIN_OUTDIR"
fi

mkdir -p $BUILD_DIR
pushd $BUILD_DIR
BUILD_DIR=$PWD
popd

echo "$0 building into '${BUILD_DIR}'"

if [ -z $PLUGIN_PATH ]; then
echo -e "Must give plugin path as sole argument\n"
exit -1
fi

PPATH="${PROJECT_ROOT}/${PLUGIN_PATH}"

if [ ! -d $PPATH ]; then
echo -e "Plugin '${PLUGIN_PATH}' not found at '${PPATH}'\n"
exit -1
fi

pushd $PPATH
echo "Building '$PPATH'..."

for srcFile in "${REQ_SRC_FILES[@]}"
do
echo "ln -s \"${PROJECT_ROOT}/${srcFile}\" \"./${srcFile}\""
ln -s "${PROJECT_ROOT}/${srcFile}" "./${srcFile}"
done

/usr/local/go/bin/go fmt
/usr/local/go/bin/go build -buildmode=plugin ${GOOPTS}
PNAME="$(basename $PLUGIN_PATH).so"
PDEST="$BUILD_DIR/${PNAME}"
mv $PNAME $PDEST
echo "Produced '${PDEST}'"

for srcFile in "${REQ_SRC_FILES[@]}"
do
echo "rm ${srcFile}"
rm ${srcFile}
done

popd

+ 37
- 0
types.go View File

@@ -0,0 +1,37 @@
package main

import (
"net/url"
"sync"
)

type RhpHandleListReqLookupFunc func(start int64, end int64) ([]string, error)

type RhpPlugin interface {
Version() string
HandleMsg(payload interface{}) (interface{}, error)
HandleListReq(dir string, file string, query url.Values, listLookup RhpHandleListReqLookupFunc) (string, error)
}

type rhpPluginImpl struct {
Version func() string
HandleMsg func(interface{}) (interface{}, error)
HandleListReq func(string, string, url.Values, RhpHandleListReqLookupFunc) (string, error)
}

// newRhpPluginImpl is defined here so as to make it easy to keep in-sync
// with the definition of rhpPluginImpl above
func newRhpPluginImpl() rhpPluginImpl {
return rhpPluginImpl{
func() string { return "" },
func(_ interface{}) (interface{}, error) { return nil, nil },
func(_ string, _ string, _ url.Values, _ RhpHandleListReqLookupFunc) (string, error) { return "", nil },
}
}

type rhpPluginMapT map[string]*rhpPluginImpl

type rhpPluginsT struct {
Lock sync.Mutex
List rhpPluginMapT
}

Loading…
Cancel
Save