Linkerd 제어 플레인의 Destination 컨트롤러는 서비스 디스커버리와 라우팅을 담당합니다. 이 컨트롤러는 Kubernetes 리소스(Services, EndpointSlices, Pods, ExternalWorkloads 등)를 공유 인포머(shared informers)를 통해 감시하고, 엔드포인트의 로컬 캐시를 구축하며, 데이터 플레인 프록시로부터의 gRPC 요청을 제공합니다.
Destination이 시작되면, k8s.io/client-go Go 모듈을 사용하여 관심 있는 모든 리소스 종류(CronJobs, Pods, Services 등)에 대해 하나의 공유 인포머(shared informer)를 생성하고, API 구조체에 해당 핸들을 저장하며, 각 인포머가 동기화되었는지 확인하는 체크(HasSynced)를 기록하고, 현재 캐시의 키 개수를 나타내는 Prometheus 게이지를 등록합니다.
Sync 함수가 호출되면, 각 인포머는 초기 스냅샷을 가져오기 위해 API 서버에 요청하고, 이후 변경 이벤트를 실시간으로 수신하기 위해 장기 워치 스트림(watch=0)을 엽니다. ResyncTime = 10 * time.Minute으로 정의된 10분 동안 이벤트가 도착하지 않으면, Kubernetes API 서버에 전체 스냅샷 재요청을 보냅니다.
kubectl logs -n linkerd deploy/linkerd-destination -c destination --follow
...
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/apis/workload.linkerd.io/v1beta1/externalworkloads?allowWatchBookmarks=true&resourceVersion=740&timeout=7m47s&timeoutSeconds=467&watch=true 200 OK in 0 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/apis/discovery.k8s.io/v1/endpointslices?allowWatchBookmarks=true&resourceVersion=751&timeout=9m16s&timeoutSeconds=556&watch=true 200 OK in 0 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/apis/batch/v1/jobs?allowWatchBookmarks=true&resourceVersion=740&timeout=9m10s&timeoutSeconds=550&watch=true 200 OK in 0 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/api/v1/endpoints?allowWatchBookmarks=true&resourceVersion=740&timeout=9m35s&timeoutSeconds=575&watch=true 200 OK in 1 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/apis/linkerd.io/v1alpha2/serviceprofiles?allowWatchBookmarks=true&resourceVersion=740&timeout=7m19s&timeoutSeconds=439&watch=true 200 OK in 0 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/apis/policy.linkerd.io/v1beta3/servers?allowWatchBookmarks=true&resourceVersion=740&timeout=8m16s&timeoutSeconds=496&watch=true 200 OK in 0 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/api/v1/services?allowWatchBookmarks=true&resourceVersion=740&timeout=8m50s&timeoutSeconds=530&watch=true 200 OK in 0 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/api/v1/pods?allowWatchBookmarks=true&resourceVersion=741&timeout=8m55s&timeoutSeconds=535&watch=true 200 OK in 0 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/apis/apps/v1/replicasets?allowWatchBookmarks=true&resourceVersion=739&timeout=7m4s&timeoutSeconds=424&watch=true 200 OK in 0 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/apis/batch/v1/jobs?allowWatchBookmarks=true&resourceVersion=740&timeout=7m29s&timeoutSeconds=449&watch=true 200 OK in 0 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/api/v1/nodes?allowWatchBookmarks=true&resourceVersion=740&timeout=8m30s&timeoutSeconds=510&watch=true 200 OK in 0 milliseconds"
time="2025-05-19T09:10:15Z" level=info msg="GET https://10.247.0.1:443/api/v1/namespaces/linkerd/secrets?allowWatchBookmarks=true&resourceVersion=740&timeout=9m7s&timeoutSeconds=547&watch=true 200 OK in 0 milliseconds"
각 인포머는 Kubernetes API 서버에서 반환된 데이터를 저장하는 스레드 안전한 로컬 캐시를 소유합니다.
인포머 자체만으로는 캐시가 업데이트될 때 비즈니스 로직에 알림을 보내지 않으며, 단지 로컬 캐시를 채우고 이를 쿼리할 수 있게 합니다. 그래서 컨트롤러 소스 코드에는 관련 인포머 위에 여러 워처(watcher)가 있으며, 인포머에 이벤트 핸들러(event handlers)를 등록하여 캐시가 업데이트될 때 실제로 알림을 받도록 구현되어 있습니다.
알림을 받으면, 동작에 따라 새 버전과 이전 버전 사이의 차분(diff)을 생성하고, 리스너(listener)에게 증분 변경만 전달하여 업데이트를 반영합니다. 중요한 점은 ew.enableEndpointSlices 값을 기준으로 둘 중 하나만 감시한다는 것입니다. 이 값은 컨테이너의 매개변수 -enable-endpoint-slices로 전달되며, 기본값은 true입니다.
func (pp *portPublisher) updateEndpoints(endpoints *corev1.Endpoints) {
newAddressSet := pp.endpointsToAddresses(endpoints)
if len(newAddressSet.Addresses) == 0 {
for _, listener := range pp.listeners {
listener.NoEndpoints(true)
}
} else {
add, remove := diffAddresses(pp.addresses, newAddressSet)
for _, listener := range pp.listeners {
if len(remove.Addresses) > 0 {
listener.Remove(remove)
}
if len(add.Addresses) > 0 {
listener.Add(add)
}
}
}
pp.addresses = newAddressSet
pp.exists = true
pp.metrics.incUpdates()
pp.metrics.setPods(len(pp.addresses.Addresses))
pp.metrics.setExists(true)
}
관련 로그 메시지를 확인할 수 있습니다:
time="2025-06-03T15:39:09Z" level=debug msg="Updating service for kube-system/kube-dns" addr=":8086" component=service-publisher ns=kube-system svc=kube-dns
time="2025-06-03T15:39:09Z" level=debug msg="Updating service for kube-system/metrics-server" addr=":8086" component=service-publisher ns=kube-system svc=metrics-server
time="2025-06-03T15:39:09Z" level=debug msg="Updating service for linkerd/linkerd-dst" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-dst
time="2025-06-03T15:39:09Z" level=debug msg="Updating service for linkerd/linkerd-dst-headless" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-dst-headless
time="2025-06-03T15:39:09Z" level=debug msg="Updating service for linkerd/linkerd-identity" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-identity
time="2025-06-03T15:39:09Z" level=debug msg="Updating service for linkerd/linkerd-identity-headless" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-identity-headless
time="2025-06-03T15:39:09Z" level=debug msg="Updating service for linkerd/linkerd-policy" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-policy
time="2025-06-03T15:39:09Z" level=debug msg="Updating service for linkerd/linkerd-policy-validator" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-policy-validator
time="2025-06-03T15:39:09Z" level=debug msg="Updating service for linkerd/linkerd-proxy-injector" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-proxy-injector
time="2025-06-03T15:39:09Z" level=debug msg="Updating service for linkerd/linkerd-sp-validator" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-sp-validator
EndpointSlice와 Endpoints도 마찬가지로 처리됩니다.
func (sp *servicePublisher) addEndpointSlice(newSlice *discovery.EndpointSlice) {
sp.Lock()
defer sp.Unlock()
sp.log.Debugf("Adding ES %s/%s", newSlice.Namespace, newSlice.Name)
for _, port := range sp.ports {
port.addEndpointSlice(newSlice)
}
}
...
func (ew *EndpointsWatcher) addEndpointSlice(obj interface{}) {
newSlice, ok := obj.(*discovery.EndpointSlice)
if !ok {
ew.log.Errorf("error processing EndpointSlice resource, got %#v expected *discovery.EndpointSlice", obj)
return
}
id, err := getEndpointSliceServiceID(newSlice)
if err != nil {
ew.log.Errorf("Could not fetch resource service name:%v", err)
return
}
sp := ew.getOrNewServicePublisher(id)
sp.addEndpointSlice(newSlice)
}
관련 로그:
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES default/kubernetes" addr=":8086" component=service-publisher ns=default svc=kubernetes
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES kube-system/kube-dns-g2kr4" addr=":8086" component=service-publisher ns=kube-system svc=kube-dns
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES kube-system/metrics-server-mfqjs" addr=":8086" component=service-publisher ns=kube-system svc=metrics-server
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES linkerd/linkerd-dst-headless-kz5q4" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-dst-headless
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES linkerd/linkerd-dst-pmtjw" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-dst
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES linkerd/linkerd-identity-dzpzm" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-identity
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES linkerd/linkerd-identity-headless-45vjs" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-identity-headless
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES linkerd/linkerd-policy-n54jm" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-policy
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES linkerd/linkerd-policy-validator-5hqbl" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-policy-validator
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES linkerd/linkerd-proxy-injector-kzk6j" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-proxy-injector
time="2025-06-03T15:39:09Z" level=debug msg="Adding ES linkerd/linkerd-sp-validator-g7vgx" addr=":8086" component=service-publisher ns=linkerd svc=linkerd-sp-validator
6. 외부 워크로드(External Workloads)
Linkerd의 Destination 서브시스템은 클러스터 외부에서 실행되는 워크로드를 나타내는 ExternalWorkload 리소스를 관리합니다. 일반적인 Pod과 달리 내부 IP가 Kubernetes 리소스에 존재하지 않으므로, 워크로드 IP는 ExternalWorkload.spec.workloadIPs 필드에 정의되어 있습니다.
어떤 Service에 특정 ExternalWorkload를 포함시켜야 하는지 결정하기 위해, 컨트롤러는 레이블 셀렉터(label-selector) 매칭을 사용하여 <namespace>/<service-name> 목록을 생성합니다. 이를 통해 어떤 Service를 다시 동기화해야 하는지 알 수 있습니다.
ExternalWorkload가 업데이트되면 IP와 레이블 모두를 재평가하며, 실제로 변경이 발생했을 때만 영향을 받는 Service들을 다시 큐(queue)에 넣습니다.
func (ec *EndpointsController) getServicesToUpdateOnExternalWorkloadChange(old, cur interface{}) sets.Set[string] {
newEw, newEwOk := cur.(*ewv1beta1.ExternalWorkload)
oldEw, oldEwOk := old.(*ewv1beta1.ExternalWorkload)
if !oldEwOk {
ec.log.Errorf("Expected (cur) to be an EndpointSlice in getServicesToUpdateOnExternalWorkloadChange(), got type: %T", cur)
return sets.Set[string]{}
}
if !newEwOk {
ec.log.Errorf("Expected (old) to be an EndpointSlice in getServicesToUpdateOnExternalWorkloadChange(), got type: %T", old)
return sets.Set[string]{}
}
if newEw.ResourceVersion == oldEw.ResourceVersion {
return sets.Set[string]{}
}
ewChanged, labelsChanged := ewEndpointsChanged(oldEw, newEw)
if !ewChanged && !labelsChanged {
ec.log.Errorf("skipping update; nothing has changed between old rv %s and new rv %s", oldEw.ResourceVersion, newEw.ResourceVersion)
return sets.Set[string]{}
}
services, err := ec.getExternalWorkloadSvcMembership(newEw)
if err != nil {
ec.log.Errorf("unable to get pod %s/%s's service memberships: %v", newEw.Namespace, newEw.Name, err)
return sets.Set[string]{}
}
if labelsChanged {
oldServices, err := ec.getExternalWorkloadSvcMembership(oldEw)
if err != nil {
ec.log.Errorf("unable to get pod %s/%s's service memberships: %v", oldEw.Namespace, oldEw.Name, err)
}
services = determineNeededServiceUpdates(oldServices, services, ewChanged)
}
return services
}
Service가 큐에 추가되면, 백그라운드 워커가 하나씩 처리합니다.
func (ec *EndpointsController) processQueue() {
for {
key, quit := ec.queue.Get()
if quit {
ec.log.Trace("queue received shutdown signal")
return
}
err := ec.syncService(key)
ec.handleError(err, key)
ec.queue.Done(key)
}
}
syncService 메서드는 서비스의 타입이 ExternalName이 아닌 경우에만 ExternalWorkload CR에서 가져온 IP와 일치하도록 EndpointSlice 객체 세트를 보장합니다. Service의 셀렉터가 없으면 아무런 작업도 수행하지 않습니다.
func (ec *EndpointsController) syncService(update string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(update)
if err != nil {
return err
}
svc, err := ec.k8sAPI.Svc().Lister().Services(namespace).Get(name)
if err != nil {
if !kerrors.IsNotFound(err) {
return err
}
ec.reconciler.endpointTracker.DeleteService(namespace, name)
return nil
}
if svc.Spec.Type == corev1.ServiceTypeExternalName {
return nil
}
if svc.Spec.Selector == nil {
return nil
}
ewSelector := labels.Set(svc.Spec.Selector).AsSelectorPreValidated()
ews, err := ec.k8sAPI.ExtWorkload().Lister().List(ewSelector)
if err != nil {
return err
}
esSelector := labels.Set(map[string]string{
discoveryv1.LabelServiceName: svc.Name,
discoveryv1.LabelManagedBy: managedBy,
}).AsSelectorPreValidated()
epSlices, err := ec.k8sAPI.ES().Lister().List(esSelector)
if err != nil {
return err
}
epSlices = dropEndpointSlicesPendingDeletion(epSlices)
if ec.reconciler.endpointTracker.StaleSlices(svc, epSlices) {
ec.log.Warnf("detected EndpointSlice informer cache is out of date when processing %s", update)
return errors.New("EndpointSlice informer cache is out of date")
}
err = ec.reconciler.reconcile(svc, ews, epSlices)
if err != nil {
return err
}
return nil
}
마지막으로, 정확히 생성·업데이트·삭제해야 할 EndpointSlice 객체를 계산하고 Kubernetes에 반영합니다.
리더가 주기적으로 갱신을 계속하는 동안 ExternalWorkload 변경 사항을 계속 리컨실(reconcile)하며, 리더가 실패하면 다른 복제본이 리더가 되어 쓰기 작업을 이어받습니다. 다음과 같은 주기적인 갱신 로그를 확인할 수 있습니다:
time="2025-05-19T08:53:23Z" level=info msg="PUT https://10.247.0.1:443/apis/coordination.k8s.io/v1/namespaces/linkerd/leases/linkerd-destination-endpoint-write 200 OK in 2 milliseconds"
...
time="2025-05-19T08:53:25Z" level=info msg="PUT https://10.247.0.1:443/apis/coordination.k8s.io/v1/namespaces/linkerd/leases/linkerd-destination-endpoint-write 200 OK in 6 milliseconds"