本文介紹了以下內容:
1.什么是Kafka?
2.為什么我們需要使用Kafka這樣的消息系統(tǒng)及使用它的好處
3.如何將Kafka使用到我們的后端設計中。
譯自timber.io:《hello-world-in-kafka-using-python》,有部分刪改。
1.Kafka是什么、為什么我們需要它?
簡而言之,Kafka是一個分布式消息系統(tǒng)。這是什么意思呢?
想象一下,你現(xiàn)在有一個簡單的Web應用,其包含了網頁前端客戶端(Client)、服務端和數(shù)據(jù)庫:

你需要記錄所有發(fā)生在你的Web應用的事件,比如點擊、請求、搜索等,以便后續(xù)進行計算和運營分析。
假設每個事件都由單獨的APP完成,那么一個簡單的解決方案就是將數(shù)據(jù)存儲在數(shù)據(jù)庫中,所有APP連接到數(shù)據(jù)庫進行存儲:

這看起來簡單,但是其中還會出現(xiàn)許多問題:
1.點擊、請求、搜索等事件會產生大量的數(shù)據(jù)到數(shù)據(jù)庫中,這可能會導致插入事件存在延遲。
2.如果選擇將高頻數(shù)據(jù)存儲在SQL或MongoDB等數(shù)據(jù)庫中,很難再原有歷史數(shù)據(jù)的基礎上擴展數(shù)據(jù)庫。
3.如果你需要用這些數(shù)據(jù)進行數(shù)據(jù)分析,你可能無法直接對數(shù)據(jù)庫進行高頻率的讀取操作。
4.每個APP可以遵循自己的數(shù)據(jù)格式,這就意味著當你需要在不同的APP進行數(shù)據(jù)交換時,你需要進行數(shù)據(jù)格式的轉換。
通過使用像Kafka這樣的消息流系統(tǒng),可以很好地解決這些問題,因為他們可以執(zhí)行以下操作:
1.存儲的大量數(shù)據(jù)可以被持久化、校驗和復制,具備容錯能力。
2.支持跨系統(tǒng)實時處理連續(xù)的數(shù)據(jù)流。
3.允許APP獨立發(fā)布數(shù)據(jù)或數(shù)據(jù)流,并與使用它的APP無關。
那么它和傳統(tǒng)數(shù)據(jù)庫有何不同?
盡管Kafka可以持久化地存儲數(shù)據(jù),但它不是數(shù)據(jù)庫。
Kafka不僅允許APP存儲或提取連續(xù)的數(shù)據(jù)流,還支持實時處理。這與對被動數(shù)據(jù)執(zhí)行CRUD操作或對傳統(tǒng)數(shù)據(jù)庫執(zhí)行查詢的方式不同。
聽起來不錯,那么Kafka是如何解決以上挑戰(zhàn)的?
Kafka是一個分布式平臺,是為規(guī)模而構建的,這意味著它可以處理高頻率的讀寫和存儲大量數(shù)據(jù)。它確保數(shù)據(jù)始終可靠。它還支持從故障中恢復的強大機制。
以下是為什么應該使用Kafka的一些關鍵因素:
1.1 簡化后端架構
在Kafka的幫助下,我們前面的結構會變得簡單一些:

1.2 通用數(shù)據(jù)管道
如上所示,Kafka充當多個APP和服務的通用數(shù)據(jù)管道,這給了我們兩個好處:
1.數(shù)據(jù)是集成的,我們將來自不同系統(tǒng)的數(shù)據(jù)都存在一個地方,這使得Kafka成為真正的數(shù)據(jù)源。任何APP都可以將數(shù)據(jù)推送到該平臺,然后由另一個APP提取數(shù)據(jù)。
2.Kafka使得應用程序之間交換數(shù)據(jù)變得容易。因為我們可以標準化數(shù)據(jù)格式,減少了數(shù)據(jù)格式的轉換。
1.3 通用連接性
盡管Kafka允許你使用標準數(shù)據(jù)格式,但并不意味著你的APP就不需要數(shù)據(jù)轉換了,它只是減少了我們轉換數(shù)據(jù)的頻率罷了。
此外,Kafka提供了一個叫 Kafka Connect 的框架允許我們維護遺留的老系統(tǒng)。
1.4 實時數(shù)據(jù)處理
類似于監(jiān)控系統(tǒng)這樣的實時APP,往往需要連續(xù)的數(shù)據(jù)流,這些數(shù)據(jù)需要被立即處理或盡量減少延遲處理。
Kafka的流式處理,使得處理引擎可以在很短的時間內(幾毫米到幾分鐘)內取數(shù)、分析、以及響應。
2.Kafka入門
2.1 安裝
安裝Kafka是一個相當簡單的過程。只需遵循以下給定步驟:
1.下載最新的1.1.0版本的Kafka
2.使用以下命令解壓縮下載文件: tar -xzf kafka_2.11-1.1.0.tgz
3.cd到Kafka目錄開始使用它: cd kafka_2.11-1.1.0
2.2 啟動服務器
ZooKeeper是一個針對Kafka等分布式環(huán)境的集中管理工具,它為大型分布式系統(tǒng)提供配置服務、同步服務及命名注冊表。
因此,我們需要先啟動ZooKeeper服務器,然后再啟動Kafka服務器。使用以下命令即可:
# Start ZooKeeper Server
bin/zookeeper-server-start.sh config/zookeeper.properties
# Start Kafka Server
bin/kafka-server-start.sh config/server.properties
2.3 Kafka 基本概念
我們快速介紹一下Kafka體系結構的核心概念:
1.Kafka在一個或多個服務器上作為集群運行。
2.Kafka將數(shù)據(jù)流存儲在名為topics的類別中。每條數(shù)據(jù)均由鍵、值、時間戳組成。
3.Kafka使用發(fā)布-訂閱模式。它允許某些APP充當producers(生產者),記錄數(shù)據(jù)并將數(shù)據(jù)發(fā)布到Kafka topic中。
同樣,它允許某些APP充當consumer(消費者)和訂閱Kafka topic并處理由它產生的數(shù)據(jù)。
4.除了Prodcuer API 和 Consumer API,Kafka還為應用提供了一個 Streams API 作為流處理器。通過 Connector API 我們可以將Kafka連接到其他現(xiàn)有的應用程序和數(shù)據(jù)系統(tǒng)。
2.4 架構

如你所見,每個Kafka的 Topic 可以分為多個Partition(分區(qū)),可以使用broker(經紀人)在不同的計算機上復制這些 Topic,從而使消費者可以并行讀取 Topic.
kafka的復制是針對分區(qū)的:

比如上圖中有4個broker, 1個topic, 2個分區(qū),復制因子是3。當producer發(fā)送一個消息的時候,它會選擇一個分區(qū),比如topic1-part1分區(qū),將消息發(fā)送給這個分區(qū)的leader, broker2、broker3會拉取這個消息,一旦消息被拉取過來,slave會發(fā)送ack給master,這時候master才commit這個log。
因此,整個系統(tǒng)的容錯級別極高。當系統(tǒng)正常運行時,對Topic的所有讀取和寫入都將通過leader,且leader會保證所有其他broker均被更新。
如果Broker失效了,系統(tǒng)會自動重新配置,此時副本也可以接管成為Leader.
2.5 創(chuàng)建Kafka Topic
讓我們創(chuàng)建一個名為 sample,含有一個partition(分區(qū))和一個replica(副本)的Kafka Topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample
列出所有的Kafka Topics,檢查是否成功創(chuàng)建了sample Topic:
bin/kafka-topics.sh --list --zookeeper localhost:2181
describe topics 命令還可以獲得特定Topic的詳細信息:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic sample
2.6 創(chuàng)建生產者與消費者
這里是本章的代碼實戰(zhàn)部分,利用Kafka-Python實現(xiàn)簡單的生產者和消費者。
1.首先需要安裝kafka-python:
pip install kafka-python
2.創(chuàng)建消費者(consumer.py)
from kafka import KafkaConsumer
consumer = KafkaConsumer('sample')
for message in consumer:
print (message)
3.創(chuàng)建生產者(producer.py)
有一個消費者正在訂閱我們的消息流,因此我們要創(chuàng)建一個生產者,發(fā)布消息到Kafka:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', b'Hello, World!')
producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')
現(xiàn)在,你重新運行消費者(consumer.py),你就會接收到生產者發(fā)送過來的消息。
-
存儲
+關注
關注
13文章
4791瀏覽量
90058 -
數(shù)據(jù)庫
+關注
關注
7文章
4020瀏覽量
68342 -
服務端
+關注
關注
0文章
69瀏覽量
7364 -
Web應用
+關注
關注
0文章
16瀏覽量
3683 -
kafka
+關注
關注
0文章
55瀏覽量
5570
發(fā)布評論請先 登錄
探討如何將機器學習應用到物聯(lián)網中
如何將DRIVEDONE屬性傳播到我的mcs文件生成?
如何將RAFL添加到我的項目的適當示例和/或文檔?
IDE如何將外圍頭文件包含在我的項目中并連接到我的項目?
如何將ESP32置于認證過程所需的模式中?
如何將MCU應用到FPGA中:關于FPGA(1)
如何將物聯(lián)網數(shù)據(jù)從設備連接到Kafka集群?
如何將物聯(lián)網數(shù)據(jù)從設備連接到Kafka集群?
如何將轉換器設計指標應用到 Fly-Buck 電路設計中
如何將Kafka使用到我們的后端設計中
評論