周哥教IT-Kafka基础与实战
Kafka是由Apache软件基金会使用Scala和Java开发的一个高吞吐量的分布式发布/订阅消息系统(消息中间件的一种),可以处理网站中的所有动作流数据(用户行为:网页浏览、搜索和其他用户操作),并可将这些数据(日志)通过Kafka最终导入Hadoop、Spark等系统,为日志分析、数据挖掘、机器学习等提供大数据来源。
Kafka特性:
通过O(1)的磁盘数据结构提供消息的持久化,对于数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使在普通硬件上,Kafka也可支持每秒数百万条消息,wicked fast(变态快)。
支持通过Kafka服务器和消费者集群对消息进行分区。
支持Hadoop并行数据加载(flume,kafka-connect,gobblin等)。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。消息生产者和消费者彼此独立。消费者可以实时/同步或批量/离线/异步获取消息。
VIP视频课程
Kafka基础与实战
课程课件
kafka课程课件pdf课程参考
Kafka官网
Kafka golang生产者、消费者代码参考
// golang kafka producer demo
package main
import (
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
"sync"
)
// main runs an asynchronous Kafka producer that keeps sending "testing 123"
// to topic "my_topic" until SIGINT is received, then shuts the producer down
// and logs how many messages were acknowledged and how many failed.
func main() {
	config := sarama.NewConfig()
	// Required so acknowledged messages are delivered on producer.Successes();
	// the counting goroutine below ranges over that channel.
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewRandomPartitioner

	client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
	if err != nil {
		panic(err)
	}
	// Deferred only after the error check: on failure client is nil and a
	// deferred Close on it would itself panic.
	defer client.Close()

	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		panic(err)
	}

	// Trap SIGINT to trigger a graceful shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	var (
		wg                          sync.WaitGroup
		enqueued, successes, errors int
	)

	// Start a goroutine that counts acknowledged messages. The range loop ends
	// when the producer closes its Successes channel (after AsyncClose).
	wg.Add(1)
	go func() {
		defer wg.Done()
		for range producer.Successes() {
			successes++
		}
	}()

	// Start a goroutine that logs and counts failed messages. It ends when the
	// producer closes its Errors channel.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for err := range producer.Errors() {
			log.Println(err)
			errors++
		}
	}()

ProducerLoop:
	for {
		message := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")}
		select {
		case producer.Input() <- message:
			enqueued++
		case <-signals:
			producer.AsyncClose() // Trigger a shutdown of the producer.
			break ProducerLoop
		}
	}

	// successes and errors are read only after both counting goroutines have
	// exited, so the counters are not accessed concurrently.
	wg.Wait()
	log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
}
// kafka golang consumer demo
package main
import (
"github.com/Shopify/sarama"
"log"
"os"
"os/signal"
)
// main consumes every partition of topic "my_topic", starting at the newest
// offset, and logs each message until SIGINT is received. It then closes all
// partition consumers and waits for their reader goroutines to finish before
// the deferred consumer/client Close calls run.
func main() {
	config := sarama.NewConfig()
	// With Return.Errors enabled, each partition consumer's Errors() channel
	// must be drained (see the error goroutine below), otherwise consumption
	// can stall once that channel's buffer fills up.
	config.Consumer.Return.Errors = true

	client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
	if err != nil {
		panic(err)
	}
	// Deferred only after the error check: on failure client is nil.
	defer client.Close()

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		panic(err)
	}
	// Runs after every partition consumer has been closed and drained below;
	// sarama requires child PartitionConsumers to be closed before the Consumer.
	defer consumer.Close()

	// Get the partition ID list.
	partitions, err := consumer.Partitions("my_topic")
	if err != nil {
		panic(err)
	}

	partitionConsumers := make([]sarama.PartitionConsumer, 0, len(partitions))
	// Each reader goroutine (two per partition) sends one value here on exit.
	// The buffer lets them exit without waiting for main to receive.
	done := make(chan struct{}, 2*len(partitions))

	for _, partitionID := range partitions {
		// Create a partition consumer for every partition ID.
		pc, err := consumer.ConsumePartition("my_topic", partitionID, sarama.OffsetNewest)
		if err != nil {
			panic(err)
		}
		partitionConsumers = append(partitionConsumers, pc)

		// Log messages until the Messages channel is closed.
		go func(pc sarama.PartitionConsumer) {
			defer func() { done <- struct{}{} }()
			for message := range pc.Messages() {
				value := string(message.Value)
				log.Printf("Partitionid: %d; offset:%d, value: %s\n",
					message.Partition, message.Offset, value)
			}
		}(pc)

		// Drain the Errors channel until it is closed.
		go func(pc sarama.PartitionConsumer) {
			defer func() { done <- struct{}{} }()
			for err := range pc.Errors() {
				log.Println(err)
			}
		}(pc)
	}

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals

	// Initiate shutdown of every partition consumer, then wait until all of
	// their Messages/Errors channels have been drained and closed.
	for _, pc := range partitionConsumers {
		pc.AsyncClose()
	}
	for i := 0; i < 2*len(partitionConsumers); i++ {
		<-done
	}
}
// kafka golang consumer-group demo
package main
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"os"
"os/signal"
"sync"
)
type consumerGroupHandler struct{
name string
}
func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error
{ return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error
{ return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("%s Message topic:%q partition:%d offset:%d value:%s\n",
h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
// 手动确认消息
sess.MarkMessage(msg, "")
}
return nil
}
// handleErrors prints each error received from the consumer group's Errors
// channel, prefixed with "ERROR", until that channel is closed.
//
// wg.Done is called immediately on entry, so the WaitGroup tracks that this
// goroutine has started, not that it has finished.
// NOTE(review): sarama only delivers errors on this channel when
// config.Consumer.Return.Errors is true — confirm against the caller's config.
func handleErrors(group *sarama.ConsumerGroup, wg *sync.WaitGroup) {
	wg.Done()

	errCh := (*group).Errors()
	for {
		e, ok := <-errCh
		if !ok {
			return
		}
		fmt.Println("ERROR", e)
	}
}
// consume repeatedly joins the consumer group on topic "my_topic", handling
// messages with a consumerGroupHandler tagged with name.
//
// wg.Done is called once on entry, signalling that this goroutine has
// started, not that it has finished. Consume is called in a loop so that
// the group is rejoined each time a session ends. The loop stops quietly
// when the group has been closed (sarama.ErrClosedConsumerGroup). It panics
// on any other error.
func consume(group *sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
	fmt.Println(name + "start")
	wg.Done()

	ctx := context.Background()
	// Loop-invariant: the same topics and handler are used for every session.
	topics := []string{"my_topic"}
	handler := consumerGroupHandler{name: name}
	for {
		err := (*group).Consume(ctx, topics, handler)
		if err == sarama.ErrClosedConsumerGroup {
			// The group was closed (e.g. during shutdown); this is not a failure.
			return
		}
		if err != nil {
			panic(err)
		}
	}
}
// main starts three independent consumer groups ("c1", "c2", "c3") on a
// shared client. It waits until their goroutines have started and then blocks
// until SIGINT. Deferred calls close the groups (c3, c2, c1) and then the client.
func main() {
	var wg sync.WaitGroup
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = false
	// NOTE(review): sarama's consumer-group API requires at least this protocol
	// version per its documentation — confirm if the brokers are upgraded.
	config.Version = sarama.V0_10_2_0

	client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
	if err != nil {
		panic(err)
	}
	// Deferred only after the error check: on failure client is nil.
	defer client.Close()

	// Each group's Close is deferred as soon as it exists, so a failure creating
	// a later group does not leak an earlier one. The defer order (LIFO) closes
	// c3, c2, c1, then the client.
	group1, err := sarama.NewConsumerGroupFromClient("c1", client)
	if err != nil {
		panic(err)
	}
	defer group1.Close()

	group2, err := sarama.NewConsumerGroupFromClient("c2", client)
	if err != nil {
		panic(err)
	}
	defer group2.Close()

	group3, err := sarama.NewConsumerGroupFromClient("c3", client)
	if err != nil {
		panic(err)
	}
	defer group3.Close()

	wg.Add(3)
	go consume(&group1, &wg, "c1")
	go consume(&group2, &wg, "c2")
	go consume(&group3, &wg, "c3")
	// consume calls wg.Done on entry, so this only waits for the three
	// goroutines to start, not for them to finish.
	wg.Wait()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	<-signals
}