Skip to content

Commit

Permalink
Release 3.6.1
Browse files Browse the repository at this point in the history
  • Loading branch information
weiqiangliu committed Jan 9, 2023
1 parent f33b124 commit 70e82dc
Show file tree
Hide file tree
Showing 11 changed files with 325 additions and 14 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ SensorsAnalytics SDK 是国内第一家开源商用版用户行为采集 SDK,

## License

Copyright 2015-2022 Sensors Data Inc.
Copyright 2015-2023 Sensors Data Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion SensorsAnalyticsSDK/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<groupId>com.sensorsdata.analytics.javasdk</groupId>
<name>SensorsAnalyticsSDK</name>
<artifactId>SensorsAnalyticsSDK</artifactId>
<version>3.6.0</version>
<version>3.6.1</version>
<description>The official Java SDK of Sensors Analytics</description>
<url>http://sensorsdata.cn</url>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ private SensorsConst() {
/**
* 当前JDK版本号,注意要和pom文件里面的version保持一致
*/
public static final String SDK_VERSION = "3.6.0";
public static final String SDK_VERSION = "3.6.1";
/**
* 当前语言类型
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -20,10 +22,13 @@ public class BatchConsumer implements Consumer {

private final List<Map<String, Object>> messageList;
private final HttpConsumer httpConsumer;
private final InstantHttpConsumer instantHttpConsumer;
private final ObjectMapper jsonMapper;
private final int bulkSize;
private final boolean throwException;
private final int maxCacheSize;
private List<String> instantEvents;
private boolean isInstantStatus;

public BatchConsumer(final String serverUrl) {
this(serverUrl, 50);
Expand Down Expand Up @@ -58,8 +63,22 @@ public BatchConsumer(final String serverUrl, final int bulkSize, final int maxCa

public BatchConsumer(HttpClientBuilder httpClientBuilder, final String serverUrl, final int bulkSize, final int maxCacheSize,
final boolean throwException, final int timeoutSec) {
this(httpClientBuilder, serverUrl, bulkSize, maxCacheSize, throwException, timeoutSec, new ArrayList<String>());
}


public BatchConsumer(final String serverUrl, final int bulkSize, final int maxCacheSize,
final boolean throwException, final int timeoutSec, List<String> instantEvents) {
this(HttpClients.custom(), serverUrl, bulkSize, maxCacheSize, throwException, timeoutSec, instantEvents);
}

public BatchConsumer(HttpClientBuilder httpClientBuilder, final String serverUrl, final int bulkSize, final int maxCacheSize,
final boolean throwException, final int timeoutSec, List<String> instantEvents) {
this.messageList = new LinkedList<>();
this.isInstantStatus = false;
this.instantEvents = instantEvents;
this.httpConsumer = new HttpConsumer(httpClientBuilder, serverUrl, Math.max(timeoutSec, 1));
this.instantHttpConsumer = new InstantHttpConsumer(httpClientBuilder, serverUrl, Math.max(timeoutSec, 1));
this.jsonMapper = SensorsAnalyticsUtil.getJsonObjectMapper();
this.bulkSize = Math.min(MAX_FLUSH_BULK_SIZE, Math.max(1, bulkSize));
if (maxCacheSize > MAX_CACHE_SIZE) {
Expand All @@ -78,6 +97,7 @@ public BatchConsumer(HttpClientBuilder httpClientBuilder, final String serverUrl
@Override
public void send(Map<String, Object> message) {
synchronized (messageList) {
dealInstantSignal(message);
int size = messageList.size();
if (maxCacheSize <= 0 || size < maxCacheSize) {
messageList.add(message);
Expand Down Expand Up @@ -110,7 +130,11 @@ public void flush() {
}
log.debug("Will be send data:{}.", sendingData);
try {
this.httpConsumer.consume(sendingData);
if (isInstantStatus) {
this.instantHttpConsumer.consume(sendingData);
} else {
this.httpConsumer.consume(sendingData);
}
sendList.clear();
} catch (Exception e) {
log.error("Failed to send data:{}.", sendingData, e);
Expand All @@ -131,4 +155,23 @@ public void close() {
httpConsumer.close();
log.info("Call close method.");
}

private void dealInstantSignal(Map<String, Object> message) {

/*
* 如果当前是「instant」状态,且(message中不包含event 或者 event 不是「instant」的,则刷新,设置 「非instant」状态
*/
if (isInstantStatus && (!message.containsKey("event") || !instantEvents.contains(message.get("event")))) {
flush();
isInstantStatus = false;
}

/*
* 如果当前是 「非instant」状态,且(message中包含event 且 event 是「instant」的,则刷新,设置 「instant」状态
*/
if (!isInstantStatus && message.containsKey("event") && instantEvents.contains(message.get("event"))) {
flush();
isInstantStatus = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ public class FastBatchConsumer implements Consumer {

private final LinkedBlockingQueue<Map<String, Object>> buffer;
private final HttpConsumer httpConsumer;
private final InstantHttpConsumer instantHttpConsumer;
private final ObjectMapper jsonMapper;
private final Callback callback;
private final int bulkSize;
private final ScheduledExecutorService executorService;
private List<String> instantEvents;
private boolean isInstantStatus;

public FastBatchConsumer(@NonNull String serverUrl, @NonNull Callback callback) {
this(serverUrl, false, callback);
Expand Down Expand Up @@ -69,12 +72,21 @@ public FastBatchConsumer(@NonNull String serverUrl, final boolean timing, final

public FastBatchConsumer(HttpClientBuilder httpClientBuilder, @NonNull String serverUrl, final boolean timing, final int bulkSize, int maxCacheSize,
int flushSec, int timeoutSec, @NonNull Callback callback) {
this(httpClientBuilder, serverUrl, timing, bulkSize, maxCacheSize, flushSec, timeoutSec, callback, new ArrayList<String>());
}

public FastBatchConsumer(HttpClientBuilder httpClientBuilder, @NonNull String serverUrl, final boolean timing, final int bulkSize, int maxCacheSize,
int flushSec, int timeoutSec, @NonNull Callback callback, List<String> instantEvents) {
this.buffer =
new LinkedBlockingQueue<>(Math.min(Math.max(MIN_CACHE_SIZE, maxCacheSize), MAX_CACHE_SIZE));
this.httpConsumer = new HttpConsumer(httpClientBuilder, serverUrl, Math.max(timeoutSec, 1));
this.instantHttpConsumer = new InstantHttpConsumer(httpClientBuilder, serverUrl, Math.max(timeoutSec, 1));

this.jsonMapper = SensorsAnalyticsUtil.getJsonObjectMapper();
this.callback = callback;
this.bulkSize = Math.min(MIN_CACHE_SIZE, Math.max(bulkSize, MIN_BULK_SIZE));
this.instantEvents = instantEvents;

executorService = new ScheduledThreadPoolExecutor(1);
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
Expand All @@ -95,15 +107,33 @@ public void run() {

@Override
public void send(Map<String, Object> message) {
if (!buffer.offer(message)) {
List<Map<String, Object>> res = new ArrayList<>(1);
res.add(message);
callback.onFailed(new FailedData("can't offer to buffer.", res));
log.error("Failed to load data into the cache.The cache current size is {}.", buffer.size());
dealInstantSignal(message);
if (buffer.remainingCapacity() == 0) {
flush();
}
buffer.offer(message);
log.info("Successfully save data to cache.The cache current size is {}.", buffer.size());
}

private void dealInstantSignal(Map<String, Object> message) {

/*
* 如果当前是「instant」状态,且(message中不包含event 或者 event 不是「instant」的,则刷新,设置 「非instant」状态
*/
if (isInstantStatus && (!message.containsKey("event") || !instantEvents.contains(message.get("event")))) {
flush();
isInstantStatus = false;
}

/*
* 如果当前是 「非instant」状态,且(message中包含event 且 event 是「instant」的,则刷新,设置 「instant」状态
*/
if (!isInstantStatus && message.containsKey("event") && instantEvents.contains(message.get("event"))) {
flush();
isInstantStatus = true;
}
}

/**
* This method don't need to be called actively.Because instance will create scheduled thread to do.
*/
Expand All @@ -130,7 +160,11 @@ public void flush() {
}
log.debug("Data will be sent.{}", sendingData);
try {
this.httpConsumer.consume(sendingData);
if (isInstantStatus) {
this.instantHttpConsumer.consume(sendingData);
} else {
this.httpConsumer.consume(sendingData);
}
} catch (Exception e) {
log.error("Failed to send data:{}.", sendingData, e);
callback.onFailed(new FailedData(String.format("failed to send data,message:%s.", e.getMessage()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ HttpUriRequest getHttpRequest(final String data) throws IOException {
}

UrlEncodedFormEntity getHttpEntry(final String data) throws IOException {
List<NameValuePair> nameValuePairs = getNameValuePairs(data);
return new UrlEncodedFormEntity(nameValuePairs);
}

List<NameValuePair> getNameValuePairs(String data) throws IOException {

byte[] bytes = data.getBytes(Charset.forName("UTF-8"));

List<NameValuePair> nameValuePairs = new ArrayList<NameValuePair>();
Expand All @@ -118,14 +124,14 @@ UrlEncodedFormEntity getHttpEntry(final String data) throws IOException {

nameValuePairs.add(new BasicNameValuePair("gzip", "1"));
nameValuePairs.add(new BasicNameValuePair("data_list", new String(Base64Coder.encode
(compressed))));
(compressed))));
} else {
nameValuePairs.add(new BasicNameValuePair("gzip", "0"));
nameValuePairs.add(new BasicNameValuePair("data_list", new String(Base64Coder.encode
(bytes))));
(bytes))));
}

return new UrlEncodedFormEntity(nameValuePairs);
return nameValuePairs;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.sensorsdata.analytics.javasdk.consumer;

import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicNameValuePair;

import java.io.IOException;
import java.util.List;
import java.util.Map;

public class InstantHttpConsumer extends HttpConsumer{

public InstantHttpConsumer(String serverUrl, int timeoutSec) {
super(serverUrl, timeoutSec);
}

public InstantHttpConsumer(String serverUrl, Map<String, String> httpHeaders) {
super(serverUrl, httpHeaders);
}

InstantHttpConsumer(String serverUrl, Map<String, String> httpHeaders, int timeoutSec) {
super(serverUrl, httpHeaders, timeoutSec);
}

public InstantHttpConsumer(HttpClientBuilder httpClientBuilder, String serverUrl,
Map<String, String> httpHeaders) {
super(httpClientBuilder, serverUrl, httpHeaders);
}

public InstantHttpConsumer(HttpClientBuilder httpClientBuilder, String serverUrl, int timeoutSec) {
super(httpClientBuilder, serverUrl, timeoutSec);
}

InstantHttpConsumer(HttpClientBuilder httpClientBuilder, String serverUrl, Map<String, String> httpHeaders,
int timeoutSec) {
super(httpClientBuilder, serverUrl, httpHeaders, timeoutSec);
}

@Override
UrlEncodedFormEntity getHttpEntry(final String data) throws IOException {
List<NameValuePair> nameValuePairs = getNameValuePairs(data);
nameValuePairs.add(new BasicNameValuePair("instant_event", "true"));
return new UrlEncodedFormEntity(nameValuePairs);
}
}
Loading

0 comments on commit 70e82dc

Please sign in to comment.