Skip to content

Commit

Permalink
feat: make concurrency configurable (#62)
Browse files Browse the repository at this point in the history
Signed-off-by: Mario Constanti <[email protected]>
  • Loading branch information
bavarianbidi committed Jan 9, 2024
1 parent 22f1017 commit 835d118
Show file tree
Hide file tree
Showing 11 changed files with 142 additions and 51 deletions.
43 changes: 32 additions & 11 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"k8s.io/klog/v2/textlogger"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"

garmoperatorv1alpha1 "github.com/mercedes-benz/garm-operator/api/v1alpha1"
"github.com/mercedes-benz/garm-operator/internal/controller"
garmcontroller "github.com/mercedes-benz/garm-operator/internal/controller"
"github.com/mercedes-benz/garm-operator/pkg/client"
"github.com/mercedes-benz/garm-operator/pkg/config"
"github.com/mercedes-benz/garm-operator/pkg/flags"
Expand Down Expand Up @@ -126,19 +127,27 @@ func run() error {
return fmt.Errorf("unable to setup garm: %w", err)
}

if err = (&controller.EnterpriseReconciler{
if err = (&garmcontroller.EnterpriseReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("enterprise-controller"),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr,
controller.Options{
MaxConcurrentReconciles: config.Config.Operator.EnterpriseConcurrency,
},
); err != nil {
return fmt.Errorf("unable to create controller Enterprise: %w", err)
}

if err = (&controller.PoolReconciler{
if err = (&garmcontroller.PoolReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("pool-controller"),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr,
controller.Options{
MaxConcurrentReconciles: config.Config.Operator.PoolConcurrency,
},
); err != nil {
return fmt.Errorf("unable to create controller Pool: %w", err)
}

Expand All @@ -154,30 +163,42 @@ func run() error {
return fmt.Errorf("unable to create webhook Repository: %w", err)
}

if err = (&controller.OrganizationReconciler{
if err = (&garmcontroller.OrganizationReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("organization-controller"),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr,
controller.Options{
MaxConcurrentReconciles: config.Config.Operator.OrganizationConcurrency,
},
); err != nil {
return fmt.Errorf("unable to create controller Organization: %w", err)
}

if err = (&controller.RepositoryReconciler{
if err = (&garmcontroller.RepositoryReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("repository-controller"),
}).SetupWithManager(mgr); err != nil {
}).SetupWithManager(mgr,
controller.Options{
MaxConcurrentReconciles: config.Config.Operator.RepositoryConcurrency,
},
); err != nil {
return fmt.Errorf("unable to create controller Repository: %w", err)
}

eventChan := make(chan event.GenericEvent)
runnerReconciler := &controller.RunnerReconciler{
runnerReconciler := &garmcontroller.RunnerReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}

// setup controller so it can reconcile if events from eventChan are queued
if err = runnerReconciler.SetupWithManager(mgr, eventChan); err != nil {
if err = runnerReconciler.SetupWithManager(mgr, eventChan,
controller.Options{
MaxConcurrentReconciles: config.Config.Operator.RunnerConcurrency,
},
); err != nil {
return fmt.Errorf("unable to create controller Runner: %w", err)
}

Expand Down
32 changes: 27 additions & 5 deletions docs/config/configuration-parsing.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ OPERATOR_HEALTH_PROBE_BIND_ADDRESS
OPERATOR_LEADER_ELECTION
OPERATOR_SYNC_PERIOD
OPERATOR_WATCH_NAMESPACE
OPERATOR_ENTERPRISE_CONCURRENCY
OPERATOR_ORGANIZATION_CONCURRENCY
OPERATOR_POOL_CONCURRENCY
OPERATOR_REPOSITORY_CONCURRENCY
OPERATOR_RUNNER_CONCURRENCY
```

## Flags
Expand All @@ -63,6 +69,12 @@ The following flags will be parsed and can be found in the [flags package](../..
--operator-leader-election
--operator-sync-period
--operator-watch-namespace
--operator-enterprise-concurrency
--operator-organization-concurrency
--operator-pool-concurrency
--operator-repository-concurrency
--operator-runner-concurrency
```

### Additional Flags
Expand All @@ -87,11 +99,16 @@ garm:
init: false
email: ""
operator:
metricsbindaddress: :8080
healthprobebindaddress: :8081
leaderelection: false
syncperiod: 5m0s
watchnamespace: garm-operator-system
metrics_bind_address: :8080
health_probe_bind_address: :8081
leader_election: false
sync_period: 5m0s
watch_namespace: garm-operator-system
enterprise_concurrency: 1
organization_concurrency: 3
pool_concurrency: 10
repository_concurrency: 5
runner_concurrency: 20
```

## Config File (yaml)
Expand All @@ -113,6 +130,11 @@ operator:
leader_election: true
sync_period: "10m"
watch_namespace: "garm-operator-namespace"
enterprise_concurrency: 1
organization_concurrency: 3
pool_concurrency: 10
repository_concurrency: 5
runner_concurrency: 20
```
## Configuration Default Values
Expand Down
4 changes: 3 additions & 1 deletion internal/controller/enterprise_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"

Expand Down Expand Up @@ -232,8 +233,9 @@ func (r *EnterpriseReconciler) ensureFinalizer(ctx context.Context, pool *garmop
}

// SetupWithManager sets up the controller with the Manager.
func (r *EnterpriseReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *EnterpriseReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
return ctrl.NewControllerManagedBy(mgr).
For(&garmoperatorv1alpha1.Enterprise{}).
WithOptions(options).
Complete(r)
}
4 changes: 3 additions & 1 deletion internal/controller/organization_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"

Expand Down Expand Up @@ -231,8 +232,9 @@ func (r *OrganizationReconciler) ensureFinalizer(ctx context.Context, pool *garm
}

// SetupWithManager sets up the controller with the Manager.
func (r *OrganizationReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *OrganizationReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
return ctrl.NewControllerManagedBy(mgr).
For(&garmoperatorv1alpha1.Organization{}).
WithOptions(options).
Complete(r)
}
4 changes: 3 additions & 1 deletion internal/controller/pool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -478,7 +479,7 @@ func (r *PoolReconciler) findPoolsForImage(ctx context.Context, obj client.Objec
}

// SetupWithManager sets up the controller with the Manager.
func (r *PoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *PoolReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
// setup index for image
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &garmoperatorv1alpha1.Pool{}, imageField, func(rawObj client.Object) []string {
pool := rawObj.(*garmoperatorv1alpha1.Pool)
Expand All @@ -497,5 +498,6 @@ func (r *PoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
handler.EnqueueRequestsFromMapFunc(r.findPoolsForImage),
builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
).
WithOptions(options).
Complete(r)
}
4 changes: 3 additions & 1 deletion internal/controller/repository_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"

Expand Down Expand Up @@ -231,8 +232,9 @@ func (r *RepositoryReconciler) ensureFinalizer(ctx context.Context, pool *garmop
}

// SetupWithManager sets up the controller with the Manager.
func (r *RepositoryReconciler) SetupWithManager(mgr ctrl.Manager) error {
func (r *RepositoryReconciler) SetupWithManager(mgr ctrl.Manager, options controller.Options) error {
return ctrl.NewControllerManagedBy(mgr).
For(&garmoperatorv1alpha1.Repository{}).
WithOptions(options).
Complete(r)
}
4 changes: 3 additions & 1 deletion internal/controller/runner_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -192,9 +193,10 @@ func (r *RunnerReconciler) updateRunnerStatus(ctx context.Context, runner *garmo
}

// SetupWithManager sets up the controller with the Manager.
func (r *RunnerReconciler) SetupWithManager(mgr ctrl.Manager, eventChan chan event.GenericEvent) error {
func (r *RunnerReconciler) SetupWithManager(mgr ctrl.Manager, eventChan chan event.GenericEvent, options controller.Options) error {
c, err := ctrl.NewControllerManagedBy(mgr).
For(&garmoperatorv1alpha1.Runner{}).
WithOptions(options).
Build(r)
if err != nil {
return err
Expand Down
17 changes: 11 additions & 6 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ type GarmConfig struct {
}

type OperatorConfig struct {
MetricsBindAddress string `koanf:"metrics_bind_address" validate:"required,hostname_port"`
HealthProbeBindAddress string `koanf:"health_probe_bind_address" validate:"required,hostname_port"`
LeaderElection bool `koanf:"leader_election"`
SyncPeriod time.Duration `koanf:"sync_period" validate:"required"`
WatchNamespace string `koanf:"watch_namespace"`
SyncRunnersInterval time.Duration `koanf:"sync_runners_interval" validate:"gte=5s,lte=5m"`
MetricsBindAddress string `koanf:"metrics_bind_address" validate:"required,hostname_port"`
HealthProbeBindAddress string `koanf:"health_probe_bind_address" validate:"required,hostname_port"`
LeaderElection bool `koanf:"leader_election"`
SyncPeriod time.Duration `koanf:"sync_period" validate:"required"`
WatchNamespace string `koanf:"watch_namespace"`
SyncRunnersInterval time.Duration `koanf:"sync_runners_interval" validate:"gte=5s,lte=5m"`
RunnerConcurrency int `koanf:"runner_concurrency" validate:"gte=1"`
RepositoryConcurrency int `koanf:"repository_concurrency" validate:"gte=1"`
EnterpriseConcurrency int `koanf:"enterprise_concurrency" validate:"gte=1"`
OrganizationConcurrency int `koanf:"organization_concurrency" validate:"gte=1"`
PoolConcurrency int `koanf:"pool_concurrency" validate:"gte=1"`
}

type AppConfig struct {
Expand Down
68 changes: 44 additions & 24 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,17 @@ func TestGenerateConfig(t *testing.T) {
},
wantCfg: AppConfig{
Operator: OperatorConfig{
MetricsBindAddress: ":8080",
HealthProbeBindAddress: ":8081",
LeaderElection: false,
SyncPeriod: 5 * time.Minute,
WatchNamespace: "",
SyncRunnersInterval: 20 * time.Second,
MetricsBindAddress: ":8080",
HealthProbeBindAddress: ":8081",
LeaderElection: false,
SyncPeriod: 5 * time.Minute,
WatchNamespace: "",
SyncRunnersInterval: 20 * time.Second,
RunnerConcurrency: 50,
RepositoryConcurrency: 10,
EnterpriseConcurrency: 1,
OrganizationConcurrency: 5,
PoolConcurrency: 10,
},
Garm: GarmConfig{
Server: "http://localhost:9997",
Expand All @@ -59,12 +64,17 @@ func TestGenerateConfig(t *testing.T) {
},
wantCfg: AppConfig{
Operator: OperatorConfig{
MetricsBindAddress: ":8080",
HealthProbeBindAddress: ":8081",
LeaderElection: false,
SyncPeriod: 5 * time.Minute,
WatchNamespace: "",
SyncRunnersInterval: 5 * time.Second,
MetricsBindAddress: ":8080",
HealthProbeBindAddress: ":8081",
LeaderElection: false,
SyncPeriod: 5 * time.Minute,
WatchNamespace: "",
SyncRunnersInterval: 5 * time.Second,
RunnerConcurrency: 50,
RepositoryConcurrency: 10,
EnterpriseConcurrency: 1,
OrganizationConcurrency: 5,
PoolConcurrency: 10,
},
Garm: GarmConfig{
Server: "http://localhost:9997",
Expand All @@ -91,12 +101,17 @@ func TestGenerateConfig(t *testing.T) {
},
wantCfg: AppConfig{
Operator: OperatorConfig{
MetricsBindAddress: ":8080",
HealthProbeBindAddress: ":8081",
LeaderElection: false,
SyncPeriod: 5 * time.Minute,
WatchNamespace: "",
SyncRunnersInterval: 10 * time.Second,
MetricsBindAddress: ":8080",
HealthProbeBindAddress: ":8081",
LeaderElection: false,
SyncPeriod: 5 * time.Minute,
WatchNamespace: "",
SyncRunnersInterval: 10 * time.Second,
RunnerConcurrency: 50,
RepositoryConcurrency: 10,
EnterpriseConcurrency: 1,
OrganizationConcurrency: 5,
PoolConcurrency: 10,
},
Garm: GarmConfig{
Server: "http://localhost:9997",
Expand All @@ -120,12 +135,17 @@ func TestGenerateConfig(t *testing.T) {
},
wantCfg: AppConfig{
Operator: OperatorConfig{
MetricsBindAddress: ":7000",
HealthProbeBindAddress: ":7001",
LeaderElection: true,
SyncPeriod: 10 * time.Minute,
WatchNamespace: "garm-operator-namespace",
SyncRunnersInterval: 15 * time.Second,
MetricsBindAddress: ":7000",
HealthProbeBindAddress: ":7001",
LeaderElection: true,
SyncPeriod: 10 * time.Minute,
WatchNamespace: "garm-operator-namespace",
SyncRunnersInterval: 15 * time.Second,
RunnerConcurrency: 50,
RepositoryConcurrency: 10,
EnterpriseConcurrency: 1,
OrganizationConcurrency: 5,
PoolConcurrency: 10,
},
Garm: GarmConfig{
Server: "http://garm-server:9997",
Expand Down
7 changes: 7 additions & 0 deletions pkg/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,11 @@ const (
// default values for garm configuration
DefaultGarmInit = true
DefaultGarmEmail = "garm-operator@localhost"

// default values for controller concurrency configuration
DefaultRunnerConcurrency = 50
DefaultRepositoryConcurrency = 10
DefaultEnterpriseConcurrency = 1
DefaultOrganizationConcurrency = 5
DefaultPoolConcurrency = 10
)
Loading

0 comments on commit 835d118

Please sign in to comment.