From Event to Dashboard in the Cloud: Building a Streaming Platform on Kubernetes in Practice

Емельянов Сергей

Why Kubernetes is the best choice

  1. Useful to companies of any size
  2. Democratization
  3. The Kubernetes "universe"
The Django of the development world
The Lego of the physical world
Construction-kit parts
https://landscape.cncf.io/

Business process

%%{init: {'theme': 'light', 'themeVariables': { 'darkMode': false }}}%%
flowchart LR
    A:::wide
    B:::wide
    C:::wide
    D:::wide
    A@{ shape: rounded, label: "Purchases" } -->B@{ shape: rounded, label: "Aggregation" }
    B -->C@{ shape: rounded, label: "Storage" }
    D@{ shape: rounded, label: "Analytics" } --> C

    classDef wide font-size:1.4em,font-weight:300
Technical process
%%{init: {'theme': 'light', 'themeVariables': { 'darkMode': false }}}%%
flowchart RL
    A:::wide
    B:::wide
    C:::wide
    D:::wide
    E:::wide
    
    A@{ shape: rounded, label: "Generator" } --> B@{ shape: das, label: "Kafka Topic" };
    C@{ shape: rounded, label: "Flink" } --> B
    C --> D@{ shape: cyl, label: "Clickhouse" };
    E@{ shape: rounded, label: "Metabase" } --> D

    classDef wide font-size:1.4em,font-weight:300
Topology
kubectl create namespace operators 
kubectl create namespace common 
kubectl create namespace stage
kubectl create namespace prod
Topology
kubectl get nodes -o custom-columns=NAME:.metadata.name
NAME
di-common-0
di-master-0
di-prod-gz1-0
di-prod-me1-0
di-prod-ms1-0
di-stage-0
Preparation
Multi-AZ StorageClass
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
    name: csi-ceph-ssd
    annotations:
        storageclass.kubernetes.io/is-default-class: "true"
parameters:
    type: ceph-ssd
provisioner: cinder.csi.openstack.org
reclaimPolicy: Delete
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true
Operators
# Strimzi (Kafka) operator
helm install \
  strimzi-kafka-operator \
  oci://quay.io/strimzi-helm/strimzi-kafka-operator \
  --version 0.47.0 \
  -n operators \
  -f files/strimzi-values.yaml

# Clickhouse operator
helm repo add \
  altinity-clickhouse-operator \
  https://docs.altinity.com/clickhouse-operator/

helm install \
  altinity-clickhouse-operator \
  altinity-clickhouse-operator/altinity-clickhouse-operator \
  --version 0.25.3 \
  -n operators \
  -f files/altinity-values.yaml

# Flink operator
helm repo add \
  flink-operator-repo \
  https://downloads.apache.org/flink/flink-kubernetes-operator-1.12.1

helm install \
  flink-kubernetes-operator \
  flink-operator-repo/flink-kubernetes-operator \
  -n operators \
  -f files/flink-values.yaml

# Envoy gateway
helm install \
  eg \
  oci://docker.io/envoyproxy/gateway-helm \
  --version v1.5.0 \
  -n operators

# Grafana operator
helm repo add \
  grafana \
  https://grafana.github.io/helm-charts

helm install \
  grafana-operator \
  grafana/grafana-operator \
  -n operators

# VictoriaMetrics operator
helm install \
  victoria-metrics \
  oci://ghcr.io/victoriametrics/helm-charts/victoria-metrics-operator \
  -n operators  

# Keda operator
helm repo add \
  kedacore \
  https://kedacore.github.io/charts  

helm install \
  keda \
  kedacore/keda \
  -n operators

# Chaos Mesh operator
helm repo add \
  chaos-mesh \
  https://charts.chaos-mesh.org

helm install \
  chaos-mesh \
  chaos-mesh/chaos-mesh \
  --version 2.7.2 \
  -n operators \
  -f files/chaos-mesh-values.yaml
Gateway kustomize
resources:
    - gatewayclass.yaml
    - gateway.yaml
    - reference-grant-prod.yaml
    - reference-grant-stage.yaml
    - reference-grant-operators.yaml
    - config-map-index.yaml
    - deployment-index.yaml
    - service-index.yaml
Gateway
apiVersion: gateway.networking.k8s.io/v1
kind: Gateway
metadata:
    name: eg
    namespace: common
spec:
  gatewayClassName: eg
  listeners:
    - name: web
      protocol: HTTP
      port: 80
    - name: metabase-stage
      protocol: HTTP
      port: 8000
    - name: metabase-prod
      protocol: HTTP
      port: 9000
    - name: chaos
      protocol: HTTP
      port: 10000
    - name: grafana
      protocol: HTTP
      port: 8080
    - name: victoria-metrics
      protocol: HTTP
      port: 8428
    - name: victoria-metrics-agent
      protocol: HTTP
      port: 8429
    - name: victoria-metrics-logs
      protocol: HTTP
      port: 9428
Reference grant
apiVersion: gateway.networking.k8s.io/v1beta1
kind: ReferenceGrant
metadata:
  name: rg-common
  namespace: prod
spec:
  from:
    - group: gateway.networking.k8s.io
      kind: HTTPRoute
      namespace: common
  to:
    - group: ""
      kind: Service
Routes kustomize
namespace: common
secretGenerator:
  - name: basic-auth
    files:
      - .htpasswd
    options:
      disableNameSuffixHash: true

resources:
  - http-route-metabase-stage.yaml
  - http-route-metabase-prod.yaml
  - http-route-grafana.yaml
  - http-route-victoria-metrics.yaml
  - http-route-victoria-metrics-agent.yaml
  - http-route-victoria-metrics-logs.yaml
  - http-route-index.yaml
  - security-policy-chaos.yaml
  - http-route-chaos.yaml
HTTP route
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
    name: chaos-dashboard
spec:
    parentRefs:
      - name: eg
        sectionName: chaos
    rules:
      - backendRefs:
          - group: ""
            kind: Service
            name: chaos-dashboard
            namespace: operators
            port: 2333
            weight: 1
        matches:
          - path:
              type: PathPrefix
              value: /
Security policy
apiVersion: gateway.envoyproxy.io/v1alpha1
kind: SecurityPolicy
metadata:
  name: chaos-basic-auth
spec:
  targetRefs:
    - group: gateway.networking.k8s.io
      kind: HTTPRoute
      name: chaos-dashboard
  basicAuth:
    users:
      name: basic-auth
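The basic-auth Secret is generated from a local `.htpasswd` file. Envoy's basic-auth filter, which this SecurityPolicy configures, is documented as accepting only SHA-hashed entries, and the Envoy Gateway docs create the file with `htpasswd -cbs`. A minimal sketch that builds the same `{SHA}` line with openssl instead, assuming openssl is installed; the function name and the credentials are illustrative, not part of the original setup:

```shell
# Print one .htpasswd line in the {SHA} format:
#   user:{SHA}<base64 of the raw SHA-1 digest of the password>
htpasswd_sha_line() {
  user="$1"
  pass="$2"
  hash=$(printf '%s' "$pass" | openssl dgst -sha1 -binary | openssl base64)
  printf '%s:{SHA}%s\n' "$user" "$hash"
}

# Hypothetical credentials; redirect the output to .htpasswd next to the kustomization.
htpasswd_sha_line admin 'change-me'
```

Running `htpasswd_sha_line admin 'change-me' > .htpasswd` produces the file that secretGenerator packs into the basic-auth Secret. SHA-1 is a weak password hash; it is used here only because it is the format Envoy accepts.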
Victoria stack
# Victoria Metrics
helm install \
  victoria-metrics \
  oci://ghcr.io/victoriametrics/helm-charts/victoria-metrics-k8s-stack \
  -n common \
  -f files/victoria-metrics-values.yaml

# Victoria Logs
helm install \
  victoria-logs \
  oci://ghcr.io/victoriametrics/helm-charts/victoria-logs-single \
  -n common \
  -f files/victoria-logs-values.yaml
Victoria metrics values
victoria-metrics-operator:
  enabled: false

fullnameOverride: example

defaultDashboards:
  enabled: true
  grafanaOperator:
    enabled: true
    spec:
      instanceSelector:
        matchLabels:
          dashboards: example
      allowCrossNamespaceImport: true

defaultDatasources:
  enabled: true
  grafanaOperator:
    enabled: true
    spec:
      instanceSelector:
        matchLabels:
          dashboards: example
      allowCrossNamespaceImport: true

vmsingle:
  spec:
    retentionPeriod: "24h"
    storage:
      accessModes:
        - ReadWriteOnce
      storageClassName: "csi-ceph-ssd"
      resources:
        requests:
          storage: 10Gi

grafana:
  enabled: false
  forceDeployDatasource: true

kubeProxy:
  enabled: true
Victoria logs values
dashboards:
  enabled: true

  grafanaOperator:
    enabled: true
    spec:
      allowCrossNamespaceImport: true
      instanceSelector:
        matchLabels:
          dashboards: example

server:
  retentionPeriod: "24h"
  persistentVolume:
    enabled: true
    accessModes:
      - ReadWriteOnce
    storageClassName: "csi-ceph-ssd"
    size: 20Gi

  vmServiceScrape:
    enabled: true

vector:
  enabled: true
  tolerations:
    - effect: NoSchedule
      key: env
      operator: Exists
[Screenshots: Metrics, Agent, Logs]
Scrapes kustomize
namespace: common

resources:
  - service-scrape-clickhouse.yaml
  - service-scrape-keda.yaml
  - pod-scrape-flink.yaml
  - pod-scrape-kafka.yaml
Kafka pod scrape
apiVersion: operator.victoriametrics.com/v1beta1
kind: VMPodScrape
metadata:
  name: kafka-brokers
spec:
  namespaceSelector:
    any: true
  podMetricsEndpoints:
    - port: tcp-prometheus
      scheme: http
  selector:
    matchLabels:
      strimzi.io/kind: Kafka
Keda service scrape
apiVersion: operator.victoriametrics.com/v1beta1
kind: VMServiceScrape
metadata:
  name: keda
spec:
  namespaceSelector:
    any: true
  endpoints:
    - port: metrics
  selector:
    matchLabels:
      app.kubernetes.io/name: keda-operator-metrics-apiserver
Grafana kustomize
namespace: common
configMapGenerator:
  - name: grafana-dashboards
    files:
      - dashboard-logs.json
      - dashboard-show.json
    options:
      disableNameSuffixHash: true

resources:
    - grafana.yaml
    - datasource-chaos.yaml
    - datasource-victoria-logs.yaml
    - dashboard-clickhouse.yaml
    - dashboard-flink.yaml
    - dashboard-kafka-exporter.yaml
    - dashboard-keda.yaml
    - dashboard-logs.yaml
    - dashboard-show.yaml
Grafana
apiVersion: grafana.integreatly.org/v1beta1
kind: Grafana
metadata:
    name: example
    labels:
      dashboards: example
spec:
    persistentVolumeClaim:
      spec:
        accessModes:
         - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        storageClassName: csi-ceph-ssd
    config:
      security:
        admin_user: admin
        admin_password: admin
    service:
      spec:
        ports:
          - protocol: TCP
            port: 3000
            targetPort: 3000
    deployment:
      spec:
        template:
          spec:
            securityContext:
              fsGroup: 472
            volumes:
              - name: grafana-data
                persistentVolumeClaim:
                  claimName: example-pvc
Flink dashboard
apiVersion: grafana.integreatly.org/v1beta1
kind: GrafanaDashboard
metadata:
  name: flink-dashboard
spec:
  instanceSelector:
    matchLabels:
      dashboards: example
  allowCrossNamespaceImport: true
  grafanaCom:
    id: 14911

Result:

  • added a StorageClass for a multi-zone installation
  • installed the operators that will do all the work
  • added a Gateway to provide network access
  • installed Grafana to admire nice-looking charts
  • added VictoriaMetrics and VictoriaLogs to learn more about our system
Stage
Kafka kustomize
resources:
  - cluster.yaml
  - node-pool.yaml
  - topic.yaml
Kafka cluster
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: example
  annotations:
    strimzi.io/node-pools: enabled
    strimzi.io/kraft: enabled
spec:
  kafka:
    version: 4.0.0
    metadataVersion: 4.0-IV3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  entityOperator:
    topicOperator: {}
    userOperator: {}
  kafkaExporter:
    topicRegex: ".*"
    groupRegex: ".*"
Kafka node pool
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
  labels:
    strimzi.io/cluster: example
spec:
  template:
    pod: {}
  replicas: 1
  roles:
    - controller
    - broker
  storage:
    type: jbod
    volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: true
        kraftMetadata: shared
        class: csi-ceph-ssd
  resources:
    limits:
      cpu: "2"
      memory: "2Gi"
    requests:
      memory: "512Mi"
      cpu: "250m"
Kafka topic
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: events
  labels:
    strimzi.io/cluster: example
spec:
  partitions: 5
  replicas: 1
  config:
    retention.ms: 86400000
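`retention.ms` is given in milliseconds, so 86400000 keeps events for one day, in line with the `24h` retentionPeriod used for VictoriaMetrics and VictoriaLogs. A quick shell check of the arithmetic (the one-week value is only an illustration):

```shell
# retention.ms of the "events" topic, converted to hours
retention_ms=86400000
echo "$(( retention_ms / 1000 / 60 / 60 ))h"   # prints 24h

# The other direction: milliseconds for a hypothetical 7-day retention
week_ms=$(( 7 * 24 * 60 * 60 * 1000 ))
echo "$week_ms"                               # prints 604800000
```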
Clickhouse kustomize
resources:
  - cluster.yaml
  - keeper.yaml
  - migrate-configmap.yaml
  - migrate-job.yaml
Clickhouse
apiVersion: clickhouse.altinity.com/v1
kind: ClickHouseInstallation
metadata:
  name: example
spec:
  defaults:
    templates:
      volumeClaimTemplate: default
      podTemplate: default
  configuration:
    settings:
      mark_cache_size: 524288000
      max_server_memory_usage: 1500000000
    profiles:
      default:
        max_threads: 1
        max_block_size: 8192
        max_download_threads: 1
        input_format_parallel_parsing: 0
        output_format_parallel_formatting: 0
    users:
      default/networks/ip:
        - 0.0.0.0/0
    zookeeper:
      nodes:
        - host: keeper-example
          port: 2181
    clusters:
      - name: replicated
        layout:
          replicasCount: 1
  templates:
    volumeClaimTemplates:
      - name: default
        spec:
          storageClassName: csi-ceph-ssd
          accessModes:
            - ReadWriteOnce
          resources:
            requests:
              storage: 10Gi
    podTemplates:
      - name: default
        spec:
          containers:
            - name: clickhouse-pod
              image: clickhouse/clickhouse-server:24.8
              resources:
                limits:
                  memory: "2048Mi"
                  cpu: "2"
                requests:
                  memory: "512Mi"
                  cpu: "250m"
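The server-level settings are sized against the pod limits: `max_server_memory_usage` (about 1.5 GB) sits below the 2048Mi container limit, presumably so that ClickHouse rejects memory-hungry queries itself before the kernel OOM-kills the container. A small shell check of the arithmetic, using only the numbers from the manifest:

```shell
limit_bytes=$(( 2048 * 1024 * 1024 ))   # memory limit "2048Mi" from the podTemplate
server_cap=1500000000                   # settings.max_server_memory_usage
mark_cache=524288000                    # settings.mark_cache_size

echo "server cap: $(( server_cap * 100 / limit_bytes ))% of the pod limit"   # 69%
echo "mark cache: $(( mark_cache / 1024 / 1024 )) MiB"                        # 500 MiB
echo "headroom: $(( (limit_bytes - server_cap) / 1024 / 1024 )) MiB"          # 617 MiB
```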
Clickhouse keeper
apiVersion: clickhouse-keeper.altinity.com/v1
kind: ClickHouseKeeperInstallation
metadata:
  name: example
spec:
  defaults:
    templates:
      volumeClaimTemplate: default
      podTemplate: default
  configuration:
    clusters:
      - name: replicated
        layout:
          replicasCount: 1
  templates:
    volumeClaimTemplates:
      - name: default
        spec:
          storageClassName: csi-ceph-ssd
          accessModes:
            - ReadWriteOnce
          resources:
            requests:
              storage: 500Mi
    podTemplates:
      - name: default
        spec:
          containers:
            - name: clickhouse-pod
              image: clickhouse/clickhouse-server:24.8
              resources:
                limits:
                  memory: "2048Mi"
                  cpu: "2"
                requests:
                  memory: "512Mi"
                  cpu: "250m"
Migrate configmap
apiVersion: v1
kind: ConfigMap
metadata:
  name: clickhouse-migration
data:
  events_table.sql: |
    -- Create a replicated table for aggregated per-user data
    CREATE TABLE IF NOT EXISTS user_aggregations ON CLUSTER '{cluster}' (
    user_id String,
    total_amount Float64,
    purchase_count UInt64,
    window_start Int64,
    window_end Int64,
    last_update_time Int64,
    created_at DateTime DEFAULT now()
    ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/user_aggregations', '{replica}')
    ORDER BY (user_id, window_start)
    PARTITION BY toYYYYMM(toDateTime(window_start / 1000));
    
    -- Create a distributed table for queries
    CREATE TABLE IF NOT EXISTS user_aggregations_distributed ON CLUSTER '{cluster}' AS user_aggregations
    ENGINE = Distributed('{cluster}', default, user_aggregations, rand());
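`window_start` is stored as epoch milliseconds, so the partition key divides by 1000 before `toDateTime`, and `toYYYYMM` then yields one partition per calendar month. The shell sketch below reproduces that mapping with plain calendar arithmetic (the days-to-civil-date algorithm), assuming the ClickHouse server timezone is UTC and timestamps are not negative; it illustrates the mapping and is not how ClickHouse computes it internally:

```shell
# Month partition (YYYYMM) for window_start given in epoch milliseconds.
# Mirrors toYYYYMM(toDateTime(window_start / 1000)) assuming the server runs in UTC.
# Calendar math: days since epoch -> civil date (proleptic Gregorian), ms >= 0 only.
partition_for_ms() {
  awk -v ms="$1" 'BEGIN {
    days = int(ms / 1000 / 86400)          # whole days since 1970-01-01
    z = days + 719468                      # shift the epoch to 0000-03-01
    era = int(z / 146097)                  # 400-year eras
    doe = z - era * 146097                 # day of era, 0..146096
    yoe = int((doe - int(doe / 1460) + int(doe / 36524) - int(doe / 146096)) / 365)
    y = yoe + era * 400
    doy = doe - (365 * yoe + int(yoe / 4) - int(yoe / 100))  # day of year, March-based
    mp = int((5 * doy + 2) / 153)          # month index with March = 0
    m = (mp < 10) ? mp + 3 : mp - 9        # back to January = 1
    if (m <= 2) y++                        # January and February belong to the next year
    printf "%04d%02d\n", y, m
  }'
}

partition_for_ms 1709164800000   # 2024-02-29T00:00:00Z -> 202402
partition_for_ms 1735689600000   # 2025-01-01T00:00:00Z -> 202501
```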
Migrate job
apiVersion: batch/v1
kind: Job
metadata:
  name: clickhouse-migrate
spec:
  template:
    spec:
      initContainers:
        - name: wait-clickhouse
          image: jwilder/dockerize:v0.9.5
          command: ["dockerize", "-wait", "http://clickhouse-example:8123", "-wait", "tcp://keeper-example:2181", "-timeout", "300s"]
      containers:
        - name: migrate
          image: clickhouse/clickhouse-server:24.8
          command: ["clickhouse-client", "--host", "clickhouse-example", "--queries-file", "/events_table.sql"]
          volumeMounts:
            - mountPath: /events_table.sql
              subPath: events_table.sql
              name: create-table
      volumes:
        - name: create-table
          configMap:
            name: clickhouse-migration
      restartPolicy: Never
  backoffLimit: 4
Metabase kustomize
resources:
  - deployment.yaml
  - service.yaml
  - pvc.yaml
  - setup-job.yaml
Metabase deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: metabase
  labels:
    app: metabase
spec:
  replicas: 1
  selector:
    matchLabels:
      app: metabase
  template:
    metadata:
      labels:
        app: metabase
    spec:
      containers:
      - name: metabase
        image: metabase/metabase:latest
        ports:
          - containerPort: 3000
            name: http
        env:
          - name: MB_DB_TYPE
            value: "h2"
          - name: MB_DB_FILE
            value: "/metabase-data/metabase.db"
          - name: JAVA_TIMEZONE
            value: "UTC"
          - name: MB_ENCRYPTION_SECRET_KEY
            value: "your-secret-key-here-change-in-production"
          - name: MB_SETUP_TOKEN
            value: "0198bcd4-1492-78dd-98f0-261832a0378f"
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "2Gi"
            cpu: "2"
        livenessProbe:
          httpGet:
            path: /api/health
            port: 3000
          initialDelaySeconds: 60
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /api/health
            port: 3000
          initialDelaySeconds: 30
          periodSeconds: 10
        volumeMounts:
          - name: metabase-data
            mountPath: /metabase-data
      volumes:
      - name: metabase-data
        persistentVolumeClaim:
          claimName: metabase-pvc
Metabase setup job
apiVersion: batch/v1
kind: Job
metadata:
  name: metabase-setup
spec:
  template:
    spec:
      initContainers:
        - name: metabase-init-clickhouse
          image: jwilder/dockerize:v0.9.5
          command: [ "dockerize", "-wait", "http://metabase-service:3000", "-timeout", "300s" ]
      containers:
      - name: metabase-setup
        image: curlimages/curl:latest      
        command:
        - /bin/sh
        - -c
        - |
          set -e
          echo "Setting up Metabase admin user..."
          curl --fail -sS -X POST http://metabase-service:3000/api/setup \
            -H "Content-Type: application/json" \
            -d '{
              "prefs": {
                "site_name": "Data internals 2025",
                "site_locale": "ru"
              },
              "token": "0198bcd4-1492-78dd-98f0-261832a0378f",
              "user": {
                "first_name": "Admin",
                "last_name": "Admin",
                "email": "admin@example.com",
                "password": "admin123z!"
              }
            }'
          
          echo "Setup completed!"
      restartPolicy: OnFailure
Generator
apiVersion: apps/v1
kind: Deployment
metadata:
  name: generator
  labels:
    app: generator
spec:
  replicas: 1
  selector:
    matchLabels:
      app: generator
  template:
    metadata:
      labels:
        app: generator
    spec:
      initContainers:
        - name: wait-kafka
          image: jwilder/dockerize:v0.9.5
          command: ["dockerize", "-wait", "tcp://example-kafka-bootstrap:9092", "-timeout", "300s"]
      containers:
        - name: generator
          image: totaki/generator:0.0.2
          env:
            - name: KAFKA_BROKERS
              value: "example-kafka-bootstrap:9092"
            - name: GENERATOR_RATE
              value: "1"
            - name: BATCH_SIZE
              value: "1"
            - name: BURST_EVERY_MS
              value: ""
Flink kustomize
resources:
  - deployment.yaml
  - role-binding.yaml
  - service-account.yaml

secretGenerator:
  - name: aws-params
    envs:
      - .env
    options:
      disableNameSuffixHash: true
Flink
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink:1.20
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    
    # Metrics
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9249-9250

    # High-availability parameters
    execution.checkpointing.interval: 10s
    execution.checkpointing.mode: AT_LEAST_ONCE
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    high-availability.type: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    s3.endpoint: https://hb.ru-msk.vkcloud-storage.ru
    s3.path.style.access: "true"
    s3.endpoint.region: ru-msk
    
    # Restart strategy for handling failures
    restart-strategy: failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: "5"
    restart-strategy.failure-rate.failure-rate-interval: 5min
    restart-strategy.failure-rate.delay: 10s
  
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: https://data-internals-2025.hb.ru-msk.vkcloud-storage.ru/job/flink-user-aggregation-1.0.5.jar
    parallelism: 2
    upgradeMode: stateless
  podTemplate:
    spec:
      initContainers:
        - name: wait-kafka-clickhouse
          image: jwilder/dockerize:v0.9.5
          command: ["dockerize", "-wait", "tcp://example-kafka-bootstrap:9092", "-wait", "http://clickhouse-example:8123", "-timeout", "300s"]
      containers:
        - name: flink-main-container
          envFrom:
            - secretRef:
                name: aws-params
          env:
            - name: "KAFKA_BROKERS"
              value: "example-kafka-bootstrap:9092"
            - name: "CLICKHOUSE_URL"
              value: "jdbc:clickhouse://clickhouse-example:8123/default"
            - name: ENABLE_BUILT_IN_PLUGINS
              value: flink-s3-fs-presto-1.20.2.jar
            - name: "PROCESSING_DELAY_MS"
              value: "0"
            - name: "CLICKHOUSE_BATCH_SIZE"
              value: "100"
            - name: "CLICKHOUSE_BATCH_INTERVAL_MS"
              value: "50"
          ports:
            - name: prometheus
              containerPort: 9249
              protocol: TCP

Result:

  • installed Kafka and added a topic
  • installed ClickHouse
  • ran the ClickHouse migrations
  • installed Metabase
  • initialized Metabase
  • launched the Flink job
  • started the test data generator
[Screenshots: Kafka metrics, Clickhouse metrics, Metabase]
Production
Kafka
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: example
spec:
  cruiseControl:
    autoRebalance:
      - mode: add-brokers
        template:
          name: example-add-brokers-rebalancing-template
      - mode: remove-brokers
        template:
          name: example-remove-brokers-rebalancing-template
Kafka node pool
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaNodePool
metadata:
  name: dual-role
spec:
  replicas: 2
  template:
    pod:
      tolerations:
        - key: "env"
          operator: "Equal"
          value: "prod"
          effect: "NoSchedule"
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: strimzi.io/name
                    operator: In
                    values:
                      - example-kafka
              topologyKey: topology.kubernetes.io/zone
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: env
                    operator: In
                    values:
                      - prod
Kafka topic
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: events
spec:
  partitions: 10
  replicas: 3
Clickhouse
- op: replace
  path: /spec/configuration/clusters/0/layout/replicasCount
  value: 2

- op: add
  path: /spec/templates/podTemplates/0/spec/tolerations
  value:
    - key: "env"
      operator: "Equal"
      value: "prod"
      effect: "NoSchedule"

- op: add
  path: /spec/templates/podTemplates/0/spec/affinity
  value:
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        - labelSelector:
            matchExpressions:
              - key: clickhouse.altinity.com/chi
                operator: In
                values:
                  - example
          topologyKey: topology.kubernetes.io/zone
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
          - matchExpressions:
              - key: env
                operator: In
                values:
                  - prod  
Clickhouse keeper
- op: replace
  path: /spec/configuration/clusters/0/layout/replicasCount
  value: 2

- op: add
  path: /spec/templates/podTemplates/0/spec/tolerations
  value:
    - key: "env"
      operator: "Equal"
      value: "prod"
      effect: "NoSchedule"

- op: add
  path: /spec/templates/podTemplates/0/spec/affinity
  value:
    podAntiAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        - labelSelector:
            matchExpressions:
              - key: clickhouse.altinity.com/chk
                operator: In
                values:
                  - example
          topologyKey: topology.kubernetes.io/zone
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
          - matchExpressions:
              - key: env
                operator: In
                values:
                  - prod
Metabase
apiVersion: apps/v1
kind: Deployment
metadata:
  name: metabase
spec:
  template:
    spec:
      tolerations:
        - key: "env"
          operator: "Equal"
          value: "prod"
          effect: "NoSchedule"
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: env
                    operator: In
                    values:
                      - prod    
Flink
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  flinkConfiguration:
    execution.checkpointing.savepoint-dir: s3://data-internals-2025/prod/savepoints
    state.checkpoints.dir: s3://data-internals-2025/prod/checkpoints
    high-availability.storageDir: s3://data-internals-2025/prod/ha
  podTemplate:
    spec:
      tolerations:
        - key: "env"
          operator: "Equal"
          value: "prod"
          effect: "NoSchedule"
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: env
                    operator: In
                    values:
                      - prod
Role binding
- op: replace
  path: /subjects/0/namespace
  value: prod

Result:

  • added replicas for Kafka and ClickHouse
  • added automatic rebalancing for Kafka
  • spread the deployment across several availability zones
Scaling
Manual
namespace: prod
resources:
  - ../base

patches:
  - target:
      kind: ClickHouseInstallation
      name: example
    patch: |-
      - op: replace
        path: /spec/configuration/clusters/0/layout/replicasCount
        value: 3

  - target:
      kind: ClickHouseKeeperInstallation
      name: example
    patch: |-
      - op: replace
        path: /spec/configuration/clusters/0/layout/replicasCount
        value: 3

  - target:
      kind: KafkaNodePool
      name: dual-role
    patch: |-
      - op: replace
        path: /spec/replicas
        value: 3
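Three is not an arbitrary number for ClickHouse Keeper and the dual-role Kafka pool: Keeper uses Raft, and the pool's `controller` role makes every node a KRaft voter, so both need a majority of their members alive. With two replicas, one per zone, losing either zone loses the majority; with three, one zone can fail. A shell sketch of the majority arithmetic (function names are illustrative):

```shell
# Majority quorum for Raft-style consensus with n voters
quorum_size()        { echo $(( $1 / 2 + 1 )); }
# How many voters can fail while a majority is still alive
tolerated_failures() { echo $(( ($1 - 1) / 2 )); }

for n in 1 2 3; do
  echo "replicas=$n quorum=$(quorum_size "$n") tolerated_failures=$(tolerated_failures "$n")"
done
# replicas=1 quorum=1 tolerated_failures=0
# replicas=2 quorum=2 tolerated_failures=0
# replicas=3 quorum=2 tolerated_failures=1
```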

HorizontalPodAutoscaler (HPA)

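$$ y = \left\lceil x \cdot \frac{z_c}{z_t} \right\rceil $$

  • $y$ is the desired number of replicas
  • $x$ is the current number of replicas
  • $z_c$ is the current metric value
  • $z_t$ is the target (threshold) metric value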
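A shell sketch of this calculation, following the algorithm described in the Kubernetes HPA documentation: the result is rounded up, and the HPA does nothing when z_c / z_t is within a tolerance of 1 (0.1 by default). Clamping to minReplicas/maxReplicas and the stabilization windows are left out; the function name is illustrative:

```shell
# desired = ceil(current * currentMetric / targetMetric),
# unless the ratio is within the tolerance of 1.0 (0.1 by default)
hpa_desired_replicas() {
  # $1 = current replicas (x), $2 = current metric (z_c), $3 = target metric (z_t)
  awk -v x="$1" -v zc="$2" -v zt="$3" -v tol="${4:-0.1}" 'BEGIN {
    ratio = zc / zt
    diff = ratio - 1
    if (diff < 0) diff = -diff
    if (diff <= tol) { print x; exit }   # close enough: keep the current count
    y = x * ratio
    c = int(y)
    if (c < y) c++                       # round up (values are positive)
    print c
  }'
}

hpa_desired_replicas 2 150 100   # ratio 1.5  -> 3
hpa_desired_replicas 3 105 100   # ratio 1.05 is within tolerance -> 3
hpa_desired_replicas 4 30 100    # ratio 0.3  -> ceil(1.2) = 2
```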
Keda generator
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: generator-scaler
spec:
  scaleTargetRef:
    name: generator
    kind: Deployment
  pollingInterval: 10
  cooldownPeriod: 120
  minReplicaCount: 1
  maxReplicaCount: 3
  advanced:
    horizontalPodAutoscalerConfig:                   
      behavior:
        scaleUp:
          stabilizationWindowSeconds: 120
        scaleDown:
          stabilizationWindowSeconds: 120
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://vmsingle-example.common.svc.cluster.local:8428/prometheus
        query: 1 + max(0, (1 - (count(kube_pod_info{namespace="prod", pod=~"generator.*"}) - count(kube_pod_info{namespace="prod", created_by_name="basic-example"}))) * 0.2)
        threshold: '1'
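To see what this trigger asks for, the PromQL expression can be evaluated by hand. Below, G is the number of generator pods and F the number of pods with `created_by_name="basic-example"` (presumably the Flink TaskManager pods created by the JobManager). The trigger sets no `metricType`, so KEDA uses its default `AverageValue`, for which the HPA requests ceil(metric / threshold) replicas. The sketch ignores the HPA tolerance, the 1..3 replica bounds and the stabilization windows; function names are illustrative:

```shell
# Value of: 1 + max(0, (1 - (G - F)) * 0.2)
generator_metric() {
  awk -v g="$1" -v f="$2" 'BEGIN {
    inner = (1 - (g - f)) * 0.2
    if (inner < 0) inner = 0
    print 1 + inner
  }'
}

# AverageValue target: desired = ceil(metric / threshold), threshold = 1
generator_replicas() {
  awk -v m="$(generator_metric "$1" "$2")" -v t=1 'BEGIN {
    y = m / t
    c = int(y)
    if (c < y) c++
    print c
  }'
}

generator_metric 1 1     # 1.2
generator_replicas 1 1   # 2
```

With one generator and one Flink pod the metric is 1.2, so a second generator is requested; once generators outnumber Flink pods, the extra term drops to zero and the request falls back to one replica.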
Keda Flink
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: flink-kafka-scaler
spec:
  scaleTargetRef:
    name: basic-example
    kind: FlinkDeployment
    apiVersion: flink.apache.org/v1beta1
  pollingInterval: 10
  cooldownPeriod: 120
  minReplicaCount: 1
  maxReplicaCount: 3
  advanced:
    horizontalPodAutoscalerConfig:                   
      behavior:
        scaleDown:
          stabilizationWindowSeconds: 120
  triggers:
    - type: prometheus
      metricType: Value
      metadata:
        serverAddress: http://vmsingle-example.common.svc.cluster.local:8428/prometheus
        query: max(1, 1.2 - max(0, (1 - (count(kube_pod_info{namespace="prod", pod=~"generator.*"}) - count(kube_pod_info{namespace="prod", created_by_name="basic-example"}))) * 0.2))
        threshold: '1'
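This trigger sets `metricType: Value`, for which the HPA computes ceil(current replicas × metric / threshold). The PromQL expression can be evaluated by hand with G = number of generator pods and F = number of pods with `created_by_name="basic-example"` (presumably the Flink TaskManagers). A shell sketch that ignores the HPA tolerance, the replica bounds and the stabilization window; function names are illustrative:

```shell
# Value of: max(1, 1.2 - max(0, (1 - (G - F)) * 0.2))
flink_metric() {
  awk -v g="$1" -v f="$2" 'BEGIN {
    inner = (1 - (g - f)) * 0.2
    if (inner < 0) inner = 0
    v = 1.2 - inner
    if (v < 1) v = 1
    print v
  }'
}

# Value target: desired = ceil(current * metric / threshold), threshold = 1
# $1 = current replicas, $2 = G, $3 = F
flink_replicas() {
  awk -v x="$1" -v m="$(flink_metric "$2" "$3")" -v t=1 'BEGIN {
    y = x * m / t
    c = int(y)
    if (c < y) c++
    print c
  }'
}

flink_metric 2 1       # 1.2
flink_replicas 1 2 1   # 2
```

When generators run ahead of Flink (G = 2, F = 1) the metric is 1.2 and one replica becomes two; once F catches up with G the metric returns to 1 and the count holds.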
Keda stop flink
- op: add
  path: /metadata/annotations
  value:
    autoscaling.keda.sh/paused-replicas: "1"
Keda stop generator
- op: replace
  path: /spec/triggers/0/metadata/query
  value: 'max(0, 1 - (count(kube_pod_info{namespace="prod", pod=~"generator.*"}) - count(kube_pod_info{namespace="prod", created_by_name="basic-example"})) * 0.2)'  

Result:

  • scaled Kafka
  • scaled ClickHouse
  • watched automatic scaling in action
Chaos
PodChaos
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
  name: pod-failure-example
spec:
  action: pod-failure
  mode: fixed-percent
  value: "100"
  duration: '120s'
  selector:
    nodeSelectors:
      mcs.mail.ru/mcs-nodepool: prod-ms1

Result:

  • looked at Chaos Mesh and what it can do
  • ran an experiment in which one availability zone became unavailable
[Screenshots: Chaos]

What to explore next:

  • if you don't like the cloud: Kubernetes distributions (k3s, k0s)
  • if you want more automation: GitOps (ArgoCD, Flux)
  • if you'd rather not use kubectl: find "your" tool (tanka, terraform, pulumi)

Where can you find it?

https://gitverse.ru/emelianovss/kube-for-data-2025

Thank you for your attention!

t.me/totaki