Browse Source

Add rudimentary message-per-second watch thread; update yarl to v0.3.1

master
Ryan Joseph 11 months ago
parent
commit
c9360eecd4
2 changed files with 73 additions and 12 deletions
  1. +72
    -11
      spheremon/main.c
  2. +1
    -1
      yarl

+ 72
- 11
spheremon/main.c View File

@@ -14,6 +14,7 @@
#include <arpa/inet.h>
#include <assert.h>
#include <pthread.h>
#include <signal.h>

#include <applibs/networking.h>
#include <applibs/gpio.h>
@@ -154,7 +155,7 @@ int trackedKeyCount = 0;
int __attribute__((atomic)) msgCount = 0;
int __attribute__((atomic)) lastLost = 0;
int __attribute__((atomic)) threadRunningCount = 0;
bool __attribute__((atomic)) running = true;
static volatile sig_atomic_t running = true;

RedisConnection_t newConnection(psubThreadArgs_t* tArgs)
{
@@ -178,6 +179,51 @@ RedisConnection_t newConnection(psubThreadArgs_t* tArgs)
return threadConn;
}

void* watchThreadFunc(void* arg)
{
assert(arg);
RedisConnection_t threadConn = newConnection((psubThreadArgs_t*)arg);
static const double SLEEP_TIME_SECONDS = 5.0;
printf("watch thread up and running.\n");
threadRunningCount++;

struct timespec sleepTime = { (time_t)SLEEP_TIME_SECONDS, 0 };
int last = 0;
double perSec = 0.0, curPerSec = 0.0;
time_t timeIncr = 0;
char buf[128];
while (running)
{
if (!last) {
perSec = msgCount / SLEEP_TIME_SECONDS;
}
else {
curPerSec = (msgCount - last) / SLEEP_TIME_SECONDS;
perSec = (curPerSec + perSec) / 2;
}

if (timeIncr) {
bzero(buf, 128);
snprintf(buf, 128, "[%06d] %-6d %-6d %-3d %5.2f %5.2f %s",
timeIncr, msgCount, last, (msgCount - last),
perSec, curPerSec, (curPerSec > perSec * 1.5 ? "!>!" : (curPerSec < perSec * 0.5 ? "!<!" : "")));

Redis_PUBLISH(threadConn, "spheremon:watchthread", buf);
#if DEBUG
fprintf(stderr, "%s\n", buf);
fflush(stderr);
#endif
}

last = msgCount;
timeIncr += (time_t)SLEEP_TIME_SECONDS;
nanosleep(&sleepTime, NULL);
}

printf("watch thread exiting.\n");
--threadRunningCount;
}

void* psubThreadFunc(void* arg)
{
assert(arg);
@@ -195,16 +241,10 @@ void* psubThreadFunc(void* arg)

if (!lastLost)
{
struct timespec quickTime = { 0, 5e4 };
bool msgCadenceHit = msgCount && !(msgCount % MSG_CADENCE_AMOUNT);

if (msgCadenceHit) GPIO_SetValue(tArgs->fds[ACTIVITY_LED], LED_ON);
GPIO_SetValue(tArgs->fds[LOST_PULSE_LED], LED_ON);

struct timespec quickTime = { 0, 1 };
GPIO_SetValue(tArgs->fds[GREEN_FDIDX], LED_ON);
nanosleep(&quickTime, NULL);

if (msgCadenceHit) GPIO_SetValue(tArgs->fds[ACTIVITY_LED], LED_OFF);
GPIO_SetValue(tArgs->fds[LOST_PULSE_LED], LED_OFF);
GPIO_SetValue(tArgs->fds[GREEN_FDIDX], LED_OFF);
}

++msgCount;
@@ -281,8 +321,19 @@ void* cmdThreadFunc(void* arg)
--threadRunningCount;
}

void sighand(int sig)
{
if (sig == SIGTERM)
{
printf("Got SIGTERM! Shutting down...\n");
running = false;
}
}

int main(int argc, char** argv)
{
signal(SIGTERM, sighand);

if (argc < 3)
{
fprintf(stderr, "Usage: %s host port password\n\n", argv[0]);
@@ -362,6 +413,7 @@ int main(int argc, char** argv)

pthread_t psubThread;
pthread_t commandThread;
pthread_t watchThread;

printf("Starting activity thread...\n");
int pc = pthread_create(&psubThread, NULL, psubThreadFunc, &psubThreadArgs);
@@ -381,8 +433,17 @@ int main(int argc, char** argv)
exit(pc);
}

printf("Starting watch thread...\n");
pc = pthread_create(&watchThread, NULL, watchThreadFunc, &psubThreadArgs);

if (pc)
{
fprintf(stderr, "pthread_create (watch): %d\n", pc);
exit(pc);
}

GPIO_SetValue(fds[BLUE_FDIDX], LED_OFF);
while (threadRunningCount < 2);
while (threadRunningCount < 3);

nanosleep(&blinkTime, NULL);
TOGGLE_ALL(fds, LED_OFF);


+ 1
- 1
yarl

@@ -1 +1 @@
Subproject commit b0edb4a7ef4372439aac44ca0ff9f04802370b7f
Subproject commit bf23cc2680e79648e06a67c4371e327996fe6821

Loading…
Cancel
Save