Scalable Stream Processing with Pulsar's Key_Shared Subscription Mode

Baidu Developer Center 2021-10-14 06:36:14

This article is translated from the StreamNative blog post "Scalable Stream Processing with Pulsar's Key_Shared Subscription" [1] by David Kjerrumgaard.
Translators: Liu Zilin, Duan Jia


1. Traditional messaging systems use multiple concurrent consumers on a topic to achieve high-throughput, stateless processing.
2. Streaming systems provide stateful processing with a single consumer, but at the cost of throughput.
3. Pulsar's Key_Shared subscription type allows both high throughput and stateful processing on a single topic.
4. Pulsar's Key_Shared subscription type suits use cases that require stateful processing of large volumes of data, such as personalization, real-time marketing, micro-targeted advertising, and cybersecurity.

Before Pulsar introduced the Key_Shared subscription, users of traditional streaming frameworks had to choose between having multiple consumers on a topic for high throughput and having a single consumer for stateful processing. This post describes how to use Pulsar's Key_Shared subscription for behavioral analysis of clickstream data.

The difference between messaging systems and streaming systems

Many developers believe that messaging systems and streaming systems are essentially the same, so the two terms are often used interchangeably. However, the two are very different, and understanding the differences lets you choose the right system for your use case.

This section compares their message consumption and processing semantics to show why a separate messaging system or streaming system sometimes cannot meet your needs, and why some use cases call for a unified messaging and streaming system.

Messaging systems

The core data structure of a messaging system is the message queue. Incoming messages are stored in first-in, first-out (FIFO) order. A message stays in the queue until it is consumed. Once it has been consumed, the message is deleted to make room for new incoming messages.

From the consumer's perspective, message processing is completely stateless, because each message contains all the information needed to process it. Processing therefore requires no information from earlier messages, which lets you spread message processing across multiple consumers and reduce processing latency.

Messaging systems are a good fit when you want to scale out the number of concurrent consumers on a topic to increase processing throughput. A good example is the traditional work queue, in which an order fulfillment microservice processes incoming e-commerce orders. Because each order is independent of the others, you can keep up with demand by increasing the number of microservice instances consuming from the queue.
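The work-queue pattern can be sketched with nothing but the Java standard library. The example below is an in-memory stand-in, not Pulsar client code, and the class and method names (OrderWorkQueue, processAll) are hypothetical. It shows several workers draining one queue, with each order handled by exactly one worker and no state shared between orders.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative in-memory work queue; not a Pulsar API.
final class OrderWorkQueue {

    // Drains the queue with `workers` concurrent consumers and returns
    // how many times each order ID was handled.
    static Map<String, Integer> processAll(List<String> orderIds, int workers) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(orderIds);
        Map<String, Integer> deliveries = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                String orderId;
                // poll() removes the message, so only one worker ever sees it.
                while ((orderId = queue.poll()) != null) {
                    // Stateless handling: nothing from earlier orders is needed.
                    deliveries.merge(orderId, 1, Integer::sum);
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return deliveries;
    }
}
```

Raising the `workers` argument is the in-memory analogue of adding microservice instances: throughput scales because no worker depends on what another worker has seen.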

Pulsar's Shared subscription is designed for this type of use case. As shown in Figure 1, it provides messaging semantics by ensuring that each message is delivered to exactly one of the consumers attached to the subscription.

Figure 1: Pulsar's Shared subscription type supports multiple consumers.

Streaming systems

In streaming, the central data structure is the log, an append-only sequence of records ordered by time. Messages are appended to the end of the log and read from oldest to newest. Consuming a message is a non-destructive operation, because the consumer simply updates its position in the stream.
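To make the contrast with a queue concrete, here is a minimal sketch of an append-only log with reader-side positions. The names AppendOnlyLog and LogReader are hypothetical and chosen for illustration. The point is that reading never removes a record: each reader only advances its own offset.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative in-memory log; not a Pulsar API.
final class AppendOnlyLog {
    private final List<String> records = new ArrayList<>();

    // Appends a record to the end of the log and returns its offset.
    long append(String record) {
        records.add(record);
        return records.size() - 1;
    }

    // Reading is non-destructive: the record stays in the log.
    String read(long offset) {
        return records.get((int) offset);
    }

    long size() {
        return records.size();
    }
}

// A reader tracks only its own position in the log.
final class LogReader {
    private final AppendOnlyLog log;
    private long position = 0;

    LogReader(AppendOnlyLog log) {
        this.log = log;
    }

    // Returns the next unread record, or null when the reader has caught up.
    String next() {
        if (position >= log.size()) {
            return null;
        }
        return log.read(position++);
    }
}
```

Two LogReader instances over the same log each see every record from oldest to newest, which is exactly what a destructive queue cannot offer.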

From a processing point of view, streaming is stateful, because stream processing operates on a series of messages. These messages are usually grouped into fixed-size "windows" based on time or size (for example, every 5 minutes). Stream processing relies on the information from all messages in the window to produce the correct result.
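As a small illustration of time-based windowing, the sketch below assigns an event timestamp to a fixed-size window aligned to the Unix epoch. The class name TumblingWindow and the epoch alignment are assumptions made for this example; real stream processors offer more window types and alignment options.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative helper for fixed-size, non-overlapping time windows.
final class TumblingWindow {

    // Returns the start of the window that contains eventTime.
    // Windows are aligned to the epoch (an assumption of this sketch).
    static Instant windowStart(Instant eventTime, Duration size) {
        long sizeMs = size.toMillis();
        long ts = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMs));
    }
}
```

All events that map to the same window start belong to one window, so a stateful operator can keep one accumulator per window start.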

Streaming systems are ideal for aggregation operations, such as computing a simple moving average of sensor readings, because all of the sensor readings must be processed by the same consumer in order to compute the correct value.
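The moving-average example shows why the state has to live in one place. A minimal sketch follows, using a hypothetical SimpleMovingAverage class over the last N readings. If readings were split across several independent consumers, each would hold only part of the window and none would compute the right value.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative stateful aggregator: average of the most recent N readings.
final class SimpleMovingAverage {
    private final int windowSize;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum = 0.0;

    SimpleMovingAverage(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    // Adds a reading and returns the average over the current window.
    double add(double reading) {
        window.addLast(reading);
        sum += reading;
        if (window.size() > windowSize) {
            // Drop the oldest reading once the window is full.
            sum -= window.removeFirst();
        }
        return sum / window.size();
    }
}
```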

Pulsar's Exclusive subscription provides the right stream processing semantics for this type of use case. As shown in Figure 2, the Exclusive subscription mode ensures that all messages are delivered to a single consumer in the order in which they were received.

Figure 2: Pulsar's Exclusive subscription mode supports a single consumer.

Comparison and choice

As you can see, message queues and streams provide different processing semantics. Messaging systems achieve high scalability by supporting multiple concurrent consumers. Use a messaging system when you have large volumes of data that must be processed quickly, so that the delay between when each message is produced and when it is processed stays very low.

Streaming systems offer more sophisticated analytical processing, but at the expense of scalability per topic partition. To get accurate results, only a single consumer is allowed to process the data, so processing speed is severely limited, which leads to higher latency in streaming use cases.

Although latency can be reduced through sharding and partitioning, scalability is still limited. Tying processing scalability to the number of partitions makes the architecture less flexible, and changing the number of partitions also affects how data is published to the topic. Therefore, use stream processing only when you need stateful processing and can tolerate slower processing.
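The effect of changing the partition count can be seen in a few lines. The sketch below assumes a simplified routing rule, hash of the key modulo the number of partitions, and uses String.hashCode purely for illustration; actual systems use their own hash functions. The class and method names are hypothetical.

```java
import java.util.List;

// Illustrative key-to-partition routing; the hashing scheme is an assumption.
final class PartitionRouting {

    // Simplified rule: hash the key, then take it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Counts the keys that land on a different partition after resizing.
    static long movedKeys(List<String> keys, int before, int after) {
        return keys.stream()
                .filter(k -> partitionFor(k, before) != partitionFor(k, after))
                .count();
    }
}
```

Under this rule, going from 4 to 5 partitions sends many keys to a different partition, so per-key ordering and any per-key state held by the old partition's consumer are disrupted.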

But what should you choose if your use case requires both low latency and stateful processing? If you are using Apache Pulsar, consider the Key_Shared subscription mode, which provides processing semantics that combine messaging and streaming.

Apache Pulsar's Key_Shared subscription mode

Messages are the basic unit of Pulsar. They contain not only the raw bytes sent between producers and consumers but also some metadata fields. As shown in Figure 3, one of the metadata fields in every Pulsar message is the "key" field, which can hold a string value. This is the field that the Key_Shared subscription uses for grouping.

Figure 3: A Pulsar message contains optional metadata fields, including a field named "key", which the Key_Shared subscription uses for grouping.

Pulsar's Key_Shared subscription mode supports multiple concurrent consumers, so you can easily reduce processing latency by adding consumers. In this respect it provides message-queue semantics, because each message can be processed independently of the others.

However, this subscription type differs from the traditional Shared subscription type in how it distributes data among consumers. Unlike traditional messaging, where any consumer can handle any message, a Pulsar Key_Shared subscription assigns messages to consumers with the guarantee that messages with the same key are delivered to the same consumer.

Figure 4: Pulsar's Key_Shared subscription type delivers messages with the same key to the same consumer in the order they were received.

Pulsar provides this guarantee by hashing the incoming key and distributing the hash values evenly across all consumers of the subscription. As a result, messages with the same key produce the same hash value and are sent to the same consumer that received the earlier messages with that key.
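The idea of hashing a key and splitting the hash space among consumers can be sketched as follows. This is a simplified model, not the broker's implementation: the slot count of 65,536, the use of String.hashCode, and the KeySharedRouter class are all assumptions made for illustration, and Pulsar uses its own hash function and hash-range bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of key-based routing; not Pulsar's actual algorithm.
final class KeySharedRouter {
    private static final int HASH_RANGE = 65_536; // assumed slot count
    private final List<String> consumers;

    KeySharedRouter(List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        this.consumers = new ArrayList<>(consumers);
    }

    // Maps the key to a slot, then the slot to the consumer that owns
    // that evenly sized slice of the hash range.
    String consumerFor(String key) {
        int slot = Math.floorMod(key.hashCode(), HASH_RANGE);
        int index = (int) ((long) slot * consumers.size() / HASH_RANGE);
        return consumers.get(index);
    }
}
```

Because the mapping depends only on the key and the current consumer list, every message with a given key resolves to the same consumer as long as the set of consumers does not change.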

Because all messages with the same key are sent to the same consumer, that consumer is guaranteed to receive all messages for a given key in the order they were received, which matches the semantics of stream consumption. Let's explore a real-world use case that makes good use of Pulsar's Key_Shared subscription.

Use case: behavioral analysis of clickstream data

Providing real-time, targeted recommendations on an e-commerce website based on clickstream data is a good example, because it requires processing large volumes of data with low latency. We use behavioral analysis of clickstream data to illustrate the Key_Shared subscription mode.

Clickstream data

Clickstream data is the sequence of clicks performed by an individual user while interacting with a website. A clickstream contains all of the user's interactions, such as where they clicked, which pages they visited, and how long they spent on each page.

Figure 5: Clickstream data is a time series of events representing an individual's interactions with a website.

This kind of data can be used to analyze and report on user behavior on a specific website, such as routing, stickiness, and the common paths users take through the site. A clickstream is essentially the sequence of a user's interactions with a particular website.

Data tracking

To receive clickstream data, tracking software must be embedded in the website to collect clickstream events and forward them to the analysis system. These tags are usually short snippets of JavaScript code that capture user data at the individual level (for example, IP address and cookie). Every time a user clicks on a tagged website, the tracking software detects the event and forwards the information to a server in JSON format via an HTTP POST request.

Listing 1 shows an example JSON object generated by a tracking library. These clickstream events contain the information used for processing steps such as aggregation, filtering, and enrichment before the data is consumed for analysis.

{
  "app_id":"",
  "platform":"web",
  "collector_tstamp":"2021-08-17T23:46:46.818Z",
  "dvce_created_tstamp":"2021-08-17T23:46:45.894Z",
  "event":"page_view",
  "event_id":"933b4974-ffbd-11eb-9a03-0242ac130003",
  "user_ipaddress":"",
  "domain_userid":"8bf27e62-ffbd-11eb-9a03-0242ac130003",
  "session_id":"7",
  "page_url":""
  ...
}

Listing 1: An example clickstream event containing personally identifying user information.

There can be millions of active JavaScript trackers at any time, each collecting the clickstream events of an individual visitor to the company's website. These events are forwarded to a single tag collector, which publishes them directly to a Pulsar topic.

Figure 6: Each tracker collects clickstream events for a single user and forwards them to a single collector. Events are published as soon as they are received, which interleaves data from multiple users in the topic.

Figure 6 shows the problem: because the JavaScript tags do not coordinate with each other, clickstream data from multiple users ends up interleaved in the Pulsar topic. This is a problem because the analysis has to be performed on each individual user's clickstream.

Identity stitching

To analyze the data correctly, you first need to group each user's raw clickstream events together so that their journey can be understood completely, in the order in which the interactions occurred. This process of reconstructing each user's clickstream from the interleaved data is called identity stitching. It is done by correlating clickstream events on as many unique user identifiers as possible.
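Stripped of any messaging infrastructure, identity stitching amounts to grouping interleaved events by user while keeping each user's events in arrival order. The sketch below shows that idea on a plain list. The ClickEvent and IdentityStitcher classes are hypothetical, and a single identifier stands in for the "as many identifiers as possible" used in practice.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, minimal clickstream event for illustration.
final class ClickEvent {
    final String domainUserId;
    final String pageUrl;

    ClickEvent(String domainUserId, String pageUrl) {
        this.domainUserId = domainUserId;
        this.pageUrl = pageUrl;
    }
}

final class IdentityStitcher {

    // Groups interleaved events by user ID, preserving arrival order per user.
    static Map<String, List<ClickEvent>> stitch(List<ClickEvent> interleaved) {
        Map<String, List<ClickEvent>> byUser = new LinkedHashMap<>();
        for (ClickEvent event : interleaved) {
            byUser.computeIfAbsent(event.domainUserId, id -> new ArrayList<>())
                  .add(event);
        }
        return byUser;
    }
}
```

This single-process version does not scale past one machine, which is the gap that key-based distribution across consumers is meant to close.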

This is a perfect use case for the Key_Shared subscription mode: the complete event stream for each individual user must be processed in the order the events occurred, which calls for stream processing semantics, and the processing must scale to match the traffic on the company's website. Pulsar's Key_Shared subscription lets you do both at once.

To reconstruct each user's clickstream, use the domain_userid field in the clickstream events, which is a unique identifier generated by the JavaScript tag. This field is a randomly generated universally unique identifier (UUID) that uniquely identifies each user, so all clickstream events with the same domain_userid value belong to the same user. With this value as the key, Pulsar's Key_Shared subscription groups together all of each user's events.

Using the Key_Shared subscription mode

Behavioral analysis requires a complete view of each user's interactions with the website, so we need to make sure that all of a single user's clicks are grouped together and delivered to the same consumer. As discussed in the previous section, the domain_userid field in each clickstream event contains the user's unique identifier. By using this value as the message key, Pulsar can guarantee that all events for the same user are delivered to the same consumer when we use a Key_Shared subscription.

Data enrichment

The messages collected and forwarded by the JavaScript tags contain only the raw JSON bytes (the key field is empty). Therefore, to take advantage of the Key_Shared subscription, you first need to enrich each message by populating the message key with the value of the domain_userid field in its JSON object.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.manning.pulsar.chapter4.types.TrackingTag;
import org.apache.pulsar.client.impl.schema.JSONSchema;

public class WebTagEnricher implements Function<String, Void> {

    static final String TOPIC = "persistent://tracking/web-activity/tags";

    @Override
    public Void process(String json, Context ctx) throws Exception {
        // Parse the raw JSON tag into a typed object.
        ObjectMapper objectMapper = new ObjectMapper();
        TrackingTag tag = objectMapper.readValue(json, TrackingTag.class);

        // Republish the event with the user's UUID as the message key.
        ctx.newOutputMessage(TOPIC, JSONSchema.of(TrackingTag.class))
            .key(tag.getDomainUserId())
            .value(tag)
            .send();

        return null;
    }
}

Listing 2: A Pulsar Function converts the raw tag bytes into a JSON object and copies the value of the domain_userid field to the key field of the outgoing message.

This can be done with a relatively simple piece of code, as shown in Listing 2. It parses the JSON object, reads the value of the domain_userid field, and outputs a new message that contains the original clickstream event and whose key is populated with the user's UUID. This kind of logic is a perfect fit for Pulsar Functions. In addition, because the logic is stateless, it can run in parallel using the Shared subscription type, which minimizes the processing time needed for this task.

Identity stitching with a Key_Shared subscription

Once the messages containing the clickstream events have been enriched with the correct key, the next step is to use a Key_Shared subscription to perform identity stitching. The code in Listing 3 starts a total of five consumers on the Key_Shared subscription.

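import java.util.stream.IntStream;

import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.schema.JSONSchema;

// Same TrackingTag type as in Listing 2.
import com.manning.pulsar.chapter4.types.TrackingTag;

public class ClickstreamAggregator {

    static final String PULSAR_SERVICE_URL = "pulsar://localhost:6650";
    static final String MY_TOPIC = "persistent://tracking/web-activity/tags";
    static final String SUBSCRIPTION = "aggregator-sub";

    public static void main(String[] args) throws PulsarClientException {
        PulsarClient client = PulsarClient.builder()
            .serviceUrl(PULSAR_SERVICE_URL)
            .build();

        // One builder shared by all consumers on the Key_Shared subscription.
        ConsumerBuilder<TrackingTag> consumerBuilder =
            client.newConsumer(JSONSchema.of(TrackingTag.class))
                .topic(MY_TOPIC)
                .subscriptionName(SUBSCRIPTION)
                .subscriptionType(SubscriptionType.Key_Shared)
                .messageListener(new TagMessageListener());

        // Start five consumers, named mq-consumer-0 through mq-consumer-4.
        IntStream.range(0, 5).forEach(i -> {
            String name = String.format("mq-consumer-%d", i);
            try {
                consumerBuilder
                    .consumerName(name)
                    .subscribe();
            } catch (PulsarClientException e) {
                e.printStackTrace();
            }
        });
    }
}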

Listing 3: The main class starts the consumers on the same Key_Shared subscription using the MessageListener interface, which runs on an internal thread pool.

The processing logic that runs when a new event arrives lives in the TagMessageListener class. Because a consumer is likely to be assigned multiple keys, incoming clickstream events need to be stored in an internal map keyed by each visitor's UUID. This is done with the least-recently-used (LRU) map implementation from the Apache Commons library, which keeps the map at a fixed size by evicting the least recently used entry when the map becomes full.

import java.util.ArrayList;
import java.util.List;

// Assumed to be the generic LRUMap from Apache Commons Collections 4.
import org.apache.commons.collections4.map.LRUMap;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;

public class TagMessageListener implements MessageListener<TrackingTag> {

    // Holds the clickstreams of up to 100 users; the least recently used is evicted.
    private LRUMap<String, List<TrackingTag>> userActivity =
        new LRUMap<String, List<TrackingTag>>(100);

    @Override
    public void received(Consumer<TrackingTag> consumer,
                         Message<TrackingTag> msg) {
        try {
            recordEvent(msg.getValue());
            invokeML(msg.getValue().getDomainUserId());
            consumer.acknowledge(msg);
        } catch (PulsarClientException e) {
            e.printStackTrace();
        }
    }

    // Appends the event to the clickstream of the user it belongs to.
    private void recordEvent(TrackingTag event) {
        if (!userActivity.containsKey(event.getDomainUserId())) {
            userActivity.put(event.getDomainUserId(),
                new ArrayList<TrackingTag>());
        }
        userActivity.get(event.getDomainUserId()).add(event);
    }

    // Invokes the ML model with the collected events for the user
    private void invokeML(String domainUserId) {
        // . . .
    }
}

Listing 4: The class responsible for aggregating clickstream events uses an LRU map to organize events by user ID. Each new event is appended to the list of previous events for that user. These lists can then be fed into a machine learning model to generate recommendations.

When a new event arrives, it is added to the corresponding user's clickstream, thereby reconstructing the clickstream for each key assigned to the consumer.
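For readers who want to see how a fixed-size LRU map behaves without adding a dependency, the following sketch builds one on the standard library's LinkedHashMap. It is offered as an alternative for illustration and is not the implementation used in the article; the class name BoundedLruMap is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative fixed-size LRU map built on java.util.LinkedHashMap.
final class BoundedLruMap<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    BoundedLruMap(int maxEntries) {
        // accessOrder = true: iteration order runs from least to most recently used.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    // Called after each insertion; returning true evicts the eldest entry.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

With a capacity of 2, inserting "a" and "b", reading "a", and then inserting "c" evicts "b", because "a" was touched more recently. Like the Commons map, this class is not thread-safe on its own.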

Real-time behavioral analysis

Now that the clickstreams have been reconstructed, they can be fed to a machine learning model that produces targeted recommendations for each visitor to the company's website, for example, suggesting an item to add to the shopping cart based on the items already in it or on recently viewed items, or offering a coupon. Real-time behavioral analysis improves the user experience through personalized recommendations, which helps increase conversion rates and average order size.


Traditional messaging systems process a topic with multiple concurrent consumers. A typical use case is the traditional work queue, in which an order microservice processes customer orders. For such use cases, you can use Pulsar's Shared subscription.

Traditional streaming systems perform stateful data processing with only one consumer on a topic, which limits throughput. Streaming systems are suited to more sophisticated analytical processing. Pulsar's Exclusive and Failover subscription modes are designed to support these semantics.

Pulsar's Key_Shared subscription type allows both high throughput and stateful processing on a single topic. It is well suited to use cases that require stateful processing of large volumes of data, such as personalization, real-time marketing, micro-targeted advertising, and cybersecurity.

For more information about Pulsar's Key_Shared subscription, see the Apache Pulsar documentation [2].


Thanks to Apache Pulsar community volunteers Liu Zilin @ Jue shirt and Duan Jia @Janusjia for translating this article.

Reference links

[1] "Scalable Stream Processing with Pulsar's Key_Shared Subscription":
[2] Apache Pulsar documentation:
