以下内容是针对在
Ambari上启用了Kerberos认证的情况下处理的。
首先开启
ranger权限管理image.png
配置
admin用户权限image.png
控制台使用方法
消费者
kafka-console-consumer.sh --bootstrap-server t2.demo.com:9092 --topic test_hello --security-protocol PLAINTEXTSASL
生产者
kafka-console-producer.sh --broker-list storm2.starsriver.cn:9092 --topic test_hello --property key.separator=' ' --security-protocol PLAINTEXTSASL
启动消费者
[root@t1 bin]# kafka-console-consumer.sh --bootstrap-server t2.demo.com:9092 --topic test_hello --security-protocol PLAINTEXTSASL hi I'm lake
启动生产品
[root@t1 bin]# kafka-console-producer.sh --broker-list t2.demo.com:9092 --topic test_hello --property key.separator=' ' --security-protocol PLAINTEXTSASL >hi I'm lake >
Java API 方式
导入依赖包
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '1.0.1'compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.0.1'
kafka_jass.conf配置文件
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
doNotPrompt=true
useTicketCache=true
principal="admin/[email protected]"
useKeyTab=true
serviceName="kafka"
keyTab="/etc/security/keytabs/admin.keytab"
client=true;
};生产者
@Testpublic void testProducer(){
System.setProperty("java.security.auth.login.config", "/path/kafka_jass.conf");
System.setProperty("java.security.krb5.conf","/etc/krb5.conf");
System.setProperty("java.security.krb5.debug","true");
Properties props = new Properties();
props.put("metadata.broker.list", "t2.demo.com,t3.demo.com,t4.demo.com");
props.put("bootstrap.servers", "t2.demo.com:9092,t3.demo.com:9092,t4.demo.com:9092");
props.put("key.serializer",StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("request.required.acks", "1");
props.put("security.protocol", "SASL_PLAINTEXT");
KafkaProducer producer = new KafkaProducer(props);
producer.send(new ProducerRecord<String,String>("test_hello","hi","lake"));
producer.close();
}消费者
@Testpublic void testConsumer(){
System.setProperty("java.security.krb5.conf","/etc/krb5.conf");
System.setProperty("java.security.auth.login.config","/path/kafka_jass.conf");
Properties props = new Properties();
props.put("group.id", "hi");
props.put("bootstrap.servers", "t2.demo.com:9092,t3.demo.com:9092,t4.demo.com:9092");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("request.required.acks", "1");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("test_hello")); for(;;){
ConsumerRecords<String,String> list = consumer.poll(100); for(ConsumerRecord<String,String> record : list){
System.out.println(record.key()+":"+record.value());
}
}
}
作者:dounine
链接:https://www.jianshu.com/p/0e8c5108452e
點擊查看更多內容
為 TA 點贊
評論
評論
共同學習,寫下你的評論
評論加載中...
作者其他優質文章
正在加載中
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦

