package com.arms.config;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;


/*
Zookeeper Cluster
Zookeeper 는 분산 시스템에서 서비스 동기화와 naming registry 를 위해서 사용한다.

주로 하는 일은 Apache Kafka Cluster 의 상태를 추적하고 관리하는 역할을 한다.

이외에도 Zookeeper 는 모든 파티션에서 리더 팔로우 관계를 유지해주는 역할을 하도록 컨트롤러를 선택해주는 역할을 하기도 한다.

Kafka Cluster
Kafka 클러스터는 하나 이상의 Broker 로 구성된다. 주로 하는 역할은 Broker의 Controller 로서 역할을 한다.

Broker
Producer 로 부터 들어온 메시지를 저장하고 Consumer 가 이 메시지를 topic 별로 각 partition 에서 offset 을 기준으로 fetch 할 수 있도록 한다.

이중화를 하고 장애에 대응하는 역할도 한다.

Producer
데이터 스트림을 생산하는 역할을 producer 가 합니다. 토큰 또는 메시지를 생성하고 이를 Kafka 클러스터의 하나 이상의 topic 에 추가로 append 하기 위해 Apache Kafka Producer 를 사용합니다

Consumer
Consumer 는 Consumer Group 에 속해서 topic 을 subscribe 하고 해당 topic 에 있는 partition 에 데이터가 들어있다면 그 데이터를 가지고 오는 역할을 합니다.

Topic 과 partition
카프카에서 Producer 가 보내는 메세지는 topic 으로 분류되고, topic 은 여러개의 파티션으로 나눠 질 수 있다.

파티션내의 한 칸은 로그라고 불린다. 데이터는 한 칸의 로그에 순차적으로 append 된다.

메세지의 상대적인 위치를 나타내는게 offset이다. 배열에서의 index를 생각하면 된다.

Middle-Proxy KafkaConfig == Backend-Core KafkaConfig
(코드가 100% 동일합니다)
```

### 2. 하지만 역할은 정반대!
```
Middle-Proxy:
✅ Producer 사용 (메시지 발행)
❌ Consumer 미사용 (@KafkaListener 없음)

Backend-Core:
❌ Producer 미사용 (설정만 존재)
✅ Consumer 사용 (ReqAddConsumer)
```

### 3. 실제 메시지 플로우
```
Frontend
   │
   ▼ HTTP Request
Middle-Proxy (Producer)
   │ kafkaTemplate.send("requirement", key, value)
   ▼ Kafka Message
Kafka Cluster
   │ Consumer Poll
   ▼
Backend-Core (Consumer)
   │ @KafkaListener
   │ switch (CREATE/UPDATE/DELETE)
   │ internalService.process()
   ▼
Database

 */
@Configuration
@RefreshScope
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers:Kafka00Service:9094,Kafka01Service:9094,Kafka02Service:9094}")
    private String bootstrapServers;

    @Value("${spring.kafka.topic.reqadd:REQADD}")
    private String reqAddTopic;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return new KafkaAdmin(configs);
    }

    @Bean
    public AdminClient kafkaAdminClient(KafkaAdmin kafkaAdmin) {
        return AdminClient.create(kafkaAdmin.getConfigurationProperties());
    }

    /**
     * REQADD 토픽 생성
     * Partitions: 3, Replication Factor: 1
     */
    @Bean
    public NewTopic reqAddTopic() {
        return new NewTopic(reqAddTopic, 3, (short) 1);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<>(producerFactory);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // ✅ 추가 권장 설정
        configs.put(ProducerConfig.ACKS_CONFIG, "all");  // 모든 replica 확인
        configs.put(ProducerConfig.RETRIES_CONFIG, 3);   // 재시도 3회
        configs.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 배치 대기
        configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // 압축
        configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 멱등성

        configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        return new DefaultKafkaProducerFactory<>(configs);
    }

}
