storm 处理后数据的几种输出方式

一、问题描述:

Storm主要用于分析处理实时数据流,本身没有给出具体数据输出方案。
一般来说,实际中使用Storm处理可能只是作为整体的一部分,还需要将Storm的处理结果反馈给其他业务或系统。那么根据实际的不同需求,尝试用多种方案来解决Storm数据输出问题。

二、可能方案:

由于Storm需要快速分析处理实时数据流,如果当前处理的数据需要与之前的数据交互,显然将之前的数据缓存在节点(内存)中是不太合理的,特别是当处理的数据量级很大。
本文主要讨论两种类型的数据输出方法:
第一种:数据输出后,不再与当前Storm任务有联系。
第二种:数据输出后,后续可能还需要与当前Storm任务进行交互。

第一种场景:

如果输出的数据量不大,后续依赖业务对该数据的需求时间不是特别紧迫。可考虑将结果写入到数据库,例如MySQL表中。若后续业务需求比较紧迫,则可以将该数据写入到HBase中。
如果输出的数据量较大,且允许有少量数据丢失,则可以用线程的方式发送,再在监听服务器端接收。若本身不允许数据有丢失,则可以考虑用TCP方式发送接收,或者更便捷的就是使用Kafka消息队列,在Storm任务的节点中写入,接收端只需要监听是否有新的消息被放入队列。

下面给出用线程方式发送消息的参考代码:

public class SendMsgBolt extends BaseBasicBolt{
	/** log info. */
	private static Logger log = LoggerFactory.getLogger(SendMsgBolt.class);

	/** msg queue. */
	private ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();

		@Override
	public void prepare(Map stormConf, TopologyContext context) {
		try {
			/* 创建发送日志线程对象 */
			sendThread = new SendMSGThread();
			sendThread.start();
		} catch (MqException e) {
			e.printStackTrace();
			log.error(this.getClass().getName(), e);
		}
	}

	@Override
	public void execute(Tuple input, BasicOutputCollector collector) {
		/* bolt节点主要用于处理实时数据流,并将处理后的消息添加到queue队列中 */
		String msg = NULL;
		// msg 就是最细粒度,待发送的数据
		queue.offer(msg);
	}
	
	@Override
	public void cleanup() {
		sendThread.setRunFlag(false);
	}

	/**
	 * Send msg by UDP.
	 */
	public class SendMSGThread extends Thread {
		private boolean run = true;
		private int counter = 0;
		private final int sendSize = 700;

		public SendMSGThread() {
			this.run = true;
		}

		@Override
		public void run() {
			while (run) {
				try {
					String msg = queue.poll();
					if (msg != null ) {	//poll
						// 此处调用封装好的接口来发送消息
						counter++;
						if(counter > sendSize){
							counter = 0;
							sleep(100);//可根据发送消息频率设置一个缓冲窗口
						}
					}
				} catch (Exception e) {
					log.error(e);
				}
			}
		}

		public void setRunFlag(boolean runFlag) {
			run = runFlag;
		}
	}
}

第二种场景:

如果历史积累的数据不是很大,时间限定在当天内,后续交互的需求不是特别多,我们可以考虑使用memcache 或redis等内存缓存的形式。
如果历史积累的数据行数较多,且可能存在跨天数据交互等问题,则可以考虑将对应数据存放到HBase表中,当然我们需要根据实际业务,来定义好key的设计方式。避免热点或数据倾斜问题。

三、小结

上述分析相对来说力度较浅,在实际具体业务中,会有很多细节点需要深入优化处理,才能保证需求被可靠的满足。比如Storm重启时,是否能恢复处理现场。在Storm任务中,若需要查询其他已存在数据,如何能保证节点的性能及处理效果。

 

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>