Files
kubesolo-os/update/pkg/bootenv/bootenv_test.go
Adolfo Delorenzo efc7f80b65
Some checks failed
CI / Go Tests (push) Has been cancelled
CI / Build Go Binaries (amd64, linux, linux-amd64) (push) Has been cancelled
CI / Build Go Binaries (arm64, linux, linux-arm64) (push) Has been cancelled
CI / Shellcheck (push) Has been cancelled
feat: add security hardening, AppArmor, and ARM64 Raspberry Pi support (Phase 6)
Security hardening: bind kubeconfig server to localhost, mount hardening
(noexec/nosuid/nodev on tmpfs), sysctl network hardening, kernel module
loading lock after boot, SHA256 checksum verification for downloads,
kernel AppArmor + Audit support, complain-mode AppArmor profiles for
containerd and kubelet, and security integration test.

ARM64 Raspberry Pi support: piCore64 base extraction, RPi kernel build
from raspberrypi/linux fork, RPi firmware fetch, SD card image with 4-
partition GPT and tryboot A/B mechanism, BootEnv Go interface abstracting
GRUB vs RPi boot environments, architecture-aware build scripts, QEMU
aarch64 dev VM and boot test.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 13:08:17 -06:00

534 lines
12 KiB
Go

package bootenv
import (
"os"
"path/filepath"
"strconv"
"strings"
"testing"
)
// createTestGrubenv writes a grubenv fixture into dir and returns its path.
//
// The file begins with the "# GRUB Environment Block" header line, followed
// by one "key=value" line per entry in vars, and is then padded with '#'
// characters to exactly 1024 bytes. Entries are emitted in map iteration
// order, so their relative order inside the file is unspecified.
//
// The calling test is aborted if the header and variables do not fit in
// 1024 bytes (the fixture would no longer be a fixed-size block) or if the
// file cannot be written.
func createTestGrubenv(t *testing.T, dir string, vars map[string]string) string {
	t.Helper()

	// Fixed size the fixture must occupy.
	const envSize = 1024

	var sb strings.Builder
	sb.Grow(envSize)
	sb.WriteString("# GRUB Environment Block\n")
	for k, v := range vars {
		sb.WriteString(k + "=" + v + "\n")
	}

	// Refuse to silently produce an oversized file.
	if sb.Len() > envSize {
		t.Fatalf("grubenv fixture is %d bytes, exceeds the %d-byte block", sb.Len(), envSize)
	}
	sb.WriteString(strings.Repeat("#", envSize-sb.Len()))

	path := filepath.Join(dir, "grubenv")
	if err := os.WriteFile(path, []byte(sb.String()), 0o644); err != nil {
		t.Fatal(err)
	}
	return path
}
// TestGRUBActiveSlot verifies ActiveSlot reads the correct value.
func TestGRUBActiveSlot(t *testing.T) {
dir := t.TempDir()
path := createTestGrubenv(t, dir, map[string]string{
"active_slot": "A",
"boot_counter": "3",
"boot_success": "1",
})
env := NewGRUB(path)
slot, err := env.ActiveSlot()
if err != nil {
t.Fatal(err)
}
if slot != "A" {
t.Errorf("expected A, got %s", slot)
}
}
// TestGRUBPassiveSlot checks that PassiveSlot returns "B" when the grubenv
// fixture records "A" as the active slot.
func TestGRUBPassiveSlot(t *testing.T) {
	grubenv := createTestGrubenv(t, t.TempDir(), map[string]string{
		"active_slot":  "A",
		"boot_counter": "3",
		"boot_success": "0",
	})
	grub := NewGRUB(grubenv)

	got, err := grub.PassiveSlot()
	if err != nil {
		t.Fatal(err)
	}
	if got == "B" {
		return
	}
	t.Errorf("expected B, got %s", got)
}
// TestGRUBBootCounter verifies BootCounter reads the correct value.
func TestGRUBBootCounter(t *testing.T) {
dir := t.TempDir()
path := createTestGrubenv(t, dir, map[string]string{
"active_slot": "A",
"boot_counter": "2",
"boot_success": "0",
})
env := NewGRUB(path)
counter, err := env.BootCounter()
if err != nil {
t.Fatal(err)
}
if counter != 2 {
t.Errorf("expected 2, got %d", counter)
}
}
// TestGRUBBootSuccess checks that BootSuccess returns true for a grubenv
// fixture whose boot_success variable is "1".
func TestGRUBBootSuccess(t *testing.T) {
	grubenv := createTestGrubenv(t, t.TempDir(), map[string]string{
		"active_slot":  "A",
		"boot_counter": "3",
		"boot_success": "1",
	})
	grub := NewGRUB(grubenv)

	ok, err := grub.BootSuccess()
	if err != nil {
		t.Fatal(err)
	}
	if ok {
		return
	}
	t.Error("expected true, got false")
}
// TestGRUBMarkBootSuccess verifies marking boot as successful.
func TestGRUBMarkBootSuccess(t *testing.T) {
dir := t.TempDir()
path := createTestGrubenv(t, dir, map[string]string{
"active_slot": "B",
"boot_counter": "1",
"boot_success": "0",
})
env := NewGRUB(path)
if err := env.MarkBootSuccess(); err != nil {
t.Fatal(err)
}
success, err := env.BootSuccess()
if err != nil {
t.Fatal(err)
}
if !success {
t.Error("expected boot_success=true after MarkBootSuccess")
}
counter, err := env.BootCounter()
if err != nil {
t.Fatal(err)
}
if counter != 3 {
t.Errorf("expected boot_counter=3 after MarkBootSuccess, got %d", counter)
}
}
// TestGRUBActivateSlot activates slot "B" on a grubenv that starts with
// active_slot=A, boot_counter=3, boot_success=1 and expects the active slot
// to read "B", the counter to read 3, and the success flag to read false.
//
// Every getter error is checked: a discarded error from BootSuccess would
// otherwise yield false and let the final assertion pass vacuously.
func TestGRUBActivateSlot(t *testing.T) {
	dir := t.TempDir()
	path := createTestGrubenv(t, dir, map[string]string{
		"active_slot":  "A",
		"boot_counter": "3",
		"boot_success": "1",
	})
	env := NewGRUB(path)

	if err := env.ActivateSlot("B"); err != nil {
		t.Fatal(err)
	}

	slot, err := env.ActiveSlot()
	if err != nil {
		t.Fatalf("ActiveSlot: %v", err)
	}
	if slot != "B" {
		t.Errorf("expected B, got %s", slot)
	}

	counter, err := env.BootCounter()
	if err != nil {
		t.Fatalf("BootCounter: %v", err)
	}
	if counter != 3 {
		t.Errorf("expected counter=3, got %d", counter)
	}

	success, err := env.BootSuccess()
	if err != nil {
		t.Fatalf("BootSuccess: %v", err)
	}
	if success {
		t.Error("expected boot_success=false after ActivateSlot")
	}
}
// TestGRUBForceRollback calls ForceRollback on a grubenv whose active slot
// is "A" and expects ActiveSlot to return "B" afterwards.
func TestGRUBForceRollback(t *testing.T) {
	dir := t.TempDir()
	path := createTestGrubenv(t, dir, map[string]string{
		"active_slot":  "A",
		"boot_counter": "3",
		"boot_success": "1",
	})
	env := NewGRUB(path)

	if err := env.ForceRollback(); err != nil {
		t.Fatal(err)
	}

	// Report a read failure as such instead of as a slot mismatch.
	slot, err := env.ActiveSlot()
	if err != nil {
		t.Fatalf("ActiveSlot: %v", err)
	}
	if slot != "B" {
		t.Errorf("expected B after rollback from A, got %s", slot)
	}
}
// TestGRUBSlotCycling activates slot "B" and then slot "A" on the same
// grubenv, checking after each step that ActiveSlot returns the slot that
// was just activated.
func TestGRUBSlotCycling(t *testing.T) {
	dir := t.TempDir()
	path := createTestGrubenv(t, dir, map[string]string{
		"active_slot":  "A",
		"boot_counter": "3",
		"boot_success": "1",
	})
	env := NewGRUB(path)

	// A -> B
	if err := env.ActivateSlot("B"); err != nil {
		t.Fatal(err)
	}
	slot, err := env.ActiveSlot()
	if err != nil {
		t.Fatalf("ActiveSlot after activating B: %v", err)
	}
	if slot != "B" {
		t.Fatalf("expected B, got %s", slot)
	}

	// B -> A
	if err := env.ActivateSlot("A"); err != nil {
		t.Fatal(err)
	}
	slot, err = env.ActiveSlot()
	if err != nil {
		t.Fatalf("ActiveSlot after activating A: %v", err)
	}
	if slot != "A" {
		t.Fatalf("expected A, got %s", slot)
	}
}
// TestGRUBActivateInvalidSlot verifies invalid slot is rejected.
func TestGRUBActivateInvalidSlot(t *testing.T) {
dir := t.TempDir()
path := createTestGrubenv(t, dir, map[string]string{
"active_slot": "A",
"boot_counter": "3",
"boot_success": "0",
})
env := NewGRUB(path)
if err := env.ActivateSlot("C"); err == nil {
t.Fatal("expected error for invalid slot")
}
}
// TestRPiActiveSlot expects ActiveSlot to return "A" when the fixture's
// [all] section of autoboot.txt has boot_partition=2.
func TestRPiActiveSlot(t *testing.T) {
	bootDir := t.TempDir()
	createTestAutobootFiles(t, bootDir, 2, 3, 3, false)
	rpi := NewRPi(bootDir)

	got, err := rpi.ActiveSlot()
	switch {
	case err != nil:
		t.Fatal(err)
	case got != "A":
		t.Errorf("expected A (partition 2), got %s", got)
	}
}
// TestRPiActiveSlotB expects ActiveSlot to return "B" when the fixture's
// [all] section of autoboot.txt has boot_partition=3.
func TestRPiActiveSlotB(t *testing.T) {
	bootDir := t.TempDir()
	createTestAutobootFiles(t, bootDir, 3, 2, 3, true)
	rpi := NewRPi(bootDir)

	got, err := rpi.ActiveSlot()
	if err != nil {
		t.Fatal(err)
	}
	if got == "B" {
		return
	}
	t.Errorf("expected B (partition 3), got %s", got)
}
// TestRPiPassiveSlot expects PassiveSlot to return "B" when the fixture's
// [all] section of autoboot.txt has boot_partition=2.
func TestRPiPassiveSlot(t *testing.T) {
	bootDir := t.TempDir()
	createTestAutobootFiles(t, bootDir, 2, 3, 3, false)
	rpi := NewRPi(bootDir)

	got, err := rpi.PassiveSlot()
	switch {
	case err != nil:
		t.Fatal(err)
	case got != "B":
		t.Errorf("expected B, got %s", got)
	}
}
// TestRPiBootCounter expects BootCounter to return 2 when the boot-status
// fixture contains boot_counter=2.
func TestRPiBootCounter(t *testing.T) {
	bootDir := t.TempDir()
	createTestAutobootFiles(t, bootDir, 2, 3, 2, false)
	rpi := NewRPi(bootDir)

	n, err := rpi.BootCounter()
	if err != nil {
		t.Fatal(err)
	}
	if n == 2 {
		return
	}
	t.Errorf("expected 2, got %d", n)
}
// TestRPiBootCounterMissingFile expects BootCounter to return 3 without an
// error when the directory contains autoboot.txt but no boot-status file.
func TestRPiBootCounterMissingFile(t *testing.T) {
	bootDir := t.TempDir()

	// Deliberately write autoboot.txt only; boot-status is left absent.
	const autobootBody = "[all]\ntryboot_a_b=1\nboot_partition=2\n[tryboot]\nboot_partition=3\n"
	autobootPath := filepath.Join(bootDir, "autoboot.txt")
	if err := os.WriteFile(autobootPath, []byte(autobootBody), 0o644); err != nil {
		t.Fatal(err)
	}
	rpi := NewRPi(bootDir)

	n, err := rpi.BootCounter()
	switch {
	case err != nil:
		t.Fatal(err)
	case n != 3:
		t.Errorf("expected default counter 3, got %d", n)
	}
}
// TestRPiBootSuccess expects BootSuccess to return true when the
// boot-status fixture contains boot_success=1.
func TestRPiBootSuccess(t *testing.T) {
	bootDir := t.TempDir()
	createTestAutobootFiles(t, bootDir, 2, 3, 3, true)
	rpi := NewRPi(bootDir)

	ok, err := rpi.BootSuccess()
	switch {
	case err != nil:
		t.Fatal(err)
	case !ok:
		t.Error("expected true, got false")
	}
}
// TestRPiMarkBootSuccess starts from a fixture with [all] boot_partition=2,
// boot_counter=1 and boot_success=0, calls MarkBootSuccess, and expects the
// active slot to remain "A", the success flag to read true, the counter to
// read 3, and autoboot.txt to contain "boot_partition=2".
//
// Getter and file-read errors are checked explicitly so that a read failure
// is reported as such rather than as a misleading value mismatch.
func TestRPiMarkBootSuccess(t *testing.T) {
	dir := t.TempDir()
	createTestAutobootFiles(t, dir, 2, 3, 1, false)
	env := NewRPi(dir)

	if err := env.MarkBootSuccess(); err != nil {
		t.Fatal(err)
	}

	// Active slot should still be A.
	slot, err := env.ActiveSlot()
	if err != nil {
		t.Fatalf("ActiveSlot: %v", err)
	}
	if slot != "A" {
		t.Errorf("expected active slot A, got %s", slot)
	}

	// Boot success should be true.
	success, err := env.BootSuccess()
	if err != nil {
		t.Fatalf("BootSuccess: %v", err)
	}
	if !success {
		t.Error("expected boot_success=true after MarkBootSuccess")
	}

	// Counter should be reset to 3.
	counter, err := env.BootCounter()
	if err != nil {
		t.Fatalf("BootCounter: %v", err)
	}
	if counter != 3 {
		t.Errorf("expected counter=3 after MarkBootSuccess, got %d", counter)
	}

	// [all] boot_partition should be 2 (slot A, making it permanent).
	// NOTE(review): this substring match is not scoped to the [all] section,
	// and the fixture already contains boot_partition=2 before the call, so
	// it cannot detect a MarkBootSuccess that leaves autoboot.txt untouched.
	data, err := os.ReadFile(filepath.Join(dir, "autoboot.txt"))
	if err != nil {
		t.Fatalf("reading autoboot.txt: %v", err)
	}
	if !strings.Contains(string(data), "boot_partition=2") {
		t.Error("expected [all] boot_partition=2 after MarkBootSuccess")
	}
}
// TestRPiActivateSlot activates slot "B" and expects the [tryboot] section
// of autoboot.txt to contain boot_partition=3, the success flag to read
// false, and the counter to read 3.
//
// Getter and file-read errors are checked explicitly: a discarded error from
// BootSuccess would otherwise yield false and let the assertion pass
// vacuously.
func TestRPiActivateSlot(t *testing.T) {
	dir := t.TempDir()
	createTestAutobootFiles(t, dir, 2, 3, 3, true)
	env := NewRPi(dir)

	if err := env.ActivateSlot("B"); err != nil {
		t.Fatal(err)
	}

	// [tryboot] should now point to partition 3 (slot B).
	// NOTE(review): the fixture already has [tryboot] boot_partition=3
	// before the call, so this check cannot tell an update from a no-op.
	data, err := os.ReadFile(filepath.Join(dir, "autoboot.txt"))
	if err != nil {
		t.Fatalf("reading autoboot.txt: %v", err)
	}
	content := string(data)

	// Everything from the [tryboot] header to the end of the file.
	idx := strings.Index(content, "[tryboot]")
	if idx < 0 {
		t.Fatal("missing [tryboot] section")
	}
	trybootSection := content[idx:]
	if !strings.Contains(trybootSection, "boot_partition=3") {
		t.Errorf("expected [tryboot] boot_partition=3, got: %s", trybootSection)
	}

	// Status should be reset.
	success, err := env.BootSuccess()
	if err != nil {
		t.Fatalf("BootSuccess: %v", err)
	}
	if success {
		t.Error("expected boot_success=false after ActivateSlot")
	}

	counter, err := env.BootCounter()
	if err != nil {
		t.Fatalf("BootCounter: %v", err)
	}
	if counter != 3 {
		t.Errorf("expected counter=3, got %d", counter)
	}
}
// TestRPiActivateInvalidSlot expects ActivateSlot("C") to return a non-nil
// error.
func TestRPiActivateInvalidSlot(t *testing.T) {
	bootDir := t.TempDir()
	createTestAutobootFiles(t, bootDir, 2, 3, 3, false)
	rpi := NewRPi(bootDir)

	err := rpi.ActivateSlot("C")
	if err != nil {
		return
	}
	t.Fatal("expected error for invalid slot")
}
// TestRPiForceRollback calls ForceRollback on a fixture whose active slot is
// "A" and whose boot_success is 1, and expects ActiveSlot to return "B" and
// BootSuccess to return false afterwards.
//
// Getter errors are checked explicitly: a discarded error from BootSuccess
// would otherwise yield false and let the assertion pass vacuously.
func TestRPiForceRollback(t *testing.T) {
	dir := t.TempDir()
	createTestAutobootFiles(t, dir, 2, 3, 3, true)
	env := NewRPi(dir)

	if err := env.ForceRollback(); err != nil {
		t.Fatal(err)
	}

	// [all] should now point to partition 3 (slot B).
	slot, err := env.ActiveSlot()
	if err != nil {
		t.Fatalf("ActiveSlot: %v", err)
	}
	if slot != "B" {
		t.Errorf("expected B after rollback from A, got %s", slot)
	}

	// Success should be false.
	success, err := env.BootSuccess()
	if err != nil {
		t.Fatalf("BootSuccess: %v", err)
	}
	if success {
		t.Error("expected boot_success=false after ForceRollback")
	}
}
// TestRPiSlotCycling calls ForceRollback twice on a fixture whose active
// slot is "A", expecting ActiveSlot to return "B" after the first call and
// "A" after the second.
func TestRPiSlotCycling(t *testing.T) {
	dir := t.TempDir()
	createTestAutobootFiles(t, dir, 2, 3, 3, true)
	env := NewRPi(dir)

	// Rollback A -> B
	if err := env.ForceRollback(); err != nil {
		t.Fatal(err)
	}
	slot, err := env.ActiveSlot()
	if err != nil {
		t.Fatalf("ActiveSlot after first rollback: %v", err)
	}
	if slot != "B" {
		t.Fatalf("expected B, got %s", slot)
	}

	// Rollback B -> A
	if err := env.ForceRollback(); err != nil {
		t.Fatal(err)
	}
	slot, err = env.ActiveSlot()
	if err != nil {
		t.Fatalf("ActiveSlot after second rollback: %v", err)
	}
	if slot != "A" {
		t.Fatalf("expected A, got %s", slot)
	}
}
// TestInterfaceCompliance stores the values returned by NewGRUB and NewRPi
// as BootEnv (so both must satisfy the interface at compile time) and runs
// the same read-only checks against each through that interface: active
// slot "A", passive slot "B", boot counter 3, boot success false.
//
// The implementations are held in a slice rather than a map so the subtests
// run in the same order on every run.
func TestInterfaceCompliance(t *testing.T) {
	dir := t.TempDir()
	grubPath := createTestGrubenv(t, dir, map[string]string{
		"active_slot":  "A",
		"boot_counter": "3",
		"boot_success": "0",
	})
	rpiDir := t.TempDir()
	createTestAutobootFiles(t, rpiDir, 2, 3, 3, false)

	impls := []struct {
		name string
		env  BootEnv
	}{
		{name: "grub", env: NewGRUB(grubPath)},
		{name: "rpi", env: NewRPi(rpiDir)},
	}

	for _, impl := range impls {
		env := impl.env
		t.Run(impl.name, func(t *testing.T) {
			slot, err := env.ActiveSlot()
			if err != nil {
				t.Fatalf("ActiveSlot: %v", err)
			}
			if slot != "A" {
				t.Errorf("ActiveSlot: expected A, got %s", slot)
			}

			passive, err := env.PassiveSlot()
			if err != nil {
				t.Fatalf("PassiveSlot: %v", err)
			}
			if passive != "B" {
				t.Errorf("PassiveSlot: expected B, got %s", passive)
			}

			counter, err := env.BootCounter()
			if err != nil {
				t.Fatalf("BootCounter: %v", err)
			}
			if counter != 3 {
				t.Errorf("BootCounter: expected 3, got %d", counter)
			}

			success, err := env.BootSuccess()
			if err != nil {
				t.Fatalf("BootSuccess: %v", err)
			}
			if success {
				t.Error("BootSuccess: expected false")
			}
		})
	}
}
// createTestAutobootFiles writes the two RPi fixture files into dir.
//
// autoboot.txt gets an [all] section with tryboot_a_b=1 and
// boot_partition=allPart, followed by a [tryboot] section with
// boot_partition=trybootPart. boot-status gets boot_counter=counter and
// boot_success set to 1 when success is true, 0 otherwise.
//
// The calling test is aborted if either file cannot be written.
func createTestAutobootFiles(t *testing.T, dir string, allPart, trybootPart, counter int, success bool) {
	t.Helper()

	successFlag := "0"
	if success {
		successFlag = "1"
	}

	// Written in order: autoboot.txt first, then boot-status.
	fixtures := []struct {
		name string
		body string
	}{
		{
			name: "autoboot.txt",
			body: "[all]\ntryboot_a_b=1\nboot_partition=" + strconv.Itoa(allPart) + "\n" +
				"[tryboot]\nboot_partition=" + strconv.Itoa(trybootPart) + "\n",
		},
		{
			name: "boot-status",
			body: "boot_counter=" + strconv.Itoa(counter) + "\nboot_success=" + successFlag + "\n",
		},
	}
	for _, f := range fixtures {
		target := filepath.Join(dir, f.name)
		if err := os.WriteFile(target, []byte(f.body), 0o644); err != nil {
			t.Fatal(err)
		}
	}
}