TL;DR;
Kafka Streams' local state store (RocksDB) could not operate
on Windows because of a permission issue.
Environment
Key | Value |
OS | Windows |
Kafka clients version | v3.5.1 |
Kafka streams version | v3.5.1 |
JDK | OpenJDK 20.0.2 |
Kotlin version | 1.9.10 |
Source Code
KafkaConfig.kt
/**
 * Spring configuration that declares the Kafka producer, consumer and Kafka Streams beans.
 *
 * Each factory bean copies a small subset of the settings built from [KafkaProperties]
 * (bootstrap servers, (de)serializers and the security/SASL options) into its own config map.
 * Settings that are not present in the Spring properties are left out of the map so that the
 * Kafka client defaults apply instead of an explicit `null`.
 */
@Configuration
@EnableKafka
@EnableKafkaStreams
class KafkaConfig {
    companion object {
        const val SOURCE_TOPIC: String = "source-topic"
        const val SINK_TOPIC: String = "sink-topic"
        const val GROUP_ID: String = "process-group"
    }

    /** Creates a [KafkaTemplate] backed by a producer factory built from the full producer properties. */
    @Bean
    fun kafkaTemplate(kafkaProperties: KafkaProperties): KafkaTemplate<String, String> {
        return KafkaTemplate(DefaultKafkaProducerFactory(kafkaProperties.buildProducerProperties()))
    }

    /** Creates a [ProducerFactory] from the bootstrap servers, serializers and security settings. */
    @Bean
    fun kafkaProducerFactory(kafkaProperties: KafkaProperties): ProducerFactory<String, String> {
        val producerProperties = kafkaProperties.buildProducerProperties()
        val config = configOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to producerProperties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG],
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to producerProperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG],
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to producerProperties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG],
            SaslConfigs.SASL_MECHANISM to producerProperties[SaslConfigs.SASL_MECHANISM],
            SaslConfigs.SASL_JAAS_CONFIG to producerProperties[SaslConfigs.SASL_JAAS_CONFIG],
        )
        return DefaultKafkaProducerFactory(config)
    }

    /** Creates a [ConsumerFactory] from the bootstrap servers, deserializers and security settings. */
    @Bean
    fun kafkaConsumerFactory(kafkaProperties: KafkaProperties): ConsumerFactory<String, String> {
        val consumerProperties = kafkaProperties.buildConsumerProperties()
        val config = configOf(
            // Same key string as the producer constant, but the consumer constant is the right one here.
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to consumerProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG],
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to consumerProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG],
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to consumerProperties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG],
            SaslConfigs.SASL_MECHANISM to consumerProperties[SaslConfigs.SASL_MECHANISM],
            SaslConfigs.SASL_JAAS_CONFIG to consumerProperties[SaslConfigs.SASL_JAAS_CONFIG],
        )
        return DefaultKafkaConsumerFactory(config)
    }

    /**
     * Builds the default Kafka Streams configuration bean.
     *
     * Application id, client id and state directory come from the streams properties; the
     * security/SASL settings are forwarded to the embedded consumer and producer under their
     * respective prefixes. String serdes are used as the default key and value serdes.
     */
    @Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    fun kafkaStreamsConfig(kafkaProperties: KafkaProperties): KafkaStreamsConfiguration {
        val producerProperties = kafkaProperties.buildProducerProperties()
        val consumerProperties = kafkaProperties.buildConsumerProperties()
        val streamsProperties = kafkaProperties.buildStreamsProperties()
        val config = configOf(
            StreamsConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperties.bootstrapServers,
            StreamsConfig.APPLICATION_ID_CONFIG to streamsProperties[StreamsConfig.APPLICATION_ID_CONFIG],
            StreamsConfig.CLIENT_ID_CONFIG to streamsProperties[StreamsConfig.CLIENT_ID_CONFIG],
            // The serde configs expect a Serde implementation class. `Serdes::String.javaClass`
            // would be the class of a function reference, not of the String serde.
            StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG to Serdes.StringSerde::class.java,
            StreamsConfig.STATE_DIR_CONFIG to streamsProperties[StreamsConfig.STATE_DIR_CONFIG],
            StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG to Serdes.StringSerde::class.java,
            StreamsConfig.consumerPrefix(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) to consumerProperties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG],
            StreamsConfig.consumerPrefix(SaslConfigs.SASL_MECHANISM) to consumerProperties[SaslConfigs.SASL_MECHANISM],
            StreamsConfig.consumerPrefix(SaslConfigs.SASL_JAAS_CONFIG) to consumerProperties[SaslConfigs.SASL_JAAS_CONFIG],
            // Producer-prefixed settings must come from the producer properties.
            StreamsConfig.producerPrefix(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG) to producerProperties[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG],
            StreamsConfig.producerPrefix(SaslConfigs.SASL_MECHANISM) to producerProperties[SaslConfigs.SASL_MECHANISM],
            StreamsConfig.producerPrefix(SaslConfigs.SASL_JAAS_CONFIG) to producerProperties[SaslConfigs.SASL_JAAS_CONFIG],
        )
        return KafkaStreamsConfiguration(config)
    }

    /** Builds a config map from [entries], keeping insertion order and dropping entries whose value is `null`. */
    private fun configOf(vararg entries: Pair<String, Any?>): Map<String, Any> =
        entries.mapNotNull { (key, value) -> value?.let { key to it } }.toMap()
}
WordCountStreamService.kt
/**
 * Declares the word-count topology: reads lines from [KafkaConfig.SOURCE_TOPIC], counts the
 * occurrences of each space-separated, lowercased word and writes the counts to
 * [KafkaConfig.SINK_TOPIC].
 */
@Service
class WordCountStreamService {
    /**
     * Registers the word-count processing steps on the injected [streamsBuilder].
     *
     * @param streamsBuilder builder the topology is added to
     */
    @Autowired
    fun process(streamsBuilder: StreamsBuilder) {
        // (1) Read the source topic with String keys and String values.
        val lines: KStream<String, String> = streamsBuilder.stream(
            KafkaConfig.SOURCE_TOPIC,
            Consumed.with(Serdes.String(), Serdes.String()),
        )

        // (2) Lowercase every value.
        val lowercased = lines.mapValues { text -> text.lowercase() }

        // (3) Split each value on a single space, one record per word.
        // Kotlin's split already returns a List<String>, so no Arrays.asList-style wrapping is needed.
        val words = lowercased.flatMapValues { sentence -> sentence.split(" ") }

        // (4) Use the word itself as the record key.
        val keyedByWord = words.selectKey { _, word -> word }

        // (5) Group by key and (6) keep the running count in the local state store.
        val wordCounts = keyedByWord
            .groupByKey()
            .count(Named.`as`("Word-Count")) // ⚠️ Error occurs

        // Publish the counts as String keys and Long values.
        val countsSerde = Produced.with(Serdes.String(), Serdes.Long())
        wordCounts.toStream().to(KafkaConfig.SINK_TOPIC, countsSerde)
    }
}
Error log
ERROR 9416 --- [ main] o.a.k.s.p.internals.StateDirectory : Failed to change permissions for the directory C:\Users\{username}\Downloads\state-store
ERROR 9416 --- [ main] o.a.k.s.p.internals.StateDirectory : Failed to change permissions for the directory C:\Users\{username}\Downloads\state-store\kafka-streams-application
INFO 9416 --- [ main] o.a.k.s.p.internals.StateDirectory : Reading UUID from process file: fbe9005b-007c-4084-b4e6-395292833af9
INFO 9416 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
I checked the permissions of the target directory (state.dir) and also changed it to another directory.
However, the error still occurred.
Cause of Error
This error occurs when creating and updating the local state store (RocksDB) on Windows.
The underlying system call relies on POSIX file permissions, which Windows doesn't support.
Solution
Replace the state store with another implementation, or change the operating system.
References
'기타 > Kafka' 카테고리의 다른 글
[Kafka] Avro Serialize/Deserialize (0) | 2023.10.13 |
---|---|
Kafka 설정 방법 on Windows, Mac (0) | 2023.09.13 |