
周哥教IT - Kafka Fundamentals and Practice

Kafka is a high-throughput, distributed publish-subscribe messaging system (a kind of message middleware) developed under the Apache Software Foundation and written in Scala and Java. It can handle all of a website's activity-stream data (user behavior such as page views, searches, and other actions); these data (logs) can flow through Kafka into systems such as Hadoop and Spark, providing a big-data source for log analysis, data mining, and machine learning.

Kafka features:
Message persistence via an O(1) disk data structure, so performance stays stable over long periods even with terabytes of stored messages.
High throughput: even on commodity hardware, Kafka can handle millions of messages per second ("wicked fast").
Messages can be partitioned across a cluster of Kafka brokers and consumer machines (a keyed-partitioning sketch follows this list).
Parallel data loading into Hadoop is supported (Flume, Kafka Connect, Gobblin, etc.). Through Hadoop's parallel loading mechanism, Kafka unifies online and offline message processing. Producers and consumers are independent of each other; consumers can fetch messages in real time (synchronously) or in batches (offline/asynchronously).
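
As a quick illustration of the partitioning feature, here is a minimal sketch using sarama's SyncProducer with the hash partitioner: messages that share a key always land on the same partition. The broker address (localhost:9092) and the topic name ("demo_topic") are placeholders, not part of the course environment.

// sketch: keyed partitioning with sarama (broker address and topic are placeholders)
package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true // required by SyncProducer
    // the hash partitioner maps equal keys to the same partition
    config.Producer.Partitioner = sarama.NewHashPartitioner

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    for _, userID := range []string{"u1", "u2", "u1"} {
        partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
            Topic: "demo_topic",
            Key:   sarama.StringEncoder(userID), // same key -> same partition
            Value: sarama.StringEncoder("page_view"),
        })
        if err != nil {
            log.Println("send failed:", err)
            continue
        }
        log.Printf("key=%s partition=%d offset=%d\n", userID, partition, offset)
    }
}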

VIP Video Course

Kafka Fundamentals and Practice

Course Slides

Kafka course slides (PDF)

Course References

Kafka official website

Kafka Go producer and consumer reference code


// golang kafka producer demo
package main

import (
    "github.com/Shopify/sarama"
    "log"
    "os"
    "os/signal"
    "sync"
)

func main() {
    config := sarama.NewConfig()

    config.Producer.Return.Successes = true
    config.Producer.Partitioner = sarama.NewRandomPartitioner

    // connect to the demo cluster's brokers
    client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
    if err != nil {
        panic(err)
    }
    defer client.Close()
    producer, err := sarama.NewAsyncProducerFromClient(client)
    if err != nil {
        panic(err)
    }

    // Trap SIGINT to trigger a graceful shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    var (
        wg        sync.WaitGroup
        enqueued, successes, errors int
    )

    wg.Add(1)
    // start a goroutine to count successful deliveries
    go func() {
        defer wg.Done()
        for range producer.Successes() {
            successes++
        }
    }()

    wg.Add(1)
    // start a goroutine to count and log delivery errors
    go func() {
        defer wg.Done()
        for err := range producer.Errors() {
            log.Println(err)
            errors++
        }
    }()

ProducerLoop:
    for {
        message := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")}
        select {
        case producer.Input() <- message:
            enqueued++

        case <-signals:
            producer.AsyncClose() // Trigger a shutdown of the producer.
            break ProducerLoop
        }
    }

    wg.Wait()

    log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
}
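
The async demo above keeps sarama's default acknowledgement settings. If stronger delivery guarantees are wanted, the sketch below shows one way the producer config could be tightened before creating the client; the specific values are illustrative, not part of the course material.

// sketch: producer config with stronger delivery guarantees (values are illustrative)
package main

import (
    "fmt"

    "github.com/Shopify/sarama"
)

// newReliableProducerConfig returns a config that waits for all in-sync
// replicas to acknowledge each message and retries transient broker errors.
func newReliableProducerConfig() *sarama.Config {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas
    config.Producer.Retry.Max = 5                    // retry transient failures up to 5 times
    config.Producer.Return.Successes = true          // report delivered messages on Successes()
    config.Producer.Return.Errors = true             // report failed deliveries on Errors()
    return config
}

func main() {
    config := newReliableProducerConfig()
    fmt.Println("required acks:", config.Producer.RequiredAcks)
}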


// kafka golang consumer demo
package main

import (
    "github.com/Shopify/sarama"
    "log"
    "os"
    "os/signal"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
    if err != nil {
        panic(err)
    }
    defer client.Close()

    consumer, err := sarama.NewConsumerFromClient(client)
    if err != nil {
        panic(err)
    }
    defer consumer.Close()
    // get the list of partition IDs for the topic
    partitions, err := consumer.Partitions("my_topic")
    if err != nil {
        panic(err)
    }

    for _, partitionId := range partitions {
        // create a PartitionConsumer for every partition ID
        partitionConsumer, err := consumer.ConsumePartition("my_topic", partitionId, sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }

        // consume each partition in its own goroutine
        go func(pc sarama.PartitionConsumer) {
            defer pc.Close()
            // blocks until the partition consumer's message channel is closed
            for message := range pc.Messages() {
                value := string(message.Value)
                log.Printf("Partition: %d; offset: %d; value: %s\n",
                    message.Partition, message.Offset, value)
            }
        }(partitionConsumer)
    }
    // block until interrupted (Ctrl+C)
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    <-signals
}
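
The partition consumer above starts at sarama.OffsetNewest, so it only sees messages produced after it starts. A minimal variation that replays a single partition from the earliest retained offset is sketched below; the partition number 0 is just a placeholder.

// sketch: replay one partition from the earliest retained offset
package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    consumer, err := sarama.NewConsumer([]string{"localhost:9192"}, sarama.NewConfig())
    if err != nil {
        panic(err)
    }
    defer consumer.Close()

    // OffsetOldest replays everything still retained; OffsetNewest only tails new messages
    pc, err := consumer.ConsumePartition("my_topic", 0, sarama.OffsetOldest)
    if err != nil {
        panic(err)
    }
    defer pc.Close()

    for msg := range pc.Messages() {
        log.Printf("offset %d: %s", msg.Offset, string(msg.Value))
    }
}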


// kafka golang consumer-group demo
package main

import (
    "context"
    "fmt"
    "github.com/Shopify/sarama"
    "os"
    "os/signal"
    "sync"
)
// consumerGroupHandler implements the sarama.ConsumerGroupHandler interface
type consumerGroupHandler struct {
    name string
}

func (consumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (consumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession,
    claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("%s Message topic:%q partition:%d offset:%d  value:%s\n",
            h.name, msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
        // manually mark the message as consumed so its offset can be committed
        sess.MarkMessage(msg, "")
    }
    return nil
}

// handleErrors drains the group's error channel; it only receives errors
// when config.Consumer.Return.Errors is set to true
func handleErrors(group sarama.ConsumerGroup, wg *sync.WaitGroup) {
    wg.Done() // signal that this goroutine has started
    for err := range group.Errors() {
        fmt.Println("ERROR", err)
    }
}

// consume joins the group and processes messages; Consume must be called in a
// loop so a new session is created after every rebalance
func consume(group sarama.ConsumerGroup, wg *sync.WaitGroup, name string) {
    fmt.Println(name + " started")
    wg.Done() // signal that this consumer goroutine has started
    ctx := context.Background()
    for {
        topics := []string{"my_topic"}
        handler := consumerGroupHandler{name: name}
        err := group.Consume(ctx, topics, handler)
        if err != nil {
            panic(err)
        }
    }
}

func main() {
    var wg sync.WaitGroup
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = false
    config.Version = sarama.V0_10_2_0 // consumer groups require brokers >= 0.10.2.0
    client, err := sarama.NewClient([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
    if err != nil {
        panic(err)
    }
    defer client.Close()
    group1, err := sarama.NewConsumerGroupFromClient("c1", client)
    if err != nil {
        panic(err)
    }
    group2, err := sarama.NewConsumerGroupFromClient("c2", client)
    if err != nil {
        panic(err)
    }
    group3, err := sarama.NewConsumerGroupFromClient("c3", client)
    if err != nil {
        panic(err)
    }
    defer group1.Close()
    defer group2.Close()
    defer group3.Close()
    wg.Add(3)
    go consume(&group1,&wg,"c1")
    go consume(&group2,&wg,"c2")
    go consume(&group3,&wg,"c3")
    wg.Wait()
    // block until interrupted (Ctrl+C)
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)
    <-signals
}
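
For the three group members above to actually share work, "my_topic" needs at least three partitions. The sketch below uses sarama's ClusterAdmin to create such a topic; the partition and replication counts are illustrative and assume the same three-broker demo cluster.

// sketch: create "my_topic" with enough partitions for three group members
package main

import (
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V0_10_2_0 // admin requests need the broker version set

    admin, err := sarama.NewClusterAdmin([]string{"localhost:9192", "localhost:9292", "localhost:9392"}, config)
    if err != nil {
        panic(err)
    }
    defer admin.Close()

    err = admin.CreateTopic("my_topic", &sarama.TopicDetail{
        NumPartitions:     3, // one partition per group member
        ReplicationFactor: 3, // illustrative; must not exceed the number of brokers
    }, false)
    if err != nil {
        log.Println("create topic:", err)
    }
}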
