Skip to content

Commit c8567aa

Browse files
author
俟命
committed
support v4SigningKey auto update
support otsReader
1 parent 0718c5d commit c8567aa

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+3242
-50
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,19 @@
99
- 阿里云表格存储是阿里云自主研发的NoSQL数据存储服务,提供海量结构化数据的存储和实时访问。
1010

1111
## 版本
12-
- 当前版本:5.16.0
12+
- 当前版本:5.16.1
1313

1414
## 运行环境
1515
- JDK 6及其以上
1616

1717
## 安装
1818
#### Maven方式
19-
下载[最新版JAR包](https://search.maven.org/remotecontent?filepath=com/aliyun/openservices/tablestore/5.16.0/tablestore-5.16.0.jar)或者通过Maven:
19+
下载[最新版JAR包](https://search.maven.org/remotecontent?filepath=com/aliyun/openservices/tablestore/5.16.1/tablestore-5.16.1.jar)或者通过Maven:
2020
```xml
2121
<dependency>
2222
<groupId>com.aliyun.openservices</groupId>
2323
<artifactId>tablestore</artifactId>
24-
<version>5.16.0</version>
24+
<version>5.16.1</version>
2525
</dependency>
2626
```
2727

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>com.aliyun.openservices</groupId>
55
<artifactId>tablestore</artifactId>
6-
<version>5.16.0</version>
6+
<version>5.16.1</version>
77
<packaging>jar</packaging>
88
<name>AliCloud TableStore SDK for Java</name>
99
<url>http://www.aliyun.com</url>
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
package com.alicloud.openservices.tablestore;
2+
3+
import java.util.HashMap;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.Executor;
8+
import java.util.concurrent.Executors;
9+
import java.util.concurrent.Future;
10+
import java.util.concurrent.ScheduledExecutorService;
11+
import java.util.concurrent.Semaphore;
12+
import java.util.concurrent.ThreadFactory;
13+
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.atomic.AtomicBoolean;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
17+
import com.alicloud.openservices.tablestore.core.ResourceManager;
18+
import com.alicloud.openservices.tablestore.core.auth.CredentialsProvider;
19+
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentialProvider;
20+
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
21+
import com.alicloud.openservices.tablestore.model.DescribeTableRequest;
22+
import com.alicloud.openservices.tablestore.model.DescribeTableResponse;
23+
import com.alicloud.openservices.tablestore.model.PrimaryKey;
24+
import com.alicloud.openservices.tablestore.model.RowQueryCriteria;
25+
import com.alicloud.openservices.tablestore.model.TableMeta;
26+
import com.alicloud.openservices.tablestore.reader.PrimaryKeyWithTable;
27+
import com.alicloud.openservices.tablestore.reader.ReaderBucket;
28+
import com.alicloud.openservices.tablestore.reader.ReaderDispatcher;
29+
import com.alicloud.openservices.tablestore.reader.ReaderEvent;
30+
import com.alicloud.openservices.tablestore.reader.ReaderGroup;
31+
import com.alicloud.openservices.tablestore.reader.ReaderResult;
32+
import com.alicloud.openservices.tablestore.reader.ReaderStatistics;
33+
import com.alicloud.openservices.tablestore.reader.ReaderUtils;
34+
import com.alicloud.openservices.tablestore.reader.RowReadResult;
35+
import com.alicloud.openservices.tablestore.reader.TableStoreReaderConfig;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
public class DefaultTableStoreReader implements TableStoreReader {
40+
private static final int SCHEDULED_CORE_POOL_SIZE = 2;
41+
private final AtomicBoolean closed = new AtomicBoolean(false);
42+
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(SCHEDULED_CORE_POOL_SIZE, new ThreadFactory() {
43+
private final AtomicInteger counter = new AtomicInteger(0);
44+
45+
@Override
46+
public Thread newThread(Runnable r) {
47+
return new Thread(r, "reader-scheduled-pool-%d" + counter.getAndIncrement());
48+
}
49+
});
50+
private final Logger logger = LoggerFactory.getLogger(DefaultTableStoreReader.class);
51+
private final AsyncClientInterface ots;
52+
private final TableStoreReaderConfig config;
53+
private final TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback;
54+
private final Executor executor;
55+
private final ReaderBucket[] buckets;
56+
private final Semaphore semaphore;
57+
private final ReaderStatistics statistics;
58+
private final ReaderDispatcher dispatcher;
59+
private final Map<String, TableMeta> metaMap;
60+
61+
public DefaultTableStoreReader(AsyncClientInterface ots, TableStoreReaderConfig config, Executor executor, TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback) {
62+
this.ots = ots;
63+
this.config = config;
64+
this.executor = executor;
65+
this.callback = callback;
66+
this.statistics = new ReaderStatistics();
67+
68+
semaphore = new Semaphore(config.getConcurrency());
69+
metaMap = new HashMap<String, TableMeta>();
70+
71+
this.buckets = new ReaderBucket[config.getBucketCount()];
72+
for (int i = 0; i < buckets.length; i++) {
73+
buckets[i] = new ReaderBucket(ots, semaphore, config, callback, executor, statistics);
74+
}
75+
dispatcher = new ReaderDispatcher(buckets.length);
76+
77+
startFlushTimer(config.getFlushInterval());
78+
startLogTimer(config.getLogInterval());
79+
}
80+
81+
@Override
82+
public void addPrimaryKey(String tableName, PrimaryKey primaryKey) {
83+
if (config.isCheckTableMeta()) {
84+
checkPrimaryKeyWithTable(tableName, primaryKey);
85+
}
86+
87+
ReaderGroup group = new ReaderGroup(1);
88+
PrimaryKeyWithTable primaryKeyWithTable = new PrimaryKeyWithTable(tableName, primaryKey);
89+
while (true) {
90+
if (!addPrimaryKeyWithTableInternal(primaryKeyWithTable, group)) {
91+
try {
92+
Thread.sleep(1);
93+
} catch (InterruptedException exp) {
94+
}
95+
} else {
96+
break;
97+
}
98+
}
99+
}
100+
101+
@Override
102+
public Future<ReaderResult> addPrimaryKeyWithFuture(String tableName, PrimaryKey primaryKey) {
103+
if (config.isCheckTableMeta()) {
104+
checkPrimaryKeyWithTable(tableName, primaryKey);
105+
}
106+
107+
ReaderGroup group = new ReaderGroup(1);
108+
PrimaryKeyWithTable primaryKeyWithTable = new PrimaryKeyWithTable(tableName, primaryKey);
109+
while (true) {
110+
if (!addPrimaryKeyWithTableInternal(primaryKeyWithTable, group)) {
111+
try {
112+
Thread.sleep(1);
113+
} catch (InterruptedException exp) {
114+
}
115+
} else {
116+
break;
117+
}
118+
}
119+
return group.getFuture();
120+
}
121+
122+
@Override
123+
public void addPrimaryKeys(String tableName, List<PrimaryKey> primaryKeys) {
124+
ReaderGroup group = new ReaderGroup(primaryKeys.size());
125+
for (PrimaryKey primaryKey : primaryKeys) {
126+
if (config.isCheckTableMeta()) {
127+
checkPrimaryKeyWithTable(tableName, primaryKey);
128+
}
129+
PrimaryKeyWithTable primaryKeyWithTable = new PrimaryKeyWithTable(tableName, primaryKey);
130+
while (true) {
131+
if (!addPrimaryKeyWithTableInternal(primaryKeyWithTable, group)) {
132+
try {
133+
Thread.sleep(1);
134+
} catch (InterruptedException exp) {
135+
}
136+
} else {
137+
break;
138+
}
139+
}
140+
}
141+
}
142+
143+
@Override
144+
public Future<ReaderResult> addPrimaryKeysWithFuture(String tableName, List<PrimaryKey> primaryKeys) {
145+
ReaderGroup group = new ReaderGroup(primaryKeys.size());
146+
for (PrimaryKey primaryKey : primaryKeys) {
147+
if (config.isCheckTableMeta()) {
148+
checkPrimaryKeyWithTable(tableName, primaryKey);
149+
}
150+
PrimaryKeyWithTable primaryKeyWithTable = new PrimaryKeyWithTable(tableName, primaryKey);
151+
while (true) {
152+
if (!addPrimaryKeyWithTableInternal(primaryKeyWithTable, group)) {
153+
try {
154+
Thread.sleep(1);
155+
} catch (InterruptedException exp) {
156+
}
157+
} else {
158+
break;
159+
}
160+
}
161+
}
162+
163+
return group.getFuture();
164+
}
165+
166+
@Override
167+
public void setRowQueryCriteria(RowQueryCriteria rowQueryCriteria) {
168+
for (ReaderBucket bucket : buckets) {
169+
bucket.setRowQueryCriteria(rowQueryCriteria);
170+
}
171+
}
172+
173+
@Override
174+
public void send() {
175+
logger.debug("trigger send data.");
176+
if (closed.get()) {
177+
throw new ClientException("The reader has been closed.");
178+
}
179+
180+
triggerEvent(ReaderEvent.EventType.SEND);
181+
182+
logger.debug("user trigger send finished.");
183+
}
184+
185+
@Override
186+
public void flush() {
187+
logger.debug("trigger flush and waiting.");
188+
if (closed.get()) {
189+
throw new ClientException("The reader has been closed.");
190+
}
191+
192+
CountDownLatch latch = triggerEvent(ReaderEvent.EventType.FLUSH);
193+
try {
194+
latch.await();
195+
} catch (InterruptedException e) {
196+
throw new ClientException(e);
197+
}
198+
logger.info("Reader statistics: " + statistics);
199+
logger.debug("user trigger flush finished.");
200+
}
201+
202+
@Override
203+
public void setCallback(TableStoreCallback<PrimaryKeyWithTable, RowReadResult> callback) {
204+
for (ReaderBucket bucket : buckets) {
205+
bucket.setCallback(callback);
206+
}
207+
}
208+
209+
private CountDownLatch triggerEvent(ReaderEvent.EventType type) {
210+
CountDownLatch latch = new CountDownLatch(1);
211+
for (ReaderBucket bucket : buckets) {
212+
bucket.addSignal(latch, type);
213+
}
214+
return latch;
215+
}
216+
217+
public synchronized void close() {
218+
if (closed.get()) {
219+
throw new ClientException("The reader has already been closed.");
220+
}
221+
flush();
222+
223+
scheduledExecutorService.shutdown();
224+
for (ReaderBucket bucket : buckets) {
225+
bucket.close();
226+
}
227+
closed.set(true);
228+
}
229+
230+
private boolean addPrimaryKeyWithTableInternal(PrimaryKeyWithTable primaryKeyWithTable, final ReaderGroup readerGroup) {
231+
if (closed.get()) {
232+
throw new ClientException("The reader has been closed.");
233+
}
234+
235+
int dispatchIndex = dispatcher.getDispatchIndex(primaryKeyWithTable.getPrimaryKey());
236+
return buckets[dispatchIndex].addPrimaryKeyWithTable(primaryKeyWithTable, readerGroup);
237+
}
238+
239+
public void startFlushTimer(int flushInterval) {
240+
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
241+
@Override
242+
public void run() {
243+
triggerEvent(ReaderEvent.EventType.FLUSH);
244+
}
245+
}, 0, flushInterval, TimeUnit.MILLISECONDS);
246+
}
247+
248+
private void startLogTimer(int interval) {
249+
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
250+
@Override
251+
public void run() {
252+
StringBuilder ringBufferRemain = new StringBuilder("RingBuffer Remain: ");
253+
for (ReaderBucket bucket : buckets) {
254+
ringBufferRemain.append(bucket.getRingBuffer().remainingCapacity());
255+
ringBufferRemain.append(", ");
256+
}
257+
logger.debug(ringBufferRemain.toString());
258+
}
259+
}, 0, interval, TimeUnit.MILLISECONDS);
260+
}
261+
262+
private void checkPrimaryKeyWithTable(String tableName, PrimaryKey primaryKey) {
263+
if (metaMap.containsKey(tableName) && metaMap.get(tableName) == null) {
264+
throw new ClientException("The table : {" + tableName + "} does not exist.");
265+
}
266+
267+
if (!metaMap.containsKey(tableName)) {
268+
try {
269+
DescribeTableResponse response = ots.asSyncClient().describeTable(new DescribeTableRequest(tableName));
270+
metaMap.put(tableName, response.getTableMeta());
271+
} catch (TableStoreException e) {
272+
metaMap.put(tableName, null);
273+
throw new ClientException("The table : {" + tableName + "} does not exist.");
274+
}
275+
}
276+
ReaderUtils.checkTableMeta(metaMap.get(tableName), primaryKey);
277+
}
278+
279+
public ReaderStatistics getStatistics() {
280+
return statistics;
281+
}
282+
283+
public TableStoreReaderConfig getConfig() {
284+
return config;
285+
}
286+
}

src/main/java/com/alicloud/openservices/tablestore/DefaultTableStoreTimeseriesWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.alicloud.openservices.tablestore;
22

3+
import com.alicloud.openservices.tablestore.core.ResourceManager;
4+
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentialProvider;
35
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
46
import com.alicloud.openservices.tablestore.core.utils.Preconditions;
57
import com.alicloud.openservices.tablestore.model.timeseries.*;
@@ -108,7 +110,7 @@ public DefaultTableStoreTimeseriesWriter(
108110
cc.setRetryStrategy(new CertainCodeRetryStrategy());
109111
}
110112

111-
this.ots = new AsyncTimeseriesClient(endpoint, credentials.getAccessKeyId(), credentials.getAccessKeySecret(), instanceName, cc, credentials.getSecurityToken());
113+
this.ots = new AsyncTimeseriesClient(endpoint, new DefaultCredentialProvider(credentials), instanceName, cc, new ResourceManager(cc, null));
112114
this.timeseriesWriterConfig = config;
113115
this.resultCallback = resultCallback;
114116
this.executor = this.createThreadPool(config);
@@ -138,7 +140,7 @@ public DefaultTableStoreTimeseriesWriter(
138140
TableStoreCallback<TimeseriesTableRow, TimeseriesRowResult> resultCallback) {
139141
this.allowDuplicatePkInBatchRequest = true;
140142
this.timeseriesWriterHandleStatistics = new TimeseriesWriterHandleStatistics();
141-
this.ots = new AsyncTimeseriesClient(endpoint, credentials.getAccessKeyId(), credentials.getAccessKeySecret(), instanceName, cc, credentials.getSecurityToken());
143+
this.ots = new AsyncTimeseriesClient(endpoint, new DefaultCredentialProvider(credentials), instanceName, cc, new ResourceManager(cc, null));
142144
this.timeseriesWriterConfig = config;
143145
this.resultCallback = resultCallback;
144146
this.executor = this.createThreadPool(config);

src/main/java/com/alicloud/openservices/tablestore/DefaultTableStoreWriter.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.alicloud.openservices.tablestore;
22

3+
import com.alicloud.openservices.tablestore.core.ResourceManager;
4+
import com.alicloud.openservices.tablestore.core.auth.DefaultCredentialProvider;
35
import com.alicloud.openservices.tablestore.core.auth.ServiceCredentials;
46
import com.alicloud.openservices.tablestore.core.utils.ParamChecker;
57
import com.alicloud.openservices.tablestore.core.utils.Preconditions;
@@ -105,7 +107,7 @@ public DefaultTableStoreWriter(
105107
default:
106108
cc.setRetryStrategy(new CertainCodeRetryStrategy());
107109
}
108-
this.ots = new AsyncClient(endpoint, credentials.getAccessKeyId(), credentials.getAccessKeySecret(), instanceName, cc, credentials.getSecurityToken());
110+
this.ots = new AsyncClient(endpoint, new DefaultCredentialProvider(credentials), instanceName, cc, new ResourceManager(cc, null));
109111
this.tableName = tableName;
110112
this.writerConfig = config;
111113
this.callback = null;
@@ -129,7 +131,7 @@ public DefaultTableStoreWriter(
129131
TableStoreCallback<RowChange, RowWriteResult> resultCallback) {
130132
Preconditions.checkArgument(tableName != null && !tableName.isEmpty(), "The table name can not be null or empty.");
131133
this.writerStatistics = new WriterHandleStatistics();
132-
this.ots = new AsyncClient(endpoint, credentials.getAccessKeyId(), credentials.getAccessKeySecret(), instanceName, cc, credentials.getSecurityToken());
134+
this.ots = new AsyncClient(endpoint, new DefaultCredentialProvider(credentials), instanceName, cc, new ResourceManager(cc, null));
133135
this.tableName = tableName;
134136
this.writerConfig = config;
135137
this.callback = null;

0 commit comments

Comments
 (0)