Program Listing for File data_provider.hpp

Return to documentation for file (foxglove/include/foxglove/data_provider.hpp)

#pragma once


#include <foxglove/schema.hpp>

#include <nlohmann/json.hpp>

#include <base64.hpp>
#include <cstdint>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <vector>

namespace foxglove::data_provider {

// ============================================================================
// Manifest types
// ============================================================================

/// One topic entry in a StreamedSource.
///
/// to_json_string() writes the fields under the keys "name", "messageEncoding"
/// and "schemaId".
struct Topic {
  /// Topic name; serialized verbatim.
  std::string name;
  /// Encoding of the messages on this topic (ChannelSet::insert uses "protobuf").
  std::string message_encoding;
  /// ID of the Schema describing this topic's messages. When empty, the
  /// "schemaId" key is omitted from the JSON output.
  std::optional<uint16_t> schema_id;
};

/// One schema entry in a StreamedSource.
///
/// to_json_string() writes the fields under the keys "id", "name", "encoding"
/// and "data", passing every value through unchanged.
struct Schema {
  /// Identifier that Topic::schema_id refers to. No default initializer, so
  /// callers must set it. ChannelSet assigns IDs starting at 1.
  uint16_t id;
  /// Schema name.
  std::string name;
  /// Schema encoding.
  std::string encoding;
  /// Schema payload as text. ChannelSet stores the base64 encoding of the raw
  /// schema bytes here; this struct itself performs no encoding.
  std::string data;
};

/// One source entry in a Manifest.
///
/// to_json_string() writes the fields under the keys "url", "id", "topics",
/// "schemas", "startTime" and "endTime".
struct StreamedSource {
  /// Location of the source; serialized verbatim.
  std::string url;
  /// Optional identifier. When empty, the "id" key is omitted from the JSON output.
  std::optional<std::string> id;
  /// Topics in this source.
  std::vector<Topic> topics;
  /// Schemas in this source (see Topic::schema_id).
  std::vector<Schema> schemas;
  /// Start of the covered time range, serialized as an opaque string.
  /// NOTE(review): the expected timestamp format is not visible here
  /// (presumably RFC 3339) - confirm against the consumer.
  std::string start_time;
  /// End of the covered time range; same format caveat as start_time.
  std::string end_time;
};

/// Top-level manifest: an optional display name plus a list of sources.
///
/// Convert to JSON text with to_json_string().
struct Manifest {
  /// Optional name. When empty, the "name" key is omitted from the JSON output.
  std::optional<std::string> name;
  /// Sources in this manifest; serialized in order under the "sources" key.
  std::vector<StreamedSource> sources;
};

// ============================================================================
// JSON serialization
// ============================================================================

inline std::string to_json_string(const Manifest& m) {
  // Use nlohmann/json internally without exposing it in the public API.
  using json = nlohmann::json;

  auto topic_to_json = [](const Topic& t) -> json {
    json j{
      {"name", t.name},
      {"messageEncoding", t.message_encoding},
    };
    if (t.schema_id.has_value()) {
      j["schemaId"] = *t.schema_id;
    }
    return j;
  };

  auto schema_to_json = [](const Schema& s) -> json {
    return json{
      {"id", s.id},
      {"name", s.name},
      {"encoding", s.encoding},
      {"data", s.data},
    };
  };

  auto source_to_json = [&](const StreamedSource& s) -> json {
    json topics = json::array();
    for (const auto& t : s.topics) {
      topics.push_back(topic_to_json(t));
    }
    json schemas = json::array();
    for (const auto& sc : s.schemas) {
      schemas.push_back(schema_to_json(sc));
    }
    json j{
      {"url", s.url},
      {"topics", topics},
      {"schemas", schemas},
      {"startTime", s.start_time},
      {"endTime", s.end_time},
    };
    if (s.id.has_value()) {
      j["id"] = *s.id;
    }
    return j;
  };

  json sources = json::array();
  for (const auto& s : m.sources) {
    sources.push_back(source_to_json(s));
  }
  json j{
    {"sources", sources},
  };
  if (m.name.has_value()) {
    j["name"] = *m.name;
  }
  return j.dump();
}

// ============================================================================
// ChannelSet
// ============================================================================

/// Accumulates the topics and schemas for a set of channels.
///
/// Each insert<T>() call appends one Topic and, unless an identical schema is
/// already present, one Schema. The resulting `topics` and `schemas` vectors
/// have the same element types as the StreamedSource fields of the same names.
class ChannelSet {
public:
  /// Registers a channel on `topic` whose messages are described by T::schema().
  ///
  /// The topic's message encoding is always recorded as "protobuf". Calling
  /// this repeatedly with the same topic name appends a new Topic every time;
  /// only schemas are deduplicated.
  ///
  /// @tparam T Type with a static `schema()` function whose result is accepted
  ///           as a `foxglove::Schema`.
  /// @param topic Topic name to record.
  /// @throws std::overflow_error if a new schema is needed and all 65535 IDs
  ///         have been handed out.
  template<typename T>
  void insert(const std::string& topic) {
    auto message_schema = T::schema();
    uint16_t assigned_id = add_schema(message_schema);
    topics.push_back(Topic{topic, "protobuf", assigned_id});
  }

  /// Topics recorded so far, in insertion order.
  std::vector<Topic> topics;
  /// Distinct schemas recorded so far, in order of first use.
  std::vector<Schema> schemas;

private:
  // ID handed to the next new schema. MCAP reserves schema ID 0, so usable IDs
  // are 1..65535; the counter wrapping around to 0 marks exhaustion.
  uint16_t next_schema_id_ = 1;

  /// Returns the base64 text form of the raw bytes in `schema`.
  static std::string encode_schema_data(const foxglove::Schema& schema) {
    const char* raw_bytes = reinterpret_cast<const char*>(schema.data);
    return base64::to_base64(std::string_view(raw_bytes, schema.data_len));
  }

  /// Returns the ID for `schema`, registering it first if it is new.
  ///
  /// Two schemas count as identical when name, encoding and base64-encoded
  /// data all match.
  ///
  /// @throws std::overflow_error when a new ID is required but none remain.
  uint16_t add_schema(const foxglove::Schema& schema) {
    std::string payload = encode_schema_data(schema);

    // Linear scan for an already-registered identical schema.
    for (const Schema& known : schemas) {
      if (known.name != schema.name) {
        continue;
      }
      if (known.encoding != schema.encoding) {
        continue;
      }
      if (known.data != payload) {
        continue;
      }
      return known.id;
    }

    const bool ids_exhausted = (next_schema_id_ == 0);
    if (ids_exhausted) {
      throw std::overflow_error("ChannelSet: cannot add more than 65535 schemas");
    }

    const uint16_t fresh_id = next_schema_id_;
    ++next_schema_id_;  // becomes 0 once 65535 has been handed out

    schemas.push_back(Schema{fresh_id, schema.name, schema.encoding, std::move(payload)});
    return fresh_id;
  }
};

}  // namespace foxglove::data_provider