
Q: Processing records in order in Storm


I'm new to Storm and I'm having trouble figuring out how to process records in order.

I have a dataset which contains records with the following fields:

user_id, location_id, time_of_checking

Now, I would like to identify users who have followed a path I specify (for example, users who went from location A to location B to location C).

I'm using a Kafka producer and reading these records from a file to simulate live data. The data is sorted by date.

So, to check whether my pattern is matched, I need to process the records in order. The problem is that, due to parallelization (bolt replication), I don't receive a user's check-ins in order, so the pattern matching doesn't work.

How to overcome this problem? How to process records in order?


Answer 1:

There is no general system support for ordered processing in Storm. Either you use a different system that supports ordered stream processing, like Apache Flink (disclaimer: I am a committer at Flink), or you need to take care of it yourself in your bolt code.

The only support Storm offers is through Trident. You can put the tuples of a certain time period (for example, one minute) into a single batch and thus process all tuples of that minute at once. However, this only works if your use case allows for it, because you cannot relate tuples from different batches to each other. In your case, that would only hold if you knew there are points in time at which all users have reached their destination (and no other user has started a new interaction); i.e., you need points in time at which no two users' paths overlap. (It seems to me that your use case cannot fulfill this requirement.)
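To make the batching idea concrete, here is a minimal plain-Java sketch (deliberately independent of the Trident API): tuples carrying a millisecond timestamp are grouped into one-minute batches, and each batch is then sorted so it can be processed in order as a whole. The class and method names are illustrative, not part of any Storm API.

```java
import java.util.*;

// Illustrative sketch of one-minute batching: tuples (represented as
// long[]{timestampMillis, payloadId}) are keyed by ts / 60_000, and each
// batch is sorted by timestamp. Tuples in different batches cannot be
// related to each other, which is the limitation described above.
public class MinuteBatcher {
    public static Map<Long, List<long[]>> batchByMinute(List<long[]> tuples) {
        Map<Long, List<long[]>> batches = new TreeMap<>();
        for (long[] t : tuples) {
            // t[0] is the timestamp in milliseconds
            batches.computeIfAbsent(t[0] / 60_000, k -> new ArrayList<>()).add(t);
        }
        // sort each batch so it can be processed in timestamp order
        for (List<long[]> batch : batches.values()) {
            batch.sort(Comparator.comparingLong(t -> t[0]));
        }
        return batches;
    }
}
```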

For a non-system, i.e., customized user-code-based solution, there are two approaches:

You could, for example, buffer tuples within a bolt and sort them by timestamp before processing. To make this work properly, you need to inject punctuations/watermarks that guarantee that no tuple with a smaller timestamp than the punctuation arrives after the punctuation. Once you have received a punctuation from each parallel input substream, you can safely trigger sorting and processing.
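A minimal sketch of this punctuation-based buffering, in plain Java with the Storm plumbing omitted (class and method names are my own, not a Storm API): tuples are held in a priority queue, and a tuple is released only once every substream has delivered a punctuation at least as large as its timestamp.

```java
import java.util.*;

// Sketch: buffer tuples and release them in timestamp order once they are
// "safe". A punctuation (source, ts) promises that substream `source` will
// send no further tuple with timestamp <= ts; the minimum punctuation over
// all substreams is therefore the safe-to-emit bound.
public class PunctuationBuffer {
    public static final class Tuple {
        public final long ts;
        public final String payload;
        public Tuple(long ts, String payload) { this.ts = ts; this.payload = payload; }
    }

    private final PriorityQueue<Tuple> buffer =
        new PriorityQueue<>(Comparator.comparingLong((Tuple t) -> t.ts));
    private final long[] punctuation; // latest punctuation seen per substream

    public PunctuationBuffer(int numSubstreams) {
        punctuation = new long[numSubstreams];
        Arrays.fill(punctuation, Long.MIN_VALUE);
    }

    public void onTuple(Tuple t) { buffer.add(t); }

    // Returns all buffered tuples that are now safe to process, in order.
    public List<Tuple> onPunctuation(int source, long ts) {
        punctuation[source] = Math.max(punctuation[source], ts);
        long safe = Arrays.stream(punctuation).min().getAsLong();
        List<Tuple> ready = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek().ts <= safe) {
            ready.add(buffer.poll());
        }
        return ready;
    }
}
```

In a real bolt, `onTuple`/`onPunctuation` would be driven from `execute()`, with punctuations distinguished from data tuples by a stream id or a marker field.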

Another way would be to buffer the tuples of each incoming substream in distinct buffers (within a substream, order is preserved) and merge the tuples from those buffers in timestamp order. This has the advantage that sorting is avoided. However, you need to ensure that each upstream operator emits its tuples in order. Furthermore, to avoid blocking (i.e., when no input is available for one substream), punctuations might be needed, too. (I implemented this approach. Feel free to use the code or adapt it to your needs: https://github.com/mjsax/aeolus/blob/master/queries/utils/src/main/java/de/hub/cs/dbis/aeolus/utils/TimestampMerger.java)
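The core of this second approach is a k-way merge over the per-substream buffers. The following is a simplified plain-Java illustration of that idea only (the linked TimestampMerger is the real implementation; names here are mine): a tuple may be emitted only while every substream has at least one buffered tuple, which is exactly why an idle substream blocks the merge and punctuations become necessary.

```java
import java.util.*;

// Simplified k-way merge over per-substream queues. Each queue is assumed
// to already be ordered by timestamp (order within a substream is
// preserved). Tuples are long[]{timestamp, payloadId}.
public class SubstreamMerger {
    private final List<Deque<long[]>> queues = new ArrayList<>();

    public SubstreamMerger(int numSubstreams) {
        for (int i = 0; i < numSubstreams; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    public void onTuple(int substream, long ts, long payloadId) {
        queues.get(substream).addLast(new long[]{ts, payloadId});
    }

    // Emit tuples in global timestamp order, but only while every substream
    // has a pending tuple; an empty substream stalls the merge.
    public List<long[]> merge() {
        List<long[]> out = new ArrayList<>();
        while (queues.stream().noneMatch(Deque::isEmpty)) {
            Deque<long[]> min = queues.get(0);
            for (Deque<long[]> q : queues) {
                if (q.peekFirst()[0] < min.peekFirst()[0]) {
                    min = q;
                }
            }
            out.add(min.pollFirst());
        }
        return out;
    }
}
```

Note how no sorting happens anywhere: global order falls out of repeatedly picking the smallest head across the ordered buffers.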


apache-kafka  apache-storm