Skip to content

File GroundCommsDriver.cpp

File List > Components > GroundCommsDriver > GroundCommsDriver.cpp

Go to the documentation of this file

#include "GroundCommsDriver.hpp"

#include <arpa/inet.h>
#include <dirent.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>

#include <cstdio>
#include <cstdlib>
#include <cstring>

namespace Orion {

// Ground station host address.
// Returns the value of the ORION_GDS_HOST environment variable when it is set
// (even if empty); otherwise the loopback address "127.0.0.1".
static const char* getGdsHost() {
    const char* const fromEnv = ::getenv("ORION_GDS_HOST");
    if (fromEnv != nullptr) {
        return fromEnv;
    }
    return "127.0.0.1";
}
// Ground station TCP port.
// Reads ORION_GDS_PORT and parses its leading decimal digits (leading
// whitespace and trailing characters are tolerated, as with atoi()).
// Returns the default port 50050 when the variable is unset, empty, not
// numeric, or outside the valid TCP port range 1..65535 — a bad value is
// never narrowed into some unrelated 16-bit port number.
static U16 getGdsPort() {
    static constexpr U16 DEFAULT_GDS_PORT = 50050;

    const char* p = ::getenv("ORION_GDS_PORT");
    if (!p || !*p) {
        return DEFAULT_GDS_PORT;
    }

    char* end = nullptr;
    const long value = ::strtol(p, &end, 10);
    // end == p means no digits were consumed. On overflow strtol() saturates to
    // LONG_MIN/LONG_MAX, which the range check below rejects as well.
    if (end == p || value < 1 || value > 65535) {
        return DEFAULT_GDS_PORT;
    }
    return static_cast<U16>(value);
}

// Disk queue directory for frames buffered outside comm window.
// Reads ORION_DOWNLINK_QUEUE_DIR; when unset or empty, returns the default
// "./media/sd/downlink_queue/".
// The returned string always ends with '/', because callers build file paths
// by appending the file name directly to it. An override lacking the trailing
// slash is copied into a per-thread buffer with the slash appended; that
// buffer is overwritten by the next call on the same thread. An override too
// long for the buffer is returned unchanged.
static const char* getQueueDir() {
    static const char* const DEFAULT_QUEUE_DIR = "./media/sd/downlink_queue/";

    const char* p = ::getenv("ORION_DOWNLINK_QUEUE_DIR");
    if (!p || !*p) {
        return DEFAULT_QUEUE_DIR;
    }

    const size_t len = ::strlen(p);
    if (p[len - 1] == '/') {
        return p;
    }

    static thread_local char normalized[256];
    if (len + 2 > sizeof(normalized)) {
        // No room for the extra '/' and terminator.
        return p;
    }
    ::memcpy(normalized, p, len);
    normalized[len] = '/';
    normalized[len + 1] = '\0';
    return normalized;
}

// Recursive mkdir — creates path and any missing parents (POSIX, like `mkdir -p`).
// Each prefix ending at a '/' is passed to mkdir() with mode 0755, followed by
// the full path when it does not already end in '/'. mkdir() results are
// ignored, so existing directories (and any other failure) pass silently.
// Null or empty paths are a no-op. Paths that do not fit the 256-byte working
// buffer are also a no-op: acting on a truncated copy would create a
// different directory than the one requested.
static void ensureDirExists(const char* path) {
    if (!path || !*path) return;

    char buf[256];
    const int written = ::snprintf(buf, sizeof(buf), "%s", path);
    if (written < 0 || static_cast<size_t>(written) >= sizeof(buf)) {
        return;
    }
    const size_t len = static_cast<size_t>(written);

    // Start at index 1 so a leading '/' (filesystem root) is never cut off.
    for (size_t i = 1; i < len; i++) {
        if (buf[i] == '/') {
            buf[i] = '\0';
            ::mkdir(buf, 0755);
            buf[i] = '/';
        }
    }
    if (buf[len - 1] != '/') {
        ::mkdir(buf, 0755);
    }
}

// 8-byte frame header that transmitRaw() sends ahead of every payload:
//   Bytes 0-3 : magic "ORIO" (0x4F52494F) in network byte order
//   Bytes 4-7 : payload length in bytes, network byte order
// FRAME_MAGIC holds the magic in host byte order; it is passed through htonl()
// before being copied into the header.
static constexpr U32 FRAME_MAGIC = 0x4F52494Fu;

// Constructs the component.
// Forwards compName to GroundCommsDriverComponentBase, zeroes the downlink,
// failure and queue counters as well as the queue file index, and starts in
// MissionMode::IDLE.
GroundCommsDriver::GroundCommsDriver(const char* compName)
    : GroundCommsDriverComponentBase(compName),
      m_framesDownlinked(0),
      m_bytesDownlinked(0),
      m_transmitFailures(0),
      m_framesQueued(0),
      m_queueFileIndex(0),
      m_currentMode(MissionMode::IDLE) {}

// Empty destructor. The code in this file opens and closes its sockets, files
// and directory handles within each call, so nothing is released here.
GroundCommsDriver::~GroundCommsDriver() {}

// ---------------------------------------------------------------------------
// Port handler
// ---------------------------------------------------------------------------

// Handles one frame arriving on the fileDownlinkIn port.
//   portNum : input port index (unused)
//   buffer  : frame payload; handed back on bufferReturnOut port 0 on every path
//   reason  : text attached to the FrameDownlinked event on success
// In DOWNLINK mode the disk queue is flushed first and the frame is then
// transmitted; success and failure each update their own telemetry channel
// and emit an event. In any other mode the frame is written to the disk queue.
void GroundCommsDriver::fileDownlinkIn_handler(FwIndexType portNum, Fw::Buffer& buffer, const Fw::StringBase& reason) {
    const bool inCommWindow = (m_currentMode.e == MissionMode::DOWNLINK);

    if (!inCommWindow) {
        // No link right now: park the frame on disk for a later flush.
        saveToQueue(buffer);
        m_framesQueued++;
        this->tlmWrite_FramesQueued(m_framesQueued);
        this->log_ACTIVITY_LO_FrameQueued();
    } else {
        // Older, previously queued frames go out before the new one.
        const U32 drained = flushQueue();
        if (drained > 0) {
            this->log_ACTIVITY_HI_QueueFlushed(drained);
        }

        const bool delivered = transmit(buffer);
        if (!delivered) {
            m_transmitFailures++;
            this->tlmWrite_TransmitFailures(m_transmitFailures);
            this->log_WARNING_HI_TransmitFailed();
        } else {
            m_framesDownlinked++;
            m_bytesDownlinked += static_cast<U32>(buffer.getSize());
            this->tlmWrite_FramesDownlinked(m_framesDownlinked);
            this->tlmWrite_BytesDownlinked(m_bytesDownlinked);
            this->log_ACTIVITY_HI_FrameDownlinked(reason);
        }
    }

    // The buffer goes back to its owner regardless of the outcome above.
    this->bufferReturnOut_out(0, buffer);
}

// ---------------------------------------------------------------------------
// Mode-change and schedule handlers — queue flush on entering DOWNLINK and periodically
// ---------------------------------------------------------------------------

// Records the new mission mode. When the new mode is DOWNLINK the disk queue
// is flushed right away, and a QueueFlushed event reports the number of frames
// sent if at least one went out.
//   portNum : input port index (unused)
//   mode    : mission mode being entered
void GroundCommsDriver::modeChangeIn_handler(FwIndexType portNum, const Orion::MissionMode& mode) {
    m_currentMode = mode;

    if (mode.e != MissionMode::DOWNLINK) {
        return;
    }

    const U32 drained = flushQueue();
    if (drained > 0) {
        this->log_ACTIVITY_HI_QueueFlushed(drained);
    }
}

// Scheduler tick. While the current mode is DOWNLINK, attempts to flush the
// disk queue and emits a QueueFlushed event when at least one frame was sent.
// Does nothing in any other mode.
//   portNum : input port index (unused)
//   context : scheduler context value (unused)
void GroundCommsDriver::schedIn_handler(FwIndexType portNum, U32 context) {
    if (m_currentMode.e != MissionMode::DOWNLINK) {
        return;
    }

    const U32 drained = flushQueue();
    if (drained > 0) {
        this->log_ACTIVITY_HI_QueueFlushed(drained);
    }
}

// ---------------------------------------------------------------------------
// Transmit helpers
// ---------------------------------------------------------------------------

// Sends the contents of an Fw::Buffer as a single frame.
// Thin adapter over transmitRaw(): returns whatever transmitRaw() returns for
// the buffer's data pointer and size.
bool GroundCommsDriver::transmit(const Fw::Buffer& buffer) {
    const U8* const payload = buffer.getData();
    const FwSizeType payloadSize = buffer.getSize();
    return transmitRaw(payload, payloadSize);
}

bool GroundCommsDriver::transmitRaw(const U8* data, FwSizeType size) {
    int sock = ::socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        return false;
    }

    struct sockaddr_in addr;
    ::memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(getGdsPort());

    if (::inet_pton(AF_INET, getGdsHost(), &addr.sin_addr) <= 0) {
        ::close(sock);
        return false;
    }

    if (::connect(sock, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr)) < 0) {
        ::close(sock);
        return false;
    }

    // Send frame header.
    U8 header[8];
    U32 magic = htonl(FRAME_MAGIC);
    U32 payloadLen = htonl(static_cast<U32>(size));
    ::memcpy(header, &magic, 4);
    ::memcpy(header + 4, &payloadLen, 4);

    if (::send(sock, header, sizeof(header), 0) != static_cast<ssize_t>(sizeof(header))) {
        ::close(sock);
        return false;
    }

    // Send payload in a loop to handle partial writes.
    FwSizeType remaining = size;
    bool ok = true;

    while (remaining > 0) {
        ssize_t sent = ::send(sock, data, remaining, 0);
        if (sent <= 0) {
            ok = false;
            break;
        }
        data += sent;
        remaining -= static_cast<FwSizeType>(sent);
    }

    ::close(sock);
    return ok;
}

// ---------------------------------------------------------------------------
// Disk queue
// ---------------------------------------------------------------------------

// Writes one frame to the disk queue as "<queueDir>orion_queued_NNNNN.raw".
//   buffer : frame payload to persist
// Creates the queue directory if needed, then advances m_queueFileIndex until
// it names a file that does not exist yet, so frames already sitting in the
// directory are not overwritten. Emits the QueueWriteFailed warning when no
// usable file name is found, the file cannot be opened, or the data cannot be
// written completely; in the last case the partial file is removed so it is
// never picked up for downlink. The function returns void, so the warning
// event is the only failure indication.
void GroundCommsDriver::saveToQueue(const Fw::Buffer& buffer) {
    // Upper bound on file-name probing; matches the 5-digit name space.
    static constexpr U32 MAX_NAME_PROBES = 100000;

    // Ensure queue directory exists
    const char* const queueDir = getQueueDir();
    ensureDirExists(queueDir);

    char path[256];
    bool haveName = false;
    for (U32 probe = 0; probe < MAX_NAME_PROBES && !haveName; probe++) {
        const int n = ::snprintf(path, sizeof(path), "%sorion_queued_%05u.raw", queueDir, m_queueFileIndex++);
        if (n < 0 || static_cast<size_t>(n) >= sizeof(path)) {
            // Path does not fit; a truncated name would point somewhere else.
            break;
        }
        // A name is usable when nothing is reachable under it.
        haveName = (::access(path, F_OK) != 0);
    }
    if (!haveName) {
        this->log_WARNING_HI_QueueWriteFailed();
        return;
    }

    FILE* f = ::fopen(path, "wb");
    if (!f) {
        this->log_WARNING_HI_QueueWriteFailed();
        return;
    }

    const size_t expected = static_cast<size_t>(buffer.getSize());
    const size_t written = ::fwrite(buffer.getData(), 1, expected, f);
    // fclose() flushes buffered data, so its result is part of the write outcome.
    const bool closed = (::fclose(f) == 0);

    if (written != expected || !closed) {
        ::unlink(path);
        this->log_WARNING_HI_QueueWriteFailed();
    }
}

// Transmits frames stored in the disk queue and removes each one after it has
// been sent successfully.
// Returns the number of frames transmitted during this call (0 when the queue
// directory cannot be opened).
// Per directory entry whose name starts with "orion_queued_":
//   - entries whose full path does not fit the path buffer, that cannot be
//     stat()ed, or that are not regular files are skipped and left in place;
//   - empty files and files larger than 1 MiB are deleted without being sent;
//   - files that cannot be opened or read completely are skipped and left in
//     place for a later attempt;
//   - a transmit failure stops the flush, leaving this and all remaining files
//     on disk.
// Frame and byte counters are updated per sent file; their telemetry channels
// are written once at the end if anything was sent.
U32 GroundCommsDriver::flushQueue() {
    static constexpr char QUEUE_FILE_PREFIX[] = "orion_queued_";
    static constexpr long MAX_QUEUED_FILE_BYTES = 1024L * 1024L;

    const char* const queueDir = getQueueDir();
    DIR* dir = ::opendir(queueDir);
    if (!dir) {
        return 0;
    }

    U32 count = 0;
    struct dirent* entry;

    while ((entry = ::readdir(dir)) != nullptr) {
        // Only process our queued files
        if (::strncmp(entry->d_name, QUEUE_FILE_PREFIX, sizeof(QUEUE_FILE_PREFIX) - 1) != 0) {
            continue;
        }

        char path[256];
        const int pathLen = ::snprintf(path, sizeof(path), "%s%s", queueDir, entry->d_name);
        if (pathLen < 0 || static_cast<size_t>(pathLen) >= sizeof(path)) {
            // A truncated path could name a different file; do not touch it.
            continue;
        }

        // Take the size from stat() so that a failed size query skips the
        // entry instead of making it look like an invalid, deletable file.
        struct stat st;
        if (::stat(path, &st) != 0 || !S_ISREG(st.st_mode)) {
            continue;
        }
        if (st.st_size <= 0 || st.st_size > MAX_QUEUED_FILE_BYTES) {
            // Skip invalid files (>1MB safety check)
            ::unlink(path);
            continue;
        }
        const size_t fileSize = static_cast<size_t>(st.st_size);

        FILE* f = ::fopen(path, "rb");
        if (!f) continue;

        // Read the whole file into a temporary heap buffer
        U8* tmpBuf = new U8[fileSize];
        const size_t bytesRead = ::fread(tmpBuf, 1, fileSize, f);
        ::fclose(f);

        if (bytesRead != fileSize) {
            // Local read problem, not a link problem: keep the file and move on.
            delete[] tmpBuf;
            continue;
        }

        const bool sent = transmitRaw(tmpBuf, static_cast<FwSizeType>(fileSize));
        delete[] tmpBuf;

        if (!sent) {
            // Stop flushing — receiver likely down, no point retrying rest
            break;
        }

        ::unlink(path);  // Only delete after successful transmit
        m_framesDownlinked++;
        m_bytesDownlinked += static_cast<U32>(fileSize);
        count++;
    }

    ::closedir(dir);

    if (count > 0) {
        this->tlmWrite_FramesDownlinked(m_framesDownlinked);
        this->tlmWrite_BytesDownlinked(m_bytesDownlinked);
    }

    return count;
}

}  // namespace Orion