1
2
3
4 package org.jscsi.target.storage;
5
6
7 import static com.google.common.base.Preconditions.checkState;
8
9 import java.io.File;
10 import java.io.FileNotFoundException;
11 import java.io.IOException;
12 import java.security.InvalidKeyException;
13 import java.security.Key;
14 import java.security.NoSuchAlgorithmException;
15 import java.util.Map;
16 import java.util.Properties;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.CompletionService;
19 import java.util.concurrent.ConcurrentHashMap;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.ExecutorCompletionService;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.Future;
25
26 import javax.crypto.Cipher;
27 import javax.crypto.NoSuchPaddingException;
28 import javax.crypto.spec.SecretKeySpec;
29
30 import org.jclouds.ContextBuilder;
31 import org.jclouds.blobstore.BlobStore;
32 import org.jclouds.blobstore.BlobStoreContext;
33 import org.jclouds.blobstore.domain.Blob;
34 import org.jclouds.domain.Location;
35 import org.jclouds.filesystem.reference.FilesystemConstants;
36
37 import com.google.common.annotations.Beta;
38 import com.google.common.cache.Cache;
39 import com.google.common.cache.CacheBuilder;
40 import com.google.common.io.ByteArrayDataOutput;
41 import com.google.common.io.ByteStreams;
42
43
44
45
46
47
48
49
50
51 @Beta
52 public class JCloudsStorageModule implements IStorageModule {
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79 public static final int BLOCK_IN_CLUSTER = 512;
80
81 private static final int BUCKETS_TO_PREFETCH = 3;
82
83 private static final boolean ENCRYPT = false;
84 private static final String ALGO = "AES";
85 private static byte[] keyValue = new byte[] { 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k' };
86 private static final Key KEY = new SecretKeySpec(keyValue, "AES");
87
88
89 public final static int SIZE_PER_BUCKET = BLOCK_IN_CLUSTER * VIRTUAL_BLOCK_SIZE;
90
91 public final static String CONTAINERNAME = "bench53473ResourcegraveISCSI9284";
92
93 private final long mNumberOfCluster;
94
95 private final String mContainerName;
96
97 private final BlobStore mStore;
98
99 private final BlobStoreContext mContext;
100
101 private final Cache<Integer , byte[]> mByteCache;
102
103 private int lastIndexWritten;
104 private byte[] lastBlobWritten;
105
106 private final CompletionService<Integer> mWriterService;
107 private final CompletionService<Map.Entry<Integer , byte[]>> mReaderService;
108 private final ConcurrentHashMap<Integer , Future<Integer>> mRunningWriteTasks;
109 private final ConcurrentHashMap<Integer , Future<Map.Entry<Integer , byte[]>>> mRunningReadTasks;
110
111
112
113
114
115
116
117
118
/**
 * Creates the storage module, opens the blob store, makes sure the container exists and starts the
 * background workers.
 *
 * @param pSizeInBlocks
 *            requested size in blocks. NOTE(review): this value is currently ignored; the capacity is
 *            fixed at 1048576 blocks - confirm whether that is intended.
 * @param pFile
 *            base directory for the {@code filesystem} provider, used when no credentials are available
 */
public JCloudsStorageModule (final long pSizeInBlocks, final File pFile) {
    mNumberOfCluster = 1048576 / BLOCK_IN_CLUSTER;
    mContainerName = CONTAINERNAME;

    // Resolve the credentials once and reuse the array instead of calling getCredentials() repeatedly.
    final String[] credentials = getCredentials();
    if (credentials.length == 0) {
        // No credentials: fall back to the local filesystem provider rooted at pFile.
        final Properties properties = new Properties();
        properties.setProperty(FilesystemConstants.PROPERTY_BASEDIR, pFile.getAbsolutePath());
        mContext = ContextBuilder.newBuilder("filesystem").overrides(properties).credentials("testUser", "testPass").buildView(BlobStoreContext.class);
    } else {
        mContext = ContextBuilder.newBuilder("aws-s3").credentials(credentials[0], credentials[1]).buildView(BlobStoreContext.class);
    }

    mStore = mContext.getBlobStore();
    if (!mStore.containerExists(mContainerName)) {
        // Pick the "eu-west-1" location if the provider offers it; otherwise null is passed on.
        Location locToSet = null;
        for (final Location loc : mStore.listAssignableLocations()) {
            if (loc.getId().equals("eu-west-1")) {
                locToSet = loc;
                break;
            }
        }
        mStore.createContainerInLocation(locToSet, mContainerName);
    }

    final ExecutorService writerService = Executors.newFixedThreadPool(20);
    final ExecutorService readerService = Executors.newFixedThreadPool(20);
    mRunningWriteTasks = new ConcurrentHashMap<Integer , Future<Integer>>();
    mRunningReadTasks = new ConcurrentHashMap<Integer , Future<Map.Entry<Integer , byte[]>>>();

    // The cleaner threads drain the completion services; both maps and the matching service
    // are assigned before the respective thread is started.
    mReaderService = new ExecutorCompletionService<Map.Entry<Integer , byte[]>>(readerService);
    final Thread readHashmapCleaner = new Thread(new ReadFutureCleaner());
    readHashmapCleaner.setDaemon(true);
    readHashmapCleaner.start();

    mWriterService = new ExecutorCompletionService<Integer>(writerService);
    final Thread writeHashmapCleaner = new Thread(new WriteFutureCleaner());
    writeHashmapCleaner.setDaemon(true);
    writeHashmapCleaner.start();

    mByteCache = CacheBuilder.newBuilder().maximumSize(100).build();
    // -1 marks "no bucket pending for upload".
    lastIndexWritten = -1;
}
167
168
169
170
171 @Override
172 public int checkBounds (long logicalBlockAddress, int transferLengthInBlocks) {
173 if (logicalBlockAddress < 0 || logicalBlockAddress >= getSizeInBlocks()) {
174 return 1;
175 } else
176
177
178
179 if (transferLengthInBlocks < 0 || logicalBlockAddress + transferLengthInBlocks > getSizeInBlocks()) {
180 return 2;
181 } else {
182 return 0;
183 }
184 }
185
186
187
188
189 @Override
190 public long getSizeInBlocks () {
191 return mNumberOfCluster * BLOCK_IN_CLUSTER;
192 }
193
194
195
196
197 @Override
198 public synchronized void read (byte[] bytes, long storageIndex) throws IOException {
199
200 final int bucketIndex = (int) (storageIndex / SIZE_PER_BUCKET);
201 final int bucketOffset = (int) (storageIndex % SIZE_PER_BUCKET);
202 try {
203 storeBucket(-1, null);
204
205
206
207
208
209
210
211 byte[] data = mByteCache.getIfPresent(bucketIndex);
212 if (data == null) {
213 data = getAndprefetchBuckets(bucketIndex);
214 }
215
216 final ByteArrayDataOutput output = ByteStreams.newDataOutput(bytes.length);
217 int length = -1;
218 if (bucketOffset + bytes.length > SIZE_PER_BUCKET) {
219 length = SIZE_PER_BUCKET - bucketOffset;
220 } else {
221 length = bytes.length;
222 }
223
224 output.write(data, bucketOffset, length);
225
226 if (bucketOffset + bytes.length > SIZE_PER_BUCKET) {
227 data = mByteCache.getIfPresent(bucketIndex + 1);
228 if (data == null) {
229 data = getAndprefetchBuckets(bucketIndex + 1);
230 }
231
232 output.write(data, 0, bytes.length - (SIZE_PER_BUCKET - bucketOffset));
233 }
234
235 System.arraycopy(output.toByteArray(), 0, bytes, 0, bytes.length);
236 } catch (ExecutionException | InterruptedException exc) {
237 throw new IOException(exc);
238 }
239 }
240
241 private final byte[] getAndprefetchBuckets (final int pBucketStartId) throws InterruptedException , ExecutionException {
242 byte[] returnval = null;
243 Future<Map.Entry<Integer , byte[]>> startTask = null;
244 for (int i = pBucketStartId; i < pBucketStartId + BUCKETS_TO_PREFETCH; i++) {
245 Future<Map.Entry<Integer , byte[]>> currentTask = mRunningReadTasks.remove(i);
246 if (currentTask == null) {
247 currentTask = mReaderService.submit(new ReadTask(i));
248 mRunningReadTasks.put(i, currentTask);
249 }
250 if (i == pBucketStartId) {
251 startTask = currentTask;
252 }
253 }
254 returnval = startTask.get().getValue();
255 return returnval;
256
257 }
258
259 private final void storeBucket (int pBucketId, byte[] pData) throws InterruptedException , ExecutionException {
260 if (lastIndexWritten != pBucketId && lastBlobWritten != null) {
261 Future<Integer> writeTask = mRunningWriteTasks.remove(lastIndexWritten);
262 if (writeTask != null) {
263 writeTask.cancel(false);
264 }
265 mRunningWriteTasks.put(lastIndexWritten, mWriterService.submit(new WriteTask(lastBlobWritten, lastIndexWritten)));
266 }
267 lastIndexWritten = pBucketId;
268 lastBlobWritten = pData;
269 }
270
271
272
273
274
275
276 @Override
277 public synchronized void write (byte[] bytes, long storageIndex) throws IOException {
278 final int bucketIndex = (int) (storageIndex / SIZE_PER_BUCKET);
279 final int bucketOffset = (int) (storageIndex % SIZE_PER_BUCKET);
280 try {
281
282
283
284
285
286
287
288 byte[] data = mByteCache.getIfPresent(bucketIndex);
289 if (data == null) {
290 data = getAndprefetchBuckets(bucketIndex);
291 }
292
293 System.arraycopy(bytes, 0, data, bucketOffset, bytes.length + bucketOffset > SIZE_PER_BUCKET ? SIZE_PER_BUCKET - bucketOffset
294 : bytes.length);
295 storeBucket(bucketIndex, data);
296 mByteCache.put(bucketIndex, data);
297
298 if (bucketOffset + bytes.length > SIZE_PER_BUCKET) {
299 data = mByteCache.getIfPresent(bucketIndex + 1);
300 if (data == null) {
301 data = getAndprefetchBuckets(bucketIndex + 1);
302 }
303
304 System.arraycopy(bytes, SIZE_PER_BUCKET - bucketOffset, data, 0, bytes.length - (SIZE_PER_BUCKET - bucketOffset));
305 storeBucket(bucketIndex + 1, data);
306 mByteCache.put(bucketIndex + 1, data);
307 }
308 } catch (ExecutionException | InterruptedException exc) {
309 throw new IOException(exc);
310 }
311 }
312
313
314
315
316 @Override
317 public void close () throws IOException {
318 mContext.close();
319 }
320
321
322
323
324
325
326 private static String[] getCredentials () {
327 return new String[0];
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344 }
345
346
347
348
349
350
351
352 class ReadTask implements Callable<Map.Entry<Integer , byte[]>> {
353
354 final Cipher mCipher;
355
356
357
358
359 final int mBucketId;
360
361 ReadTask (final int pBucketId) {
362 if (ENCRYPT) {
363 try {
364 mCipher = Cipher.getInstance(ALGO);
365 mCipher.init(Cipher.DECRYPT_MODE, KEY);
366 } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException e) {
367 throw new RuntimeException(e);
368 }
369 } else {
370 mCipher = null;
371 }
372 this.mBucketId = pBucketId;
373 }
374
375 @Override
376 public Map.Entry<Integer , byte[]> call () throws Exception {
377 byte[] data = mByteCache.getIfPresent(mBucketId);
378 if (data == null) {
379
380 Blob blob = mStore.getBlob(mContainerName, Integer.toString(mBucketId));
381 if (blob == null) {
382 data = new byte[SIZE_PER_BUCKET];
383
384
385
386
387 } else {
388 data = ByteStreams.toByteArray(blob.getPayload().getInput());
389
390
391
392
393 while (data.length < SIZE_PER_BUCKET) {
394 blob = mStore.getBlob(mContainerName, Integer.toString(mBucketId));
395 data = ByteStreams.toByteArray(blob.getPayload().getInput());
396
397
398
399
400
401 }
402 if (ENCRYPT) {
403 data = mCipher.doFinal(data);
404 }
405 }
406 }
407 if (data.length < SIZE_PER_BUCKET) {
408 System.out.println(data.length);
409
410
411 }
412 mByteCache.put(mBucketId, data);
413 final byte[] finalizedData = data;
414 return new Map.Entry<Integer , byte[]>() {
415 @Override
416 public byte[] setValue (byte[] value) {
417 throw new UnsupportedOperationException();
418 }
419
420 @Override
421 public byte[] getValue () {
422 return finalizedData;
423 }
424
425 @Override
426 public Integer getKey () {
427 return mBucketId;
428 }
429 };
430 }
431 }
432
433
434
435
436
437
438
439 class WriteTask implements Callable<Integer> {
440
441
442
443 final byte[] mData;
444 final int mBucketIndex;
445 final Cipher mCipher;
446
447 WriteTask (byte[] pData, int pBucketIndex) {
448 checkState(pData.length == SIZE_PER_BUCKET);
449 if (ENCRYPT) {
450 try {
451 mCipher = Cipher.getInstance(ALGO);
452 mCipher.init(Cipher.ENCRYPT_MODE, KEY);
453 } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException e) {
454 throw new RuntimeException(e);
455 }
456 } else {
457 mCipher = null;
458 }
459 this.mData = pData;
460 this.mBucketIndex = pBucketIndex;
461 }
462
463 @Override
464 public Integer call () throws Exception {
465 boolean finished = false;
466
467 while (!finished) {
468 try {
469
470 byte[] data = mData;
471 if (ENCRYPT) {
472 data = mCipher.doFinal(mData);
473 }
474 Blob blob = mStore.blobBuilder(Integer.toString(mBucketIndex)).build();
475 blob.setPayload(data);
476 mStore.putBlob(mContainerName, blob);
477
478
479
480
481
482
483 } catch (Exception exc) {
484
485 }
486 finished = true;
487 }
488
489 return mBucketIndex;
490 }
491 }
492
493 class ReadFutureCleaner extends Thread {
494
495 public void run () {
496 while (true) {
497 try {
498 Future<Map.Entry<Integer , byte[]>> element = mReaderService.take();
499 if (!element.isCancelled()) {
500 mRunningReadTasks.remove(element.get().getKey());
501 }
502 } catch (Exception exc) {
503 throw new RuntimeException(exc);
504 }
505 }
506 }
507 }
508
509 class WriteFutureCleaner extends Thread {
510
511 public void run () {
512 while (true) {
513 try {
514 Future<Integer> element = mWriterService.take();
515 if (!element.isCancelled()) {
516 mRunningWriteTasks.remove(element.get());
517 }
518 } catch (Exception exc) {
519 throw new RuntimeException(exc);
520 }
521 }
522
523 }
524 }
525
526 }