Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 67 additions & 48 deletions internal/service/kubernetes_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ import (
"k8s.io/client-go/rest"
)

type ingressKey struct {
type resourceKey struct {
namespace string
name string
}

type ingressAppKey struct {
ingressKey
type resourceAppKey struct {
resourceKey
appName string
}

type ingressApp struct {
type resourceApp struct {
domain string
appName string
app model.App
Expand All @@ -42,9 +42,27 @@ type KubernetesService struct {
client dynamic.Interface
started bool
mu sync.RWMutex
ingressApps map[ingressKey][]ingressApp
domainIndex map[string]ingressAppKey
appNameIndex map[string]ingressAppKey
resourceApps map[resourceKey][]resourceApp
domainIndex map[string]resourceAppKey
appNameIndex map[string]resourceAppKey
}

var watchedGVRs = []schema.GroupVersionResource{
{
Group: "networking.k8s.io",
Version: "v1",
Resource: "ingresses",
},
{
Group: "gateway.networking.k8s.io",
Version: "v1",
Resource: "httproutes",
},
{
Group: "gateway.networking.k8s.io",
Version: "v1",
Resource: "grpcroutes",
},
}

func NewKubernetesService(
Expand All @@ -62,74 +80,75 @@ func NewKubernetesService(
return nil, fmt.Errorf("failed to create kubernetes client: %w", err)
}

gvr := schema.GroupVersionResource{
Group: "networking.k8s.io",
Version: "v1",
Resource: "ingresses",
service := &KubernetesService{
log: log,
ctx: ctx,
client: client,
resourceApps: make(map[resourceKey][]resourceApp),
domainIndex: make(map[string]resourceAppKey),
appNameIndex: make(map[string]resourceAppKey),
}

accessCtx, accessCancel := context.WithTimeout(ctx, 5*time.Second)
defer accessCancel()

_, err = client.Resource(gvr).List(accessCtx, metav1.ListOptions{Limit: 1})
if err != nil {
log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Failed to access Ingress API, Kubernetes label provider will be disabled")
return nil, fmt.Errorf("failed to access ingress api: %w", err)
started := 0
for _, gvr := range watchedGVRs {
_, err = client.Resource(gvr).List(accessCtx, metav1.ListOptions{Limit: 1})
if err != nil {
log.App.Warn().Err(err).Str("api", gvr.GroupVersion().String()).Msg("Failed to access API, skipping watcher")
continue
}
log.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Successfully accessed API, starting watcher")
gvrCopy := gvr
wg.Go(func() {
service.watchGVR(gvrCopy)
})
started++
}

log.App.Debug().Str("api", gvr.GroupVersion().String()).Msg("Successfully accessed Ingress API, starting watcher")

service := &KubernetesService{
log: log,
ctx: ctx,
client: client,
ingressApps: make(map[ingressKey][]ingressApp),
domainIndex: make(map[string]ingressAppKey),
appNameIndex: make(map[string]ingressAppKey),
if started == 0 {
return nil, fmt.Errorf("failed to access any supported kubernetes API (ingresses, httproutes)")
}

wg.Go(func() {
service.watchGVR(gvr)
})

service.started = true
log.App.Debug().Msg("Kubernetes label provider started successfully")

return service, nil
}

func (k *KubernetesService) addIngressApps(namespace, name string, apps []ingressApp) {
func (k *KubernetesService) addResourceApps(namespace, name string, apps []resourceApp) {
k.mu.Lock()
defer k.mu.Unlock()

key := ingressKey{namespace, name}
// Remove existing entries for this ingress
if existing, ok := k.ingressApps[key]; ok {
key := resourceKey{namespace, name}
// Remove existing entries for this resource
if existing, ok := k.resourceApps[key]; ok {
for _, app := range existing {
delete(k.domainIndex, app.domain)
delete(k.appNameIndex, app.appName)
}
}
// Add new entries
k.ingressApps[key] = apps
k.resourceApps[key] = apps
for _, app := range apps {
appKey := ingressAppKey{key, app.appName}
appKey := resourceAppKey{key, app.appName}
k.domainIndex[app.domain] = appKey
k.appNameIndex[app.appName] = appKey
}
}

func (k *KubernetesService) removeIngress(namespace, name string) {
func (k *KubernetesService) removeResource(namespace, name string) {
k.mu.Lock()
defer k.mu.Unlock()

key := ingressKey{namespace, name}
if apps, ok := k.ingressApps[key]; ok {
key := resourceKey{namespace, name}
if apps, ok := k.resourceApps[key]; ok {
for _, app := range apps {
delete(k.domainIndex, app.domain)
delete(k.appNameIndex, app.appName)
}
delete(k.ingressApps, key)
delete(k.resourceApps, key)
}
}

Expand All @@ -138,7 +157,7 @@ func (k *KubernetesService) getByDomain(domain string) *model.App {
defer k.mu.RUnlock()

if appKey, ok := k.domainIndex[domain]; ok {
if apps, ok := k.ingressApps[appKey.ingressKey]; ok {
if apps, ok := k.resourceApps[appKey.resourceKey]; ok {
for i := range apps {
app := &apps[i]
if app.domain == domain && app.appName == appKey.appName {
Expand All @@ -155,7 +174,7 @@ func (k *KubernetesService) getByAppName(appName string) *model.App {
defer k.mu.RUnlock()

if appKey, ok := k.appNameIndex[appName]; ok {
if apps, ok := k.ingressApps[appKey.ingressKey]; ok {
if apps, ok := k.resourceApps[appKey.resourceKey]; ok {
for i := range apps {
app := &apps[i]
if app.appName == appName {
Expand All @@ -172,30 +191,30 @@ func (k *KubernetesService) updateFromItem(item *unstructured.Unstructured) {
name := item.GetName()
annotations := item.GetAnnotations()
if annotations == nil {
k.removeIngress(namespace, name)
k.removeResource(namespace, name)
return
}
labels, err := decoders.DecodeLabels[model.Apps](annotations, "apps")
if err != nil {
k.log.App.Warn().Err(err).Str("namespace", namespace).Str("name", name).Msg("Failed to decode ingress labels, skipping")
k.removeIngress(namespace, name)
k.log.App.Warn().Err(err).Str("namespace", namespace).Str("name", name).Msg("Failed to decode labels, skipping")
k.removeResource(namespace, name)
return
}
var apps []ingressApp
var apps []resourceApp
for appName, appLabels := range labels.Apps {
if appLabels.Config.Domain == "" {
continue
}
apps = append(apps, ingressApp{
apps = append(apps, resourceApp{
domain: appLabels.Config.Domain,
appName: appName,
app: appLabels,
})
}
if len(apps) == 0 {
k.removeIngress(namespace, name)
k.removeResource(namespace, name)
} else {
k.addIngressApps(namespace, name, apps)
k.addResourceApps(namespace, name, apps)
}
}

Expand Down Expand Up @@ -239,7 +258,7 @@ func (k *KubernetesService) runWatcher(gvr schema.GroupVersionResource, w watch.
case watch.Added, watch.Modified:
k.updateFromItem(item)
case watch.Deleted:
k.removeIngress(item.GetNamespace(), item.GetName())
k.removeResource(item.GetNamespace(), item.GetName())
}
case <-resyncTicker.C:
if err := k.resyncGVR(gvr); err != nil {
Expand Down
Loading