经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Spark » 查看文章
Spark Streaming任务延迟监控及告警
来源:cnblogs  作者:XIAO的博客  时间:2019/11/4 8:28:53  对本文有异议

概述

StreamingListener 是针对spark streaming的各个阶段的事件监听机制。

StreamingListener接口

  1. //需要监听spark streaming中各个阶段的事件只需实现这个特质中对应的事件函数即可
  2. //本身既有注释说明
  3. trait StreamingListener {
  4. /** Called when the streaming has been started */
  5. /** streaming 启动的事件 */
  6. def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }
  7. /** Called when a receiver has been started */
  8. /** 接收启动事件 */
  9. def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
  10. /** Called when a receiver has reported an error */
  11. def onReceiverError(receiverError: StreamingListenerReceiverError) { }
  12. /** Called when a receiver has been stopped */
  13. def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }
  14. /** Called when a batch of jobs has been submitted for processing. */
  15. /** 每个批次提交的事件 */
  16. def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
  17. /** Called when processing of a batch of jobs has started. */
  18. /** 每个批次启动的事件 */
  19. def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
  20. /** Called when processing of a batch of jobs has completed. */
  21. /** 每个批次完成的事件 */
  22. def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
  23. /** Called when processing of a job of a batch has started. */
  24. def onOutputOperationStarted(
  25. outputOperationStarted: StreamingListenerOutputOperationStarted) { }
  26. /** Called when processing of a job of a batch has completed. */
  27. def onOutputOperationCompleted(
  28. outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
  29. }

自定义StreamingListener

功能:监控批次处理时间,若超过阈值则告警,每次告警间隔2分钟

  1. class SparkStreamingDelayListener(private val appName:String, private val duration: Int,private val times: Int) extends StreamingListener{
  2. private val logger = LoggerFactory.getLogger("SparkStreamingDelayListener")
  3. //每个批次完成时执行
  4. override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
  5. val batchInfo = batchCompleted.batchInfo
  6. val processingStartTime = batchCompleted.batchInfo.processingStartTime
  7. val numRecords = batchCompleted.batchInfo.numRecords
  8. val processingEndTime = batchInfo.processingEndTime
  9. val processingDelay = batchInfo.processingDelay
  10. val totalDelay = batchInfo.totalDelay
  11. //将每次告警时间写入redis,用以判断告警间隔大于2分钟
  12. val jedis = RedisClusterClient.getJedisClusterClient()
  13. val current_time = (System.currentTimeMillis / 1000).toInt
  14. val redis_time = jedis.get(appName)
  15. var flag = false
  16. if(redis_time==null || current_time-redis_time.toInt>120){
  17. jedis.set(appName,current_time.toString)
  18. flag = true
  19. }
  20. //若批次处理延迟大于批次时长指定倍数,并且告警间隔大约2分钟,则告警
  21. if(totalDelay.get >= times * duration * 1000 && flag){
  22. val monitorContent = appName+": numRecords ->"+numRecords+",processingDelay ->"+processingDelay.get/1000+" s,totalDelay -> "+totalDelay.get/1000+"s"
  23. println(monitorContent)
  24. val msg = "Streaming_"+appName+"_DelayTime:"+totalDelay.get/1000+"S"
  25. val getURL = "http://node1:8002/message/weixin?msg="+msg
  26. HttpClient.doGet(getURL)
  27. }
  28. }
  29. }

应用

  1. //streamingListener不需要在配置中设置,可以直接添加到streamingContext中
  2. object My{
  3. def main(args : Array[String]) : Unit = {
  4. val sparkConf = new SparkConf()
  5. val ssc = new StreamingContext(sparkConf,Seconds(20))
  6. ssc.addStreamingListener(new SparkStreamingDelayListener("Userid2Redis", duration,times))
  7. ....
  8. }
  9. }

订阅关注微信公众号《大数据技术进阶》,及时获取更多大数据架构和应用相关技术文章!

原文链接:http://www.cnblogs.com/xiaodf/p/11776915.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号