כיצד לבנות יישומי סטרימינג בסטטס עם Apache Flink

פביאן היסקה הוא חבר וחבר PMC בפרויקט Apache Flink ומייסד שותף של Data Artisans.

אפאצ'י פלינק הוא מסגרת ליישום יישומי עיבוד זרם סטטיסטיים ולהפעלתם בקנה מידה באשכול מחשוב. במאמר קודם בדקנו מהו עיבוד סטרים סטטיסטי, לאילו מקרי שימוש הוא מטפל ומדוע עליכם ליישם ולהריץ את יישומי הזרמתכם באמצעות Apache Flink.

במאמר זה אציג דוגמאות לשני מקרי שימוש נפוצים של עיבוד זרם סטטיסטי ונדון כיצד ניתן ליישם אותם עם Flink. מקרה השימוש הראשון הוא יישומים מונעי אירועים, כלומר יישומים אשר בולעים זרמי אירועים רציפים ומיישמים היגיון עסקי כלשהו באירועים אלה. השני הוא מקרה השימוש בניתוח סטרימינג, בו אציג שתי שאילתות אנליטיות המיושמות באמצעות ה- SQL API של Flink, המאגדות נתוני סטרימינג בזמן אמת. אנו ב- Data Artisans מספקים את קוד המקור של כל הדוגמאות שלנו במאגר GitHub ציבורי.

לפני שנצלול לפרטי הדוגמאות, אציג את זרם האירועים הנבלע על ידי יישומי הדוגמה ואסביר כיצד תוכלו להריץ את הקוד שאנו מספקים.

זרם של אירועי נסיעה במונית

יישומי הדוגמה שלנו מבוססים על מערך נתונים ציבורי אודות נסיעות במוניות שאירעו בעיר ניו יורק בשנת 2013. מארגני ה- DEBS 2015 (הכנס הבינלאומי ACM למערכות מבוססות אירועים מבוזרים) ארגנו מחדש את מערך הנתונים המקורי והמירו אותו קובץ CSV יחיד ממנו אנו קוראים את תשעת השדות הבאים.

  • מדליון - מזהה סכום MD5 של המונית
  • Hack_license - מזהה סכום MD5 של רישיון המונית
  • Pickup_datetime - הזמן בו נאספו הנוסעים
  • Dropoff_datetime - הזמן בו הורדו הנוסעים
  • Pickup_lengthitude - קו האורך של מיקום האיסוף
  • Pickup_latitude - קו הרוחב של מיקום האיסוף
  • אורך ירידה - אורך מיקום ההורדה
  • Dropoff_latitude - קו הרוחב של מיקום ההורדה
  • סכום כולל - סך הכל ששולם בדולרים

קובץ ה- CSV שומר את הרשומות בסדר עולה של מאפיין זמן ההורדה שלהם. מכאן שניתן להתייחס לקובץ כאל יומן מסודר של אירועים שפורסמו עם סיום הטיול. כדי להפעיל את הדוגמאות שאנו מספקים ב- GitHub, עליך להוריד את מערך הנתונים של אתגר ה- DEBS מ- Google Drive.

כל היישומים לדוגמה קוראים ברצף את קובץ ה- CSV ומבלעים אותו כזרם של אירועי נסיעה במונית. משם והלאה, היישומים מעבדים את האירועים בדיוק כמו כל זרם אחר, כלומר כמו זרם שנבלע מהמערכת מבוססת יומן הרשמה לפרסום, כמו Apache Kafka או Kinesis. למעשה, קריאת קובץ (או כל סוג אחר של נתונים קבועים) והתייחסות אליו כאל זרם הם אבן יסוד בגישתו של פלינק לאיחוד עיבוד אצווה וזרם.

הפעלת דוגמאות Flink

כאמור, פרסמנו את קוד המקור של יישומי הדוגמה שלנו במאגר GitHub. אנו ממליצים לך למזלג ולשבט את המאגר. ניתן לבצע את הדוגמאות בקלות מתוך IDE שבחרת; אתה לא צריך להגדיר ולהגדיר אשכול Flink כדי להריץ אותם. ראשית, ייבא את קוד המקור של הדוגמאות כפרויקט Maven. לאחר מכן בצע את המחלקה הראשית של יישום וספק את מיקום האחסון של קובץ הנתונים (ראה קישור להורדת הנתונים לעיל) כפרמטר של תוכנית.

לאחר שהפעלת יישום, הוא יתחיל מופע Flink מקומי ומוטמע בתוך תהליך ה- JVM של היישום וישלח את היישום לביצועו. תראה חבורה של הצהרות יומן בזמן ש- Flink מתחיל ומשימות התפקיד מתוזמנות. לאחר היישום פועל, הפלט שלו ייכתב לפלט הסטנדרטי.

בניית יישום מונחה אירועים בפלינק

עכשיו, בואו נדון במקרה השימוש הראשון שלנו, שהוא יישום מונחה אירועים. יישומים מונעי אירועים בולעים זרמים של אירועים, מבצעים חישובים עם קבלת האירועים ועשויים לפלוט אירועים חדשים או לעורר פעולות חיצוניות. ניתן להרכיב יישומים מונעי אירועים מרובים על ידי חיבורם יחד באמצעות מערכות יומן אירועים, בדומה לאופן שבו ניתן להרכיב מערכות גדולות ממיקרו-שירותים. יישומים מונחי אירועים, יומני אירועים ותמונות מצב של יישומים (המכונים Savepoints ב- Flink) כוללים דפוס עיצוב חזק מאוד מכיוון שתוכל לאפס את מצבם ולהשמיע מחדש את קלטם כדי להתאושש מכשל, לתקן באג או להעביר יישום לאשכול אחר.

במאמר זה נבחן יישום מונחה אירועים המגבה שירות, העוקב אחר שעות העבודה של נהגי המוניות. בשנת 2016, ועדת המוניות והלימוזינה של ניו יורק החליטה להגביל את שעות העבודה של נהגי המוניות למשמרות של 12 שעות ולדרוש הפסקה של שמונה שעות לפחות לפני תחילת המשמרת הבאה. משמרת מתחילה עם תחילת הנסיעה הראשונה. מכאן ואילך, נהג עשוי להתחיל בנסיעות חדשות בתוך חלון של 12 שעות. היישום שלנו עוקב אחר נסיעות הנהגים, מסמן את זמן הסיום של חלון 12 שעות שלהם (כלומר, הזמן בו הם עשויים להתחיל את הנסיעה האחרונה), ומסמן נסיעות שהפרו את התקנה. אתה יכול למצוא את קוד המקור המלא של דוגמה זו במאגר GitHub שלנו.

היישום שלנו מיושם באמצעות ה- DataStream API של Flink ו- KeyedProcessFunction. ה- API של DataStream הוא ממשק API פונקציונלי ומבוסס על הרעיון של זרמי נתונים שהוקלדו. A DataStreamהוא הייצוג ההגיוני של זרם אירועים מסוג T. זרם מעובד על ידי הפעלת פונקציה המייצרת זרם נתונים אחר, אולי מסוג אחר. Flink מעבד זרמים במקביל על ידי הפצת אירועים לזרם מחיצות והחלת מופעים שונים של פונקציות על כל מחיצה.

קטע הקוד הבא מציג את הזרימה ברמה הגבוהה של יישום הניטור שלנו.

// לבלוע זרם של נסיעות במוניות.

נסיעות DataStream = TaxiRides.getRides (env, inputPath);

זרם נתונים התראות = רוכב

   // זרם מחיצה לפי מזהה רישיון הנהיגה

   .keyBy (r -> r.licenseId)

   // לפקח על אירועי נסיעה וליצור התראות

   .process (MonitorWorkTime חדש ());

// הודעות הדפסה

notifications.print ();

היישום מתחיל לבלוע זרם של אירועי נסיעה במונית. בדוגמה שלנו, האירועים נקראים מקובץ טקסט, מנתחים ומאוחסנים TaxiRideבאובייקטים של POJO. יישום בעולם האמיתי בדרך כלל יבלע את האירועים מתור הודעות או יומן אירועים, כגון Apache Kafka או Pravega. השלב הבא הוא מפתח את TaxiRideהאירועים על ידי licenseIdהנהג. keyByמחיצות הפעולה נחל על המגרש הכריז, כזה שכל אירועים עם אותו המפתח מעובדים על ידי אותו בערכאה המקבילה של הפונקציה הבאה. במקרה שלנו, אנו licenseIdנפרדים על המגרש מכיוון שאנו רוצים לעקוב אחר זמן העבודה של כל נהג בודד.

לאחר מכן, אנו מיישמים את MonitorWorkTimeהפונקציה על TaxiRideהאירועים המחולקים . הפונקציה עוקבת אחר הנסיעות לכל נהג ועוקבת אחר משמרות וזמני ההפסקה שלהם. הוא פולט אירועים מסוגם Tuple2, כאשר כל צמרת מייצגת התראה המורכבת ממזהה הרישיון של הנהג והודעה. לבסוף, היישום שלנו פולט את ההודעות על ידי הדפסתן לפלט הסטנדרטי. יישום בעולם האמיתי יכתוב את ההודעות להודעה או למערכת אחסון חיצונית, כמו אפאצ'י קפקא, HDFS או מערכת מסד נתונים, או יפעיל שיחה חיצונית שתדחוף אותם מיד.

כעת, לאחר שדנו בזרימה הכוללת של היישום, בואו נסתכל על MonitorWorkTimeהפונקציה, המכילה את מרבית ההיגיון העסקי בפועל של היישום. MonitorWorkTimeהפונקציה היא מצב KeyedProcessFunctionכי בולע TaxiRideאירועים ופולטים Tuple2רשום. KeyedProcessFunctionהממשק כולל שתי שיטות נתון תהליך: processElement()ו onTimer(). processElement()השיטה נקראת עבור כל אירוע מגיע. onTimer()השיטה נקראת כאשר שריפות טיימר רשום בעבר. הקטע הבא מציג את שלד MonitorWorkTimeהפונקציה וכל מה שמוצהר מחוץ לשיטות העיבוד.

מחלקה סטטית ציבורית MonitorWorkTime

    מרחיב את KeyedProcessFunction {

  // קבועי זמן באלפיות השנייה

  גמר סטטי פרטי פרטי ארוך ALLOWED_WORK_TIME = 12 * 60 * 60 * 1000; // 12 שעות

  סופי סטטי פרטי פרטי REQ_BREAK_TIME = 8 * 60 * 60 * 1000; // 8 שעות

  סופי סטטי פרטי ארוך CLEAN_UP_INTERVAL = 28 * 60 * 60 * 1000; // 24 שעות

 מעצב DateTimeFormatter חולף פרטי;

  // ידית מדינה לאחסון זמן ההתחלה של משמרת

  ValueState shiftStart;

  @Override

  חלל ציבורי פתוח (תצורת תצורה) {

    // רשום ידית מדינה

    shiftStart = getRuntimeContext (). getState (

      ValueStateDescriptor חדש ("shiftStart", Types.LONG));

    // אתחל את מעצב הזמן

    this.formatter = DateTimeFormat.forPattern ("yyyy-MM-dd HH: mm: ss");

  }

  // processElement () ו- onTimer () נדונים בהרחבה להלן.

}

הפונקציה מצהירה על מספר קבועים עבור מרווחי זמן באלפיות השנייה, מעצב זמן וידית מצב עבור מצב מקוד שמנוהל על ידי פלינק. מצב מנוהל נבדק מעת לעת ומשוחזר אוטומטית במקרה של כשל. מצב מקשים מאורגן לכל מפתח, כלומר פונקציה תשמור על ערך אחד לכל ידית ומפתח. במקרה שלנו, MonitorWorkTimeהפונקציה שומרת על Longערך עבור כל מקש, כלומר עבור כל אחד licenseId. shiftStartהמדינה מאחסנת את שעת ההתחלה של השינוי של נהג. הידית של המדינה מאותחל open()בשיטה, הנקראת פעם אחת לפני שעיבוד האירוע הראשון.

עכשיו, בואו נסתכל על processElement()השיטה.

@Override

תהליך בטל ציבורי

    נסיעה במונית,

    הקשר CTX,

    אַסְפָן יוצא) זורק חריג {

  // חפש את זמן ההתחלה של המשמרת האחרונה

  התחלה ארוכה = shiftStart.value ();

  אם (startTs == null ||

    startTs <ride.pickUpTime - (ALLOWED_WORK_TIME + REQ_BREAK_TIME)) {

    // זו הנסיעה הראשונה של משמרת חדשה.

    startTs = ride.pickUpTime;

    shiftStart.update (startTs);

    end ends = startTs + ALLOWED_WORK_TIME;

    out.collect (Tuple2.of (ride.licenseId,

      "מותר לך לקבל נוסעים חדשים עד" + formatter.print (endTs)));

    // רשום טיימר כדי לנקות את המדינה תוך 24 שעות

    ctx.timerService (). registerEventTimeTimer (startTs + CLEAN_UP_INTERVAL);

  } אחר אם (startTs <ride.pickUpTime - ALLOWED_WORK_TIME) {

    // נסיעה זו התחילה לאחר סיום זמן העבודה המותר.

    // זו הפרה של התקנות!

    out.collect (Tuple2.of (ride.licenseId,

      "נסיעה זו הפרה את תקנות זמן העבודה."));

  }

}

processElement()השיטה נקראת עבור כל TaxiRideאירוע. ראשית, השיטה מביאה את זמן ההתחלה של מעבר הנהג מידית המדינה. אם המדינה אינה מכילה זמן התחלה ( startTs == null) או אם המשמרת האחרונה החלה יותר מ -20 שעות ( ALLOWED_WORK_TIME + REQ_BREAK_TIME) לפני הנסיעה הנוכחית, הנסיעה הנוכחית היא הנסיעה הראשונה במשמרת חדשה. בשני המקרים, הפונקציה מתחילה משמרת חדשה על ידי עדכון זמן ההתחלה של המשמרת לשעת ההתחלה של הנסיעה הנוכחית, שולחת הודעה לנהג עם שעת הסיום של המשמרת החדשה, ורושמת טיימר לניקוי המדינה תוך 24 שעות.

אם הנסיעה הנוכחית אינה הנסיעה הראשונה של משמרת חדשה, הפונקציה בודקת אם היא מפרה את תקנת זמן העבודה, כלומר האם היא התחילה יותר מ 12 שעות מאוחר יותר מתחילת המשמרת הנוכחית של הנהג. אם זה המקרה, הפונקציה משדרת הודעה ליידע את הנהג על ההפרה.

processElement()השיטה של MonitorWorkTimeהפונקציה רושמת טיימר כדי לנקות את המדינה 24 שעות לאחר תחילת משמרת. הסרת מצב שכבר אינו נחוץ חשובה למניעת הגדלות גדלים של המדינה עקב מצב דולף. טיימר נורה כאשר זמן היישום עובר את חותמת הזמן של הטיימר. בשלב זה onTimer()נקראת השיטה. בדומה למצב, טיימרים נשמרים לפי מפתח, והפונקציה מוכנסת להקשר של המפתח המשויך לפני onTimer()קריאת השיטה. לפיכך, כל הגישה למדינה מופנית למפתח שהיה פעיל בעת רישום הטיימר.

בואו נסתכל על onTimer()השיטה של MonitorWorkTime.

@Override

בטל פומבי onTimer (

    טיימר ארוך,

    OnTimerContext ctx,

    אַסְפָן יוצא) זורק חריג {

  // הסר את מצב המשמרת אם כבר לא התחילה משמרת חדשה.

  התחלה ארוכה = shiftStart.value ();

  if (startTs == timerTs - CLEAN_UP_INTERVAL) {

    shiftStart.clear ();

  }

}

processElement()טיימרים רושם שיטה 24 שעות לאחר משמרת התחילו לנקות המדינה כי אינו נחוץ עוד. ניקוי המדינה הוא ההיגיון היחיד onTimer()שהשיטה מיישמת. כאשר טיימר יורה, אנו בודקים אם הנהג התחיל משמרת חדשה בינתיים, כלומר האם זמן התחלת המשמרת השתנה. אם זה לא המקרה, אנו מנקים את מצב המשמרת עבור הנהג.