feat(multi-region-canvas): per-region kubeconfig PUT-back + per-region helmwatch (#1444)
Operator mandate (2026-05-12): the mothership canvas must surface install-* HRs from EVERY region of a multi-region provision, not just the primary CP's. Today catalyst-api stores ONE kubeconfig per deployment (the primary CP's) and spawns ONE helmwatch.Bridge against it. Result: secondary regions are invisible on the canvas even though their k3s clusters are fully reconciling. End-to-end change across infra + handler: 1) cloud-init (cloudinit-control-plane.tftpl): the kubeconfig PUT URL appends `?region=<kubeconfig_postback_region>` when the var is set. main.tf templatefile call passes empty for primary CP, `each.key` (e.g. "nbg1-1", "hel1-2") for each secondary region. 2) PutKubeconfig handler: reads ?region= query param. Empty → primary path (unchanged: stores at <dir>/<id>.yaml, sets Result.KubeconfigPath, fires Phase-1 watch + SMTP seed). Non-empty → secondary path: stores at <dir>/<id>-<region>.yaml, populates Deployment.secondaryKubeconfigPaths[region]. Single-use guard is per-region (the same bearer secures every CP's PUT — secondaries reuse it for their own slot). NO Phase-1 watch re-launch from a secondary PUT. 3) phase1_watch.spawnSecondaryRegionWatchers: runs alongside the primary's watcher. Scans <kubeconfigsDir>/<id>-*.yaml every 15s, spawns one helmwatch.NewWatcher per kubeconfig discovered, stores the Watcher on Deployment.secondaryWatchers[region]. Per-region watchers emit ordinary helmwatch events with region-prefixed Component names so the wizard's per-component view doesn't collide primary vs secondary bp-cilium events. They do NOT contribute to markPhase1Done — outcome remains the primary's classification. 4) flow_snapshot_local.flowSnapshotFromJobs: composes per-region group bubbles + install-* nodes from each secondary watcher's SnapshotComponents. Node id: <depID>:<region>:install-<chart>. FlowNode.region set so the canvas can colour-group. 
Intra-region finish-to-start deps emitted from cs.DependsOn — same-region only, never cross-region (per NAMING-CONVENTION §1.3 independent fault domains, no stretched cluster). 5) wipe.go: removes both <id>.yaml AND every <id>-*.yaml secondary kubeconfig file on Sovereign wipe. Storage model is uniform across SME and corporate Sovereigns. No hardcoding of provider, region count, or building block. Caught after operator pointed out that 3-region prov #50 was showing only 52 install-* nodes (all from fsn1) on the canvas — the architectural gap. Co-authored-by: e3mrah <1234567+e3mrah@users.noreply.github.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
effd75e4a7
commit
4923938c2b
@ -1260,7 +1260,7 @@ runcmd:
|
||||
-H "Authorization: Bearer ${kubeconfig_bearer_token}" \
|
||||
-H "Content-Type: application/x-yaml" \
|
||||
--data-binary @/etc/rancher/k3s/k3s.yaml.public \
|
||||
${catalyst_api_url}/api/v1/deployments/${deployment_id}/kubeconfig
|
||||
"${catalyst_api_url}/api/v1/deployments/${deployment_id}/kubeconfig${kubeconfig_postback_region != "" ? "?region=" : ""}${kubeconfig_postback_region}"
|
||||
- rm -f /etc/rancher/k3s/k3s.yaml.public
|
||||
%{ endif ~}
|
||||
|
||||
|
||||
@ -483,6 +483,16 @@ locals {
|
||||
hcloud_network_name = hcloud_network.main.name
|
||||
hcloud_firewall_name = hcloud_firewall.main.name
|
||||
hcloud_ssh_key_name = hcloud_ssh_key.main.name
|
||||
|
||||
# Multi-region kubeconfig PUT-back (operator mandate, 2026-05-12).
|
||||
# Empty string for the primary CP → catalyst-api stores the file
|
||||
# at <kubeconfigsDir>/<id>.yaml (back-compat with single-region).
|
||||
# Secondary regions pass their region key here (see the for_each
|
||||
# call below) so catalyst-api stores them at
|
||||
# <kubeconfigsDir>/<id>-<region>.yaml. catalyst-api's phase1Watch
|
||||
# then spawns one helmwatch.Bridge per kubeconfig so the canvas
|
||||
# surfaces install-* HRs from EVERY region, not just primary.
|
||||
kubeconfig_postback_region = ""
|
||||
}), "/(?m)^[ ]*#( |$).*\n/", "")
|
||||
}
|
||||
|
||||
@ -901,16 +911,19 @@ locals {
|
||||
# Issue #1778 (F7 multi-region completion) — same hcloud_*_name
|
||||
# threading as the primary CP templatefile call (lines 483-485)
|
||||
# so the secondary regions' cluster-autoscaler also has the
|
||||
# private-network attachment names. Without this every secondary
|
||||
# region's tofu plan blows up with "vars map does not contain
|
||||
# key hcloud_network_name" referenced in cloudinit-control-plane
|
||||
# .tftpl:478. Primary cluster shares one Network/Firewall/SSHKey
|
||||
# across all regions (multi-zone subnets, slice G1) so the same
|
||||
# resource refs go to every region's CP.
|
||||
# private-network attachment names.
|
||||
hcloud_network_name = hcloud_network.main.name
|
||||
hcloud_firewall_name = hcloud_firewall.main.name
|
||||
hcloud_ssh_key_name = hcloud_ssh_key.main.name
|
||||
}), "/(?m)^[ ]*#( |$).*\n/", "")
|
||||
|
||||
# Multi-region kubeconfig PUT-back — region key for this secondary
|
||||
# CP. cloudinit-control-plane.tftpl appends `?region=<k>` to the
|
||||
# PUT URL so catalyst-api stores it at
|
||||
# <kubeconfigsDir>/<id>-<k>.yaml and phase1Watch can spawn a
|
||||
# per-region helmwatch.Bridge.
|
||||
kubeconfig_postback_region = k
|
||||
}), "/(?m)^[ ]*#( |$).*\n/", "")
|
||||
}
|
||||
|
||||
# Per-secondary-region worker cloud-init — joins the secondary region's
|
||||
|
||||
@ -174,6 +174,20 @@ type Deployment struct {
|
||||
// for SnapshotComponents() and the GC reclaims the old one
|
||||
// once the field is overwritten.
|
||||
liveWatcher *helmwatch.Watcher
|
||||
|
||||
// Multi-region kubeconfig PUT-back (operator mandate, 2026-05-12).
|
||||
// secondaryKubeconfigPaths maps region key (e.g. "nbg1-1",
|
||||
// "hel1-2") → on-disk path of the per-region kubeconfig file.
|
||||
// Populated by PutKubeconfig when ?region=<k> is set; consumed
|
||||
// by runPhase1Watch to spawn one helmwatch.Bridge per region so
|
||||
// the canvas surfaces install-* HRs from EVERY region, not just
|
||||
// primary. The primary's kubeconfig stays on Result.KubeconfigPath.
|
||||
//
|
||||
// secondaryWatchers parallels the map — one running helmwatch
|
||||
// .Watcher per region whose Snapshot is composed into the flow
|
||||
// snapshot. Cleared on wipe.
|
||||
secondaryKubeconfigPaths map[string]string
|
||||
secondaryWatchers map[string]*helmwatch.Watcher
|
||||
}
|
||||
|
||||
// SlimForHandover returns a copy of the receiver retaining ONLY the
|
||||
|
||||
@ -55,6 +55,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/helmwatch"
|
||||
"github.com/openova-io/openova/products/catalyst/bootstrap/api/internal/jobs"
|
||||
)
|
||||
|
||||
@ -164,20 +165,34 @@ func (h *Handler) flowSnapshotFromJobs(deploymentID string) (*flowSnapshotLocalM
|
||||
return nil, false
|
||||
}
|
||||
|
||||
// Pull spec.dependsOn from the live helmwatch.Watcher's informer
|
||||
// cache. jobs.Store does NOT persist Job.DependsOn for Phase-1
|
||||
// install-* Jobs today (only the Phase-0 tofu chain + cluster-
|
||||
// bootstrap gets dep wiring — see jobs/types.go PhaseTofu*).
|
||||
// Without this every install-* bubble renders disconnected on
|
||||
// the canvas. SnapshotComponents() returns ComponentSnapshot
|
||||
// {AppID, DependsOn} populated by extractDependsOn from each
|
||||
// HelmRelease's spec.dependsOn[].name (bp- prefix stripped). We
|
||||
// index by AppID so the per-Job lookup is O(1).
|
||||
// Pull spec.dependsOn from the PRIMARY region's live helmwatch
|
||||
// .Watcher's informer cache. jobs.Store does NOT persist
|
||||
// Job.DependsOn for Phase-1 install-* Jobs today (only the Phase-0
|
||||
// tofu chain + cluster-bootstrap gets dep wiring). Without this
|
||||
// every primary install-* bubble renders disconnected.
|
||||
//
|
||||
// Multi-region: separately, secondary regions each have their own
|
||||
// helmwatch.Watcher (dep.secondaryWatchers) whose components are
|
||||
// emitted below as REGION-PREFIXED FlowNodes so the canvas shows
|
||||
// install-* HRs from EVERY region. They do NOT contribute to
|
||||
// hrDeps used for the primary-leaf-only dep derivation — each
|
||||
// region's intra-cluster deps are computed against its OWN
|
||||
// snapshot inside the per-region block below.
|
||||
hrDeps := map[string][]string{}
|
||||
var secondaryWatchers map[string]*helmwatch.Watcher
|
||||
if val, ok := h.deployments.Load(deploymentID); ok {
|
||||
if dep, ok := val.(*Deployment); ok && dep != nil {
|
||||
dep.mu.Lock()
|
||||
w := dep.liveWatcher
|
||||
// Snapshot the secondaryWatchers map under the lock; the
|
||||
// per-watcher SnapshotComponents() calls below are
|
||||
// individually goroutine-safe.
|
||||
if len(dep.secondaryWatchers) > 0 {
|
||||
secondaryWatchers = make(map[string]*helmwatch.Watcher, len(dep.secondaryWatchers))
|
||||
for r, sw := range dep.secondaryWatchers {
|
||||
secondaryWatchers[r] = sw
|
||||
}
|
||||
}
|
||||
dep.mu.Unlock()
|
||||
if w != nil {
|
||||
for _, cs := range w.SnapshotComponents() {
|
||||
@ -340,6 +355,109 @@ func (h *Handler) flowSnapshotFromJobs(deploymentID string) (*flowSnapshotLocalM
|
||||
})
|
||||
}
|
||||
|
||||
// Multi-region — append one synthetic group bubble per secondary
|
||||
// region + one FlowNode per HR observed in that region's watcher
|
||||
// + contains edges parent→child + finish-to-start edges between
|
||||
// siblings (same-region only). Region tag is set so the canvas
|
||||
// can colour-group by region.
|
||||
if len(secondaryWatchers) > 0 {
|
||||
statusToFlow := func(state string) string {
|
||||
switch state {
|
||||
case "installed":
|
||||
return "succeeded"
|
||||
case "failed":
|
||||
return "failed"
|
||||
case "installing", "pending", "degraded":
|
||||
return "running"
|
||||
}
|
||||
return "pending"
|
||||
}
|
||||
for region, sw := range secondaryWatchers {
|
||||
if sw == nil {
|
||||
continue
|
||||
}
|
||||
snap := sw.SnapshotComponents()
|
||||
if len(snap) == 0 {
|
||||
continue
|
||||
}
|
||||
regionGroupID := deploymentID + ":" + region + ":bootstrap-kit"
|
||||
regionStr := region
|
||||
regionFamily := "group"
|
||||
nodes = append(nodes, flowSnapshotLocalNode{
|
||||
ID: regionGroupID,
|
||||
FlowID: deploymentID,
|
||||
Label: "Bootstrap (" + region + ")",
|
||||
Status: "running",
|
||||
Family: &regionFamily,
|
||||
Region: &regionStr,
|
||||
})
|
||||
// Hierarchy: this region's group is contained by the
|
||||
// top-level bootstrap-kit (so the canvas can fold all
|
||||
// regions under one parent).
|
||||
rels = append(rels, flowSnapshotLocalRelationship{
|
||||
FromID: regionGroupID,
|
||||
ToID: bootstrapID,
|
||||
Type: "contains",
|
||||
})
|
||||
|
||||
// Index this region's components for intra-region dep edges.
|
||||
regionAppIDs := map[string]string{} // appID → full node id
|
||||
for _, cs := range snap {
|
||||
appID := cs.AppID
|
||||
if appID == "" {
|
||||
continue
|
||||
}
|
||||
nodeID := deploymentID + ":" + region + ":install-" + appID
|
||||
regionAppIDs[appID] = nodeID
|
||||
}
|
||||
|
||||
installFamily := "install"
|
||||
for _, cs := range snap {
|
||||
appID := cs.AppID
|
||||
if appID == "" {
|
||||
continue
|
||||
}
|
||||
nodeID := regionAppIDs[appID]
|
||||
startedAt := int64(0)
|
||||
if !cs.LastTransitionAt.IsZero() {
|
||||
startedAt = cs.LastTransitionAt.Unix()
|
||||
}
|
||||
node := flowSnapshotLocalNode{
|
||||
ID: nodeID,
|
||||
FlowID: deploymentID,
|
||||
Label: "install-" + appID,
|
||||
Status: statusToFlow(cs.Status),
|
||||
Family: &installFamily,
|
||||
Region: &regionStr,
|
||||
}
|
||||
if startedAt > 0 {
|
||||
node.StartedAt = &startedAt
|
||||
}
|
||||
nodes = append(nodes, node)
|
||||
// Hierarchy
|
||||
rels = append(rels, flowSnapshotLocalRelationship{
|
||||
FromID: nodeID,
|
||||
ToID: regionGroupID,
|
||||
Type: "contains",
|
||||
})
|
||||
// Intra-region deps (same region only — DO NOT cross
|
||||
// region edges, since each region is an independent
|
||||
// fault domain per NAMING-CONVENTION §1.3).
|
||||
for _, depApp := range cs.DependsOn {
|
||||
depID, ok := regionAppIDs[depApp]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
rels = append(rels, flowSnapshotLocalRelationship{
|
||||
FromID: depID,
|
||||
ToID: nodeID,
|
||||
Type: "finish-to-start",
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &flowSnapshotLocalMessage{
|
||||
Type: "snapshot",
|
||||
Flow: &flowSnapshotLocalFlow{
|
||||
|
||||
@ -388,12 +388,20 @@ func rewriteKubeconfigContext(in []byte, target string) ([]byte, error) {
|
||||
return out.Bytes(), nil
|
||||
}
|
||||
|
||||
// PutKubeconfig — PUT /api/v1/deployments/{id}/kubeconfig.
|
||||
// PutKubeconfig — PUT /api/v1/deployments/{id}/kubeconfig[?region=<k>].
|
||||
//
|
||||
// The cloud-init postback endpoint. See file header for the full
|
||||
// contract.
|
||||
//
|
||||
// Multi-region (2026-05-12): when `?region=<k>` is present the body
|
||||
// is stored at `<kubeconfigsDir>/<id>-<k>.yaml` and the single-use
|
||||
// guard fires PER REGION rather than per deployment. This lets each
|
||||
// secondary CP's cloud-init PUT its own kubeconfig without colliding
|
||||
// with the primary's PUT. The primary's path (no ?region=) is
|
||||
// unchanged.
|
||||
func (h *Handler) PutKubeconfig(w http.ResponseWriter, r *http.Request) {
|
||||
id := chi.URLParam(r, "id")
|
||||
region := strings.TrimSpace(r.URL.Query().Get("region"))
|
||||
val, ok := h.deployments.Load(id)
|
||||
if !ok {
|
||||
http.Error(w, "deployment not found", http.StatusNotFound)
|
||||
@ -416,9 +424,21 @@ func (h *Handler) PutKubeconfig(w http.ResponseWriter, r *http.Request) {
|
||||
// Snapshot the persisted hash + already-set state under the
|
||||
// lock so a concurrent retry/double-PUT can't observe the old
|
||||
// hash while we write the new file.
|
||||
//
|
||||
// Multi-region: `alreadySet` is region-scoped. For the primary
|
||||
// (region=="") it means Result.KubeconfigPath is populated. For a
|
||||
// secondary region it means the secondaryKubeconfigPaths[region]
|
||||
// entry is already populated.
|
||||
dep.mu.Lock()
|
||||
persistedHash := dep.kubeconfigBearerHash
|
||||
alreadySet := dep.Result != nil && dep.Result.KubeconfigPath != ""
|
||||
alreadySet := false
|
||||
if region == "" {
|
||||
alreadySet = dep.Result != nil && dep.Result.KubeconfigPath != ""
|
||||
} else {
|
||||
if dep.secondaryKubeconfigPaths != nil {
|
||||
_, alreadySet = dep.secondaryKubeconfigPaths[region]
|
||||
}
|
||||
}
|
||||
dep.mu.Unlock()
|
||||
|
||||
if persistedHash == "" {
|
||||
@ -498,7 +518,14 @@ func (h *Handler) PutKubeconfig(w http.ResponseWriter, r *http.Request) {
|
||||
})
|
||||
return
|
||||
}
|
||||
target := filepath.Join(h.kubeconfigsDir, id+".yaml")
|
||||
// Multi-region: secondary CPs land at <dir>/<id>-<region>.yaml so
|
||||
// the primary's <dir>/<id>.yaml stays the canonical entry the rest
|
||||
// of catalyst-api (and the GET /kubeconfig endpoint) reads.
|
||||
filename := id + ".yaml"
|
||||
if region != "" {
|
||||
filename = id + "-" + region + ".yaml"
|
||||
}
|
||||
target := filepath.Join(h.kubeconfigsDir, filename)
|
||||
if err := writeFileAtomic0600(target, body); err != nil {
|
||||
h.log.Error("kubeconfig file write failed", "id", id, "err", err)
|
||||
writeJSON(w, http.StatusServiceUnavailable, map[string]string{
|
||||
@ -531,9 +558,21 @@ func (h *Handler) PutKubeconfig(w http.ResponseWriter, r *http.Request) {
|
||||
SovereignFQDN: dep.Request.SovereignFQDN,
|
||||
}
|
||||
}
|
||||
dep.Result.KubeconfigPath = target
|
||||
if region == "" {
|
||||
dep.Result.KubeconfigPath = target
|
||||
} else {
|
||||
if dep.secondaryKubeconfigPaths == nil {
|
||||
dep.secondaryKubeconfigPaths = make(map[string]string)
|
||||
}
|
||||
dep.secondaryKubeconfigPaths[region] = target
|
||||
}
|
||||
relaunchAfterTerminalKubeconfigMissing := false
|
||||
if dep.Result.Phase1Outcome == helmwatch.OutcomeKubeconfigMissing {
|
||||
// Secondary-region PUTs skip the Phase-1 relaunch / channel
|
||||
// re-init machinery — that's owned by the primary's first PUT.
|
||||
// A secondary kubeconfig arriving later just makes a new file on
|
||||
// disk available for the per-region helmwatch.Bridge spawned by
|
||||
// runPhase1Watch (which scans the kubeconfigsDir).
|
||||
if region == "" && dep.Result.Phase1Outcome == helmwatch.OutcomeKubeconfigMissing {
|
||||
// Clear the terminal markers so markPhase1Done writes the
|
||||
// new outcome cleanly when the relaunched watch finishes,
|
||||
// and clear phase1Started so the goroutine below isn't
|
||||
@ -567,50 +606,34 @@ func (h *Handler) PutKubeconfig(w http.ResponseWriter, r *http.Request) {
|
||||
"relaunchAfterKubeconfigMissing", relaunchAfterTerminalKubeconfigMissing,
|
||||
)
|
||||
|
||||
// Issue #883 — seed the Sovereign-side
|
||||
// catalyst-system/sovereign-smtp-credentials Secret with the
|
||||
// mothership's SMTP submission credentials BEFORE Phase-1 watch
|
||||
// launches. The bp-catalyst-platform chart's auto-create step
|
||||
// (#901) runs Helm `lookup` against this Secret when rendering
|
||||
// the Sovereign-local catalyst-openova-kc-credentials Secret;
|
||||
// seeding now (kubeconfig present, bp-catalyst-platform not yet
|
||||
// installed) is exactly the window that makes the lookup land
|
||||
// real bytes instead of empty placeholders.
|
||||
//
|
||||
// The seed is best-effort: a failure here does NOT abort
|
||||
// Phase-1. PIN email delivery may degrade, but the Sovereign
|
||||
// itself still bootstraps. Outcome flows through the SSE event
|
||||
// bus so the wizard surfaces it inline with helmwatch events.
|
||||
seedOutcome := h.seedSovereignSMTPCredentials(r.Context(), dep, string(body))
|
||||
h.emitSovereignSMTPSeedEvent(dep, seedOutcome)
|
||||
// SMTP seed + Phase-1 watch launch happen ONCE per deployment, on
|
||||
// the primary CP's PUT. Secondary-region PUTs just deposit the
|
||||
// per-region kubeconfig file; runPhase1Watch's per-region
|
||||
// helmwatch.Bridge spawner picks them up at its next refresh.
|
||||
if region == "" {
|
||||
// Issue #883 — seed the Sovereign-side
|
||||
// catalyst-system/sovereign-smtp-credentials Secret with the
|
||||
// mothership's SMTP submission credentials BEFORE Phase-1 watch
|
||||
// launches.
|
||||
seedOutcome := h.seedSovereignSMTPCredentials(r.Context(), dep, string(body))
|
||||
h.emitSovereignSMTPSeedEvent(dep, seedOutcome)
|
||||
|
||||
// Launch the helmwatch goroutine in the background. The PUT
|
||||
// returns immediately; per-component events flow via the SSE
|
||||
// stream the wizard already has open. The phase1Started guard
|
||||
// ensures runProvisioning's later (synchronous) call is a
|
||||
// no-op so we don't run two informers — except in the
|
||||
// relaunch-after-kubeconfig-missing case, where we cleared the
|
||||
// guard above so this fresh watch can run and supersede the
|
||||
// terminal-failed state. Issue #538.
|
||||
go func() {
|
||||
h.runPhase1Watch(dep)
|
||||
// On the relaunch path we own the channels we just
|
||||
// allocated under the lock above (the original
|
||||
// runProvisioning's close()s ran long ago). Mirror
|
||||
// resumePhase1Watch's close-on-terminate logic so any SSE
|
||||
// consumer waiting on `event: done` is released.
|
||||
if relaunchAfterTerminalKubeconfigMissing {
|
||||
dep.mu.Lock()
|
||||
select {
|
||||
case <-dep.done:
|
||||
// Already closed (defensive).
|
||||
default:
|
||||
close(dep.eventsCh)
|
||||
close(dep.done)
|
||||
// Launch the helmwatch goroutine in the background.
|
||||
go func() {
|
||||
h.runPhase1Watch(dep)
|
||||
if relaunchAfterTerminalKubeconfigMissing {
|
||||
dep.mu.Lock()
|
||||
select {
|
||||
case <-dep.done:
|
||||
// Already closed (defensive).
|
||||
default:
|
||||
close(dep.eventsCh)
|
||||
close(dep.done)
|
||||
}
|
||||
dep.mu.Unlock()
|
||||
}
|
||||
dep.mu.Unlock()
|
||||
}
|
||||
}()
|
||||
}()
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
@ -33,7 +33,9 @@ import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -264,6 +266,17 @@ func (h *Handler) runPhase1Watch(dep *Deployment) {
|
||||
dep.mu.Unlock()
|
||||
}()
|
||||
|
||||
// Multi-region: spawn one helmwatch.Watcher per secondary region
|
||||
// whose kubeconfig has been PUT back (cloudinit-control-plane.tftpl
|
||||
// passes `?region=<k>` for each secondary CP). The watchers run
|
||||
// in background goroutines; their SnapshotComponents() is
|
||||
// composed into the flow snapshot per region so the canvas shows
|
||||
// install-* HRs from every region. A poll loop catches secondary
|
||||
// kubeconfigs that arrive AFTER the primary's watch starts (they
|
||||
// typically race within ~10s of each other but not always).
|
||||
stopSecondaries := h.spawnSecondaryRegionWatchers(dep)
|
||||
defer stopSecondaries()
|
||||
|
||||
// Use the background context so a finished HTTP request from the
|
||||
// caller doesn't cancel a multi-minute Phase-1 watch. The watch
|
||||
// has its own configured timeout via cfg.WatchTimeout.
|
||||
@ -398,6 +411,135 @@ func (h *Handler) setPhase1Substate(dep *Deployment, substate string) {
|
||||
h.persistDeployment(dep)
|
||||
}
|
||||
|
||||
// spawnSecondaryRegionWatchers reads `<kubeconfigsDir>/<id>-*.yaml`
|
||||
// every 15s for the lifetime of the parent runPhase1Watch and spawns
|
||||
// one helmwatch.Watcher per secondary region whose kubeconfig has been
|
||||
// PUT back. Returns a stop function the caller defers; the stop fn
|
||||
// cancels the ticker AND each per-region watcher's context.
|
||||
//
|
||||
// Each per-region watcher emits ordinary helmwatch events into the
|
||||
// parent SSE channel (so the wizard still sees them) — but it does
|
||||
// NOT contribute to the parent's `markPhase1Done` terminal call,
|
||||
// since secondary regions can succeed/fail independently and the
|
||||
// parent's outcome is taken from the primary's watcher.
|
||||
//
|
||||
// The composed `Snapshot` of all regions lives on
|
||||
// dep.secondaryWatchers, read by flowSnapshotFromJobs to emit
|
||||
// per-region FlowNodes for the canvas.
|
||||
func (h *Handler) spawnSecondaryRegionWatchers(dep *Deployment) func() {
|
||||
if h.kubeconfigsDir == "" {
|
||||
return func() {}
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
var wg sync.WaitGroup
|
||||
stopWatchers := make(map[string]context.CancelFunc)
|
||||
var mu sync.Mutex
|
||||
|
||||
spawn := func(region, kcPath string) {
|
||||
mu.Lock()
|
||||
if _, exists := stopWatchers[region]; exists {
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
raw, err := os.ReadFile(kcPath)
|
||||
if err != nil {
|
||||
mu.Unlock()
|
||||
h.log.Warn("secondary kubeconfig read failed",
|
||||
"id", dep.ID, "region", region, "path", kcPath, "err", err)
|
||||
return
|
||||
}
|
||||
cfg := h.phase1WatchConfigForDeployment(dep, string(raw))
|
||||
watcher, err := helmwatch.NewWatcher(cfg, func(ev provisioner.Event) {
|
||||
// Region-tag the component events so the SSE consumer
|
||||
// can group them per region. Bare bp-* names from
|
||||
// secondary regions would otherwise collide with the
|
||||
// primary's events in the wizard's per-component view.
|
||||
ev.Component = region + "/" + ev.Component
|
||||
h.emitWatchEvent(dep, ev)
|
||||
})
|
||||
if err != nil {
|
||||
mu.Unlock()
|
||||
h.log.Error("secondary helmwatch.NewWatcher failed",
|
||||
"id", dep.ID, "region", region, "err", err)
|
||||
return
|
||||
}
|
||||
wctx, wcancel := context.WithCancel(ctx)
|
||||
stopWatchers[region] = wcancel
|
||||
dep.mu.Lock()
|
||||
if dep.secondaryWatchers == nil {
|
||||
dep.secondaryWatchers = make(map[string]*helmwatch.Watcher)
|
||||
}
|
||||
dep.secondaryWatchers[region] = watcher
|
||||
dep.mu.Unlock()
|
||||
mu.Unlock()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
h.log.Info("secondary phase1 watch starting", "id", dep.ID, "region", region)
|
||||
_, werr := watcher.Watch(wctx)
|
||||
if werr != nil && wctx.Err() == nil {
|
||||
h.log.Warn("secondary phase1 watch returned error",
|
||||
"id", dep.ID, "region", region, "err", werr)
|
||||
}
|
||||
dep.mu.Lock()
|
||||
if dep.secondaryWatchers != nil && dep.secondaryWatchers[region] == watcher {
|
||||
delete(dep.secondaryWatchers, region)
|
||||
}
|
||||
dep.mu.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
scan := func() {
|
||||
entries, err := os.ReadDir(h.kubeconfigsDir)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
prefix := dep.ID + "-"
|
||||
for _, e := range entries {
|
||||
n := e.Name()
|
||||
if !strings.HasPrefix(n, prefix) || !strings.HasSuffix(n, ".yaml") {
|
||||
continue
|
||||
}
|
||||
region := strings.TrimSuffix(strings.TrimPrefix(n, prefix), ".yaml")
|
||||
if region == "" {
|
||||
continue
|
||||
}
|
||||
spawn(region, filepath.Join(h.kubeconfigsDir, n))
|
||||
}
|
||||
}
|
||||
|
||||
// Initial scan + periodic refresh — secondary CPs may PUT their
|
||||
// kubeconfigs ~minutes apart depending on per-region tofu apply
|
||||
// timing.
|
||||
go func() {
|
||||
scan()
|
||||
ticker := time.NewTicker(15 * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
scan()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return func() {
|
||||
cancel()
|
||||
mu.Lock()
|
||||
for _, c := range stopWatchers {
|
||||
c()
|
||||
}
|
||||
mu.Unlock()
|
||||
wg.Wait()
|
||||
dep.mu.Lock()
|
||||
dep.secondaryWatchers = nil
|
||||
dep.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// markPhase1Done writes the watch outcome onto dep.Result and flips
|
||||
// Status accordingly. Holds dep.mu for the whole transition so a
|
||||
// State() snapshot from another goroutine can't observe Status=ready
|
||||
|
||||
@ -559,12 +559,28 @@ func (h *Handler) WipeDeployment(w http.ResponseWriter, r *http.Request) {
|
||||
// defensive in case Destroy returned an error and left it)
|
||||
// - on-disk deployment record JSON
|
||||
if h.kubeconfigsDir != "" {
|
||||
// Primary kubeconfig.
|
||||
kcPath := filepath.Join(h.kubeconfigsDir, id+".yaml")
|
||||
if err := os.Remove(kcPath); err != nil && !os.IsNotExist(err) {
|
||||
report.Errors = append(report.Errors, "remove kubeconfig: "+err.Error())
|
||||
} else if err == nil {
|
||||
emit("wipe", "info", "kubeconfig file removed: "+kcPath)
|
||||
}
|
||||
// Multi-region secondaries — <id>-<region>.yaml glob.
|
||||
if entries, derr := os.ReadDir(h.kubeconfigsDir); derr == nil {
|
||||
prefix := id + "-"
|
||||
for _, e := range entries {
|
||||
n := e.Name()
|
||||
if strings.HasPrefix(n, prefix) && strings.HasSuffix(n, ".yaml") {
|
||||
p := filepath.Join(h.kubeconfigsDir, n)
|
||||
if rerr := os.Remove(p); rerr != nil && !os.IsNotExist(rerr) {
|
||||
report.Errors = append(report.Errors, "remove secondary kubeconfig "+n+": "+rerr.Error())
|
||||
} else if rerr == nil {
|
||||
emit("wipe", "info", "secondary kubeconfig file removed: "+p)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tofuWorkDir := filepath.Join(prov.WorkDir, deploymentSovereignName(dep.Request.SovereignFQDN))
|
||||
|
||||
Loading…
Reference in New Issue
Block a user