נבנה בזמן אמת: העברת הודעות נתונים גדולות עם אפאצ'י קפקא, חלק 2

במחצית הראשונה של מבוא זה ל- JavaWorld ל- Apache Kafka, פיתחת כמה יישומי יצרנים / צרכנים בקנה מידה קטן באמצעות Kafka. מתרגילים אלה עליכם להכיר את היסודות של מערכת ההודעות אפאצ'י קפקא. במחצית השנייה זו תלמד כיצד להשתמש במחיצות כדי להפיץ עומס ולדרג את היישום שלך בצורה אופקית, תוך טיפול במיליוני הודעות ביום. תוכלו ללמוד גם כיצד קפקא משתמש בקיזוזי הודעות כדי לעקוב אחר וניהול עיבוד מסרים מורכב וכיצד להגן על מערכת העברת ההודעות של אפאצ'י קפקא מפני כישלון במקרה של צרכן. אנו נפתח את יישום הדוגמה מחלק 1 הן למקרי שימוש לפרסום כמנוי והן לנקודה לנקודה.

מחיצות באפאצ'י קפקא

ניתן לחלק את נושאי הקפקא למחיצות. לדוגמה, בעת יצירת נושא בשם הדגמה, אתה יכול להגדיר אותו כך שיהיה לו שלוש מחיצות. השרת ייצור שלושה קבצי יומן, אחד לכל אחת ממחיצות ההדגמה. כאשר מפיק פרסם הודעה לנושא, הוא יקצה מזהה מחיצה להודעה זו. לאחר מכן השרת היה מצרף את ההודעה לקובץ היומן עבור אותה מחיצה בלבד.

אם אז התחלת שני צרכנים, השרת עשוי להקצות מחיצות 1 ו -2 לצרכן הראשון, ומחיצה 3 לצרכן השני. כל צרכן היה קורא רק מהמחיצות שהוקצו לו. אתה יכול לראות את נושא ההדגמה שהוגדר לשלוש מחיצות באיור 1.

כדי להרחיב את התרחיש, דמיין אשכול קפקא עם שני מתווכים, השוכן בשתי מכונות. כאשר אתה מחלק את נושא ההדגמה, היית מגדיר אותו כך שיהיה לו שתי מחיצות ושתי עותקים משוכפלים. עבור סוג זה של תצורה, שרת Kafka יקצה את שתי המחיצות לשני המתווכים באשכול שלך. כל מתווך יהיה המנהיג של אחת המחיצות.

כאשר מפיק פרסם הודעה, הוא היה עובר למנהיג המחיצה. המנהיג היה לוקח את ההודעה ומצרף אותה לקובץ היומן במכונה המקומית. המתווך השני ישכפל באופן פסיבי את יומן ההתחייבות למכונה שלו. אם מנהיג המחיצה יירד, המתווך השני יהפוך למנהיג החדש ויתחיל לשרת בקשות לקוח. באותו אופן, כאשר צרכן ישלח בקשה למחיצה, בקשה זו תעבור תחילה למוביל המחיצה, שיחזיר את ההודעות המבוקשות.

היתרונות של חלוקה

שקול את היתרונות של חלוקת מערכת העברת הודעות מבוססת קפקא:

  1. מדרגיות : במערכת עם מחיצה אחת בלבד, הודעות המתפרסמות לנושא נשמרות בקובץ יומן רישום הקיים במחשב יחיד. מספר ההודעות לנושא חייב להתאים לקובץ יומן התחייבות יחיד, וגודל ההודעות המאוחסנות לעולם לא יכול להיות יותר משטח הדיסק של המכונה. חלוקת נושא מאפשרת לך לשנות את קנה המידה של המערכת על ידי אחסון הודעות במכונות שונות באשכול. אם ברצונך לאחסן 30 ג'יגה-בתים (GB) של הודעות לנושא הדגמה, למשל, תוכל לבנות אשכול קפקא של שלוש מכונות, שלכל אחת מהן 10 GB של שטח דיסק. אז תגדיר את הנושא כך שיהיו שלוש מחיצות.
  2. איזון עומסי שרתים : בעל מחיצות מרובות מאפשר לך להפיץ בקשות הודעה על פני מתווכים. לדוגמא, אם היה לך נושא שעיבד מיליון הודעות בשנייה, תוכל לחלק אותו ל 100 מחיצות ולהוסיף 100 מתווכים לאשכול שלך. כל מתווך יהיה המוביל למחיצה אחת, האחראי לענות על 10,000 בקשות לקוח בלבד בשנייה.
  3. איזון עומס צרכני : בדומה לאיזון עומסי שרתים, אירוח מספר צרכנים במכונה שונה מאפשר לך להפיץ את עומס הצרכנים. נניח שרציתם לצרוך מיליון הודעות לשנייה מתוך נושא עם 100 מחיצות. אתה יכול ליצור 100 צרכנים ולהפעיל אותם במקביל. שרת קפקא ייעד מחיצה אחת לכל אחד מהצרכנים, וכל צרכן יעבד 10,000 הודעות במקביל. מכיוון שקפקא מקצה כל מחיצה לצרכן אחד בלבד, בתוך המחיצה כל מסר ייצרך לפי הסדר.

שתי דרכים לחלוקה

המפיק אחראי להחליט לאיזו מחיצה הודעה תעבור. למפיק שתי אפשרויות לשלוט במשימה זו:

  • מחיצה מותאמת אישית : ניתן ליצור מחלקה המיישמת את org.apache.kafka.clients.producer.Partitionerהממשק. מנהג Partitionerזה יישם את ההיגיון העסקי כדי להחליט היכן נשלחים הודעות.
  • DefaultPartitioner : אם אתה לא יוצר מחלקת מחיצות מותאמת אישית, כברירת מחדל org.apache.kafka.clients.producer.internals.DefaultPartitionerישתמש בכיתה. מחצן המחדל המוגדר כברירת מחדל מספיק טוב לרוב המקרים, ומספק שלוש אפשרויות:
    1. ידני : כאשר אתה יוצר a ProducerRecord, השתמש בבנאי העמוס new ProducerRecord(topicName, partitionId,messageKey,message)כדי לציין מזהה מחיצה.
    2. Hashing (רגיש למיקום) : כאשר אתה יוצר a ProducerRecord, ציין a על messageKeyידי שיחה new ProducerRecord(topicName,messageKey,message). DefaultPartitionerישתמש בחשיש המפתח כדי להבטיח שכל ההודעות עבור אותו מפתח יועברו לאותו מפיק. זו הגישה הקלה והנפוצה ביותר.
    3. ריסוס (איזון עומסים אקראי) : אם אינך רוצה לשלוט לאילו הודעות מחיצה עוברות, פשוט התקשר new ProducerRecord(topicName, message)כדי ליצור שלך ProducerRecord. במקרה זה המחיצה תשלח הודעות לכל המחיצות באופן סיבובי, והבטיח עומס שרת מאוזן.

מחיצה על יישום Apache Kafka

לדוגמא היצרן / צרכנית הפשוטה בחלק 1, השתמשנו ב- DefaultPartitioner. כעת ננסה ליצור מחיצה מותאמת אישית במקום. לדוגמא זו, נניח שיש לנו אתר קמעונאי בו הצרכנים יכולים להשתמש כדי להזמין מוצרים בכל מקום בעולם. בהתבסס על השימוש אנו יודעים שרוב הצרכנים נמצאים בארצות הברית או בהודו. אנו רוצים לחלק את בקשתנו לשליחת הזמנות מארה"ב או הודו לצרכנים שלהם, בעוד שהזמנות מכל מקום אחר יועברו לצרכן שלישי.

כדי להתחיל, ניצור CountryPartitionerיישום המיישם את org.apache.kafka.clients.producer.Partitionerהממשק. עלינו ליישם את השיטות הבאות:

  1. קפקא יקרא configure () כאשר אנו מאותחלים את Partitionerהמחלקה, עם Mapמאפייני תצורה. שיטה זו מאתחלת פונקציות ספציפיות ללוגיקה העסקית של היישום, כגון חיבור למסד נתונים. במקרה זה אנו רוצים מחיצה די גנרית שלוקחת countryNameכנכס. לאחר מכן configProperties.put("partitions.0","USA")נוכל למפות את זרימת ההודעות למחיצות. בעתיד נוכל להשתמש בפורמט זה כדי לשנות את המדינות שיקבלו מחיצה משלהן.
  2. ה- ProducerAPI קורא למחיצה () פעם אחת לכל הודעה. במקרה זה נשתמש בה כדי לקרוא את ההודעה ולנתח את שם המדינה מההודעה. אם שם המדינה הוא ב countryToPartitionMap, היא תחזור partitionIdמאוחסנת ב Map. אם לא, זה יהיה hash את ערך המדינה וישתמש בו כדי לחשב לאיזו מחיצה היא צריכה ללכת.
  3. אנו קוראים לסגור () כדי לכבות את המחיצה. שימוש בשיטה זו מבטיח כי כל המשאבים שנרכשו במהלך האתחול מנוקים במהלך הכיבוי.

שים לב שכאשר קפקא מתקשר configure(), המפיק של קפקא יעביר את כל המאפיינים שהגדרנו עבור המפיק Partitionerלמחלקה. חשוב לקרוא רק את המאפיינים שמתחילים איתם partitions., לנתח אותם בכדי לקבל את partitionIdהמאגר ולשמור את המזהה countryToPartitionMap.

להלן היישום המותאם אישית שלנו של Partitionerהממשק.

רישום 1. CountryPartitioner

 public class CountryPartitioner implements Partitioner { private static Map countryToPartitionMap; public void configure(Map configs) { System.out.println("Inside CountryPartitioner.configure " + configs); countryToPartitionMap = new HashMap(); for(Map.Entry entry: configs.entrySet()){ if(entry.getKey().startsWith("partitions.")){ String keyName = entry.getKey(); String value = (String)entry.getValue(); System.out.println( keyName.substring(11)); int paritionId = Integer.parseInt(keyName.substring(11)); countryToPartitionMap.put(value,paritionId); } } } public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.availablePartitionsForTopic(topic); String valueStr = (String)value; String countryName = ((String) value).split(":")[0]; if(countryToPartitionMap.containsKey(countryName)){ //If the country is mapped to particular partition return it return countryToPartitionMap.get(countryName); }else { //If no country is mapped to particular partition distribute between remaining partitions int noOfPartitions = cluster.topics().size(); return value.hashCode()%noOfPartitions + countryToPartitionMap.size() ; } } public void close() {} } 

Producerבכיתה ב Listing 2 (להלן) דומה מאוד המפיק פשוט שלנו חלק 1, עם שני שינויים בהדגשה:

  1. הגדרנו מאפיין תצורה עם מפתח השווה לערך ProducerConfig.PARTITIONER_CLASS_CONFIG, התואם את השם המלא של CountryPartitionerהכיתה שלנו . גם קבענו countryNameל partitionId, ובכך למפות את המאפיינים שאנחנו רוצים להעביר CountryPartitioner.
  2. אנו מעבירים מופע של מחלקה המיישמת את org.apache.kafka.clients.producer.Callbackהממשק כטיעון שני producer.send()לשיטה. לקוח קפקא יתקשר onCompletion()לשיטה שלו לאחר פרסום בהצלחה של הודעה, המצרפת RecordMetadataאובייקט. נוכל להשתמש באובייקט זה כדי לגלות לאיזו מחיצה נשלחה הודעה, כמו גם את הקיזוז שהוקצה להודעה שפורסמה.

רישום 2. מפיק מחולק

 public class Producer { private static Scanner in; public static void main(String[] argv)throws Exception { if (argv.length != 1) { System.err.println("Please specify 1 parameters "); System.exit(-1); } String topicName = argv[0]; in = new Scanner(System.in); System.out.println("Enter message(type exit to quit)"); //Configure the Producer Properties configProperties = new Properties(); configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092"); configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer"); configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");  configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName()); configProperties.put("partition.1","USA"); configProperties.put("partition.2","India");  org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties); String line = in.nextLine(); while(!line.equals("exit")) { ProducerRecord rec = new ProducerRecord(topicName, null, line); producer.send(rec, new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset()); ; } }); line = in.nextLine(); } in.close(); producer.close(); } } 

הקצאת מחיצות לצרכנים

שרת קפקא מבטיח כי מחיצה מוקצית לצרכן אחד בלבד, ובכך מבטיח את סדר צריכת המסרים. ניתן להקצות ידנית מחיצה או להקצות אותה אוטומטית.

אם ההיגיון העסקי שלך דורש שליטה רבה יותר, יהיה עליך להקצות מחיצות באופן ידני. במקרה זה היית KafkaConsumer.assign()מעביר לשרת Kakfa רשימה של מחיצות שכל צרכן היה מעוניין בהן.

הקצאת מחיצות אוטומטית היא ברירת המחדל והנפוצה ביותר. במקרה זה, שרת Kafka יקצה מחיצה לכל צרכן, ויקצה מחיצות מחדש בקנה מידה לצרכנים חדשים.

נניח שאתה יוצר נושא חדש עם שלוש מחיצות. כשתתחיל את הצרכן הראשון לנושא החדש, קפקא יקצה את כל שלוש המחיצות לאותו צרכן. אם לאחר מכן תתחיל צרכן שני, קפקא ייחס מחדש את כל המחיצות, ויקצה מחיצה אחת לצרכן הראשון ושתי המחיצות הנותרות לצרכן השני. אם תוסיף צרכן שלישי, Kafka ישוב להקצות את המחיצות שוב, כך שלכל צרכן מוקצה מחיצה אחת. לבסוף, אם תתחיל לצרכן רביעי וחמישי, אז לשלושה מהצרכנים תהיה מחיצה שהוקצתה, אך האחרים לא יקבלו שום הודעה. אם אחת משלוש המחיצות הראשוניות תרד, קפקא ישתמש באותו לוגיקת מחיצה כדי להקצות מחדש את מחיצת הצרכן לאחד הצרכנים הנוספים.