נבנה בזמן אמת: העברת הודעות נתונים גדולות עם אפאצ'י קפקא, חלק 1

כשהתחילה תנועת הנתונים הגדולים היא התמקדה בעיקר בעיבוד אצווה. כלי אחסון נתונים ושאילתות מבוזרים כמו MapReduce, Hive ו- Pig תוכננו כולם לעבד נתונים בקבוצות ולא ברציפות. עסקים היו מפעילים מספר משרות מדי לילה כדי לחלץ נתונים ממסד נתונים, ואז מנתחים, משנים ובסופו של דבר מאחסנים את הנתונים. לאחרונה ארגונים גילו את העוצמה של ניתוח ועיבוד נתונים ואירועים תוך כדי התרחשותם , לא פעם בכמה שעות. עם זאת, רוב מערכות המסר המסורתיות אינן מתרחבות בכדי להתמודד עם נתונים גדולים בזמן אמת. אז מהנדסים בלינקדאין בנו אפאצ'ה קפקא ומקור פתוח: מסגרת העברת הודעות מבוזרת העונה על דרישות הביג דאטה על ידי קנה המידה בחומרת סחורות.

במהלך השנים האחרונות התפתח אפאצ'י קפקא כדי לפתור מגוון מקרי שימוש. במקרה הפשוט ביותר, זה יכול להיות מאגר פשוט לאחסון יומני יישומים. בשילוב עם טכנולוגיה כמו הזרמת ניצוצות, ניתן להשתמש בה כדי לעקוב אחר שינויים בנתונים ולנקוט פעולה בנתונים אלה לפני שמירתם ליעד סופי. מצב החיזוי של קפקא הופך אותו לכלי רב עוצמה לאיתור הונאות, כמו למשל לבדוק את תוקפה של עסקה בכרטיס אשראי כשזו מתרחשת, ולא לחכות לעיבוד אצווה לאחר מכן.

הדרכה דו-חלקית זו מציגה את קפקא, החל כיצד להתקין ולהפעיל אותו בסביבת הפיתוח שלך. תקבל סקירה כללית על הארכיטקטורה של קפקא, ואחריה מבוא לפיתוח מערכת העברת הודעות אפאצ'ה קפקא מחוץ לקופסה. לבסוף, תבנה יישום מותאם אישית של יצרן / צרכנים ששולח וצורך הודעות דרך שרת Kafka. במחצית השנייה של ההדרכה תלמד כיצד לחלק ולקבץ הודעות וכיצד לשלוט באילו הודעות צרכן קפקא יצרכ.

מה זה אפאצ'י קפקא?

אפאצ'י קפקא היא מערכת העברת הודעות המיועדת להגדלה של נתונים גדולים. בדומה ל- Apache ActiveMQ או ל- RabbitMq, Kafka מאפשר ליישומים הבנויים בפלטפורמות שונות לתקשר באמצעות העברת הודעות אסינכרוניות. אך קפקא שונה ממערכות המסרים המסורתיות הללו בדרכים עיקריות:

  • הוא נועד להתאמה אופקית על ידי הוספת שרתי סחורות נוספים.
  • זה מספק תפוקה גבוהה בהרבה הן עבור תהליכים של יצרנים והן של צרכנים.
  • ניתן להשתמש בו לתמיכה הן במקרי שימוש בזמן אמת והן בזמן אמת.
  • הוא אינו תומך ב- JMS, ה- API של תוכנת התיווך המכוונת להודעות של Java.

האדריכלות של אפאצ'י קפקא

לפני שנחקור את הארכיטקטורה של קפקא, כדאי שתכירו את המינוח הבסיסי שלה:

  • מפיק הוא תהליך שיכול לפרסם הודעה לנושא.
  • צרכן הוא תהליך שיכול להירשם לנושאים אחד או יותר ולצרוך הודעות שפורסמו לנושאים.
  • לקטגוריית נושא היא השם של ההזנה שאלי הודעות מתפרסמות.
  • ברוקר הוא תהליך לרוץ על מחשב בודד.
  • אשכול הוא קבוצה של ברוקרים שעובדים יחד.

הארכיטקטורה של אפאצ'י קפקא פשוטה מאוד, מה שיכול לגרום לביצועים ותפוקה טובים יותר במערכות מסוימות. כל נושא בקפקא הוא כמו קובץ יומן פשוט. כאשר מפיק מפרסם הודעה, שרת קפקא מוסיף אותה בסוף קובץ היומן עבור הנושא הנתון שלו. השרת גם מקצה קיזוז , שהוא מספר המשמש לזיהוי קבוע של כל הודעה. ככל שמספר ההודעות גדל, הערך של כל קיזוז עולה; למשל אם המפיק מפרסם שלוש הודעות הראשונה עשויה לקבל קיזוז של 1, השנייה קיזוז של 2 והשלישית קיזוז של 3.

כאשר צרכן קפקא מתחיל לראשונה, הוא ישלח בקשת משיכה לשרת, בבקשה לאחזר כל הודעה עבור נושא מסוים עם ערך קיזוז גבוה מ 0. השרת יבדוק את קובץ היומן של הנושא ויחזיר את שלוש ההודעות החדשות . הצרכן יעבד את ההודעות, ואז ישלח בקשה להודעות עם קיזוז גבוה מ- 3, וכן הלאה.

ב- Kafka הלקוח אחראי לזכור את ספירת הקיזוזים ואחזור ההודעות. שרת Kafka אינו עוקב אחר ניהול צריכת הודעות או ניהולו. כברירת מחדל, שרת Kafka ישמור הודעה במשך שבעה ימים. שרשור רקע בשרת בודק ומוחק הודעות שגילן שבעה ימים ומעלה. צרכן יכול לגשת להודעות כל עוד הן בשרת. הוא יכול לקרוא הודעה מספר פעמים, ואפילו לקרוא הודעות בסדר קבלה הפוך. אך אם הצרכן לא מצליח לאחזר את ההודעה לפני שבעת הימים, הוא יחמיץ את המסר הזה.

מדדי קפקא

שימוש בייצור על ידי לינקדאין וארגונים אחרים הראה כי בתצורה נכונה אפאצ'י קפקא מסוגל לעבד מאות גיגה בייט של נתונים מדי יום. בשנת 2011, שלושה מהנדסי לינקדאין השתמשו בבדיקות ביצועים כדי להוכיח כי קפקא יכול להשיג תפוקה גבוהה בהרבה מ- ActiveMQ ו- RabbitMQ.

התקנה והדגמה מהירה של אפאצ'י קפקא

אנו בונים יישום מותאם אישית במדריך זה, אך נתחיל בהתקנה ובדיקה של מופע קפקא אצל יצרן וצרכן מהקופסה.

  1. בקר בדף ההורדה של קפקא כדי להתקין את הגרסה העדכנית ביותר (0.9 נכון לכתיבת שורות אלה).
  2. חלץ את הקבצים הבינאריים software/kafkaלתיקיה. לגרסה הנוכחית זה software/kafka_2.11-0.9.0.0.
  3. שנה את הספרייה הנוכחית כך שתצביע על התיקיה החדשה.
  4. התחל השרת שומר גן החיות על ידי ביצוע הפקודה: bin/zookeeper-server-start.sh config/zookeeper.properties.
  5. התחל שרת קפקא על ידי ביצוע: bin/kafka-server-start.sh config/server.properties.
  6. צור נושא הבחינה כי אתה יכול להשתמש לבדיקה: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. התחל צרכן קונסולה פשוט שיכול לצרוך הודעות שפורסמו על נושא מסוים, כגון javaworld: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning.
  8. התחל את קונסולת מפיק פשוט שיכול לפרסם הודעות לנושא הבדיקה: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld.
  9. נסה להקליד הודעה אחת או שתיים במסוף המפיק. ההודעות שלך אמורות להופיע במסוף הצרכנים.

יישום לדוגמא עם Apache Kafka

ראית איך אפאצ'י קפקא עובד מהקופסה. לאחר מכן, בואו נפתח יישום מותאם אישית למפיק / לצרכן. המפיק ישיג קלט משתמשים מהקונסולה וישלח כל שורה חדשה כהודעה לשרת Kafka. הצרכן ישלים הודעות לנושא נתון וידפיס אותם למסוף. רכיבי המפיק והצרכן במקרה זה הם היישומים שלך kafka-console-producer.shושל kafka-console-consumer.sh.

נתחיל ביצירת Producer.javaכיתה. מחלקת לקוח זו מכילה לוגיקה לקריאת קלט משתמשים מהקונסולה ושליחת קלט זה כהודעה לשרת קפקא.

אנו מגדירים את המפיק על ידי יצירת אובייקט java.util.Propertiesמהמחלקה והגדרת המאפיינים שלו. מחלקת ProducerConfig מגדירה את כל המאפיינים השונים הקיימים, אך ערכי ברירת המחדל של קפקא מספיקים לרוב השימושים. עבור תצורת ברירת המחדל עלינו להגדיר רק שלושה מאפיינים חובה:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)מגדיר רשימה של מארח: זוגות יציאות המשמשים ליצירת החיבורים הראשוניים לאשכול Kakfa host1:port1,host2:port2,...בפורמט. גם אם יש לנו יותר מתווך אחד באשכול קפקא שלנו, עלינו רק לציין את הערך של המתווך הראשון host:port. לקוח קפקא ישתמש בערך זה כדי לבצע שיחת גילוי למתווך, שתחזיר רשימה של כל המתווכים באשכול. מומלץ לציין יותר מתווך אחד ב- BOOTSTRAP_SERVERS_CONFIG, כך שאם אותו מתווך ראשון לא יהיה הלקוח יוכל לנסות מתווכים אחרים.

שרת קפקא מצפה להודעות byte[] key, byte[] valueבפורמט. במקום להמיר כל מפתח וערך, הספרייה בצד לקוח של קפקא מאפשרת לנו להשתמש בסוגים וידידותיים כמו Stringו intלשליחת הודעות. הספריה תמיר אותם לסוג המתאים. לדוגמא, לאפליקציה לדוגמה אין מפתח ספציפי להודעה, לכן נשתמש ב- null עבור המפתח. עבור הערך נשתמש ב- a String, שהם הנתונים שהזין המשתמש במסוף.

להגדרת תצורת מפתח ההודעה , הגדרנו ערך KEY_SERIALIZER_CLASS_CONFIGעל org.apache.kafka.common.serialization.ByteArraySerializer. זה עובד מכיוון שאין צורך להמיר nullbyte[] . עבור ערך ההודעה , הגדרנו VALUE_SERIALIZER_CLASS_CONFIGאת ה- org.apache.kafka.common.serialization.StringSerializer, כי המחלקה הזו יודעת להמיר a Stringל- a byte[].

אובייקט מפתח / ערך מותאם אישית

בדומה ל- StringSerializer, Kafka מספק סדרתיות עבור פרימיטיבים אחרים כמו intו- long. על מנת להשתמש באובייקט מותאם אישית עבור המפתח או הערך שלנו, עלינו ליצור יישום מחלקה org.apache.kafka.common.serialization.Serializer. לאחר מכן נוכל להוסיף לוגיקה בכדי לסדר את הכיתה byte[]. נצטרך להשתמש גם בניתוק deserial המקביל בקוד הצרכן שלנו.

מפיק קפקא

After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we'll create an object of ProducerRecord and call the KafkaProducer's send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which message should be published, and the actual message. Don't forget to call the Producer.close() method when you're done using the producer:

Listing 1. KafkaProducer

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer"); org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, line); producer.send(rec); line = in.nextLine(); } in.close(); producer.close(); } } 

Configuring the message consumer

Next we'll create a simple consumer that subscribes to a topic. Whenever a new message is published to the topic, it will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating an object of java.util.Properties, setting its consumer-specific properties, and then using it to create a new object of KafkaConsumer. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (bootstrap.servers)

Just as we did for the producer class, we'll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kakfa cluster in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we'll have to use a custom deserializer to convert byte[] back into the appropriate type.

במקרה של יישום לדוגמא, אנו יודעים שהמפיק משתמש ByteArraySerializerלמפתח StringSerializerולערך. בצד הלקוח אנו צריכים להשתמש org.apache.kafka.common.serialization.ByteArrayDeserializerעבור המפתח org.apache.kafka.common.serialization.StringDeserializerועבור הערך. הגדרה אלה כיתות כערכים עבור KEY_DESERIALIZER_CLASS_CONFIGו VALUE_DESERIALIZER_CLASS_CONFIGיאפשר לצרכן deserialize byte[]סוגי מקודדים נשלחו על ידי היצרן.

לבסוף, עלינו לקבוע את הערך של ה- GROUP_ID_CONFIG. זה צריך להיות שם קבוצה בפורמט מחרוזת. אני אסביר עוד רגע על התצורה הזו. לעת עתה, רק הסתכל על צרכן קפקא עם ארבעת המאפיינים החובה שנקבעו: