View Javadoc

1   /**
2    * 
3    */
4   package org.jscsi.target.storage;
5   
6   
7   import static com.google.common.base.Preconditions.checkState;
8   
9   import java.io.File;
10  import java.io.FileNotFoundException;
11  import java.io.IOException;
12  import java.security.InvalidKeyException;
13  import java.security.Key;
14  import java.security.NoSuchAlgorithmException;
15  import java.util.Map;
16  import java.util.Properties;
17  import java.util.concurrent.Callable;
18  import java.util.concurrent.CompletionService;
19  import java.util.concurrent.ConcurrentHashMap;
20  import java.util.concurrent.ExecutionException;
21  import java.util.concurrent.ExecutorCompletionService;
22  import java.util.concurrent.ExecutorService;
23  import java.util.concurrent.Executors;
24  import java.util.concurrent.Future;
25  
26  import javax.crypto.Cipher;
27  import javax.crypto.NoSuchPaddingException;
28  import javax.crypto.spec.SecretKeySpec;
29  
30  import org.jclouds.ContextBuilder;
31  import org.jclouds.blobstore.BlobStore;
32  import org.jclouds.blobstore.BlobStoreContext;
33  import org.jclouds.blobstore.domain.Blob;
34  import org.jclouds.domain.Location;
35  import org.jclouds.filesystem.reference.FilesystemConstants;
36  
37  import com.google.common.annotations.Beta;
38  import com.google.common.cache.Cache;
39  import com.google.common.cache.CacheBuilder;
40  import com.google.common.io.ByteArrayDataOutput;
41  import com.google.common.io.ByteStreams;
42  
43  
44  /**
45   * JClouds-Binding to store blocks as buckets in clouds-backends. This class utilizes caching as well as multithreaded
46   * writing to improve performance.
47   * 
48   * @author Sebastian Graf, University of Konstanz
49   * 
50   */
51  @Beta
52  public class JCloudsStorageModule implements IStorageModule {
53  
54      // // START DEBUG CODE
55      // private final static File writeFile = new
56      // File("/Users/sebi/Desktop/writeaccess.txt");
57      // private final static File readFile = new
58      // File("/Users/sebi/Desktop/readaccess.txt");
59      // private final static File uploadFile = new
60      // File("/Users/sebi/Desktop/uploadaccess.txt");
61      // private final static File downloadFile = new
62      // File("/Users/sebi/Desktop/downloadaccess.txt");
63      // static final FileWriter writer;
64      // static final FileWriter reader;
65      // static final FileWriter upload;
66      // static final FileWriter download;
67      // static {
68      // try {
69      // writer = new FileWriter(writeFile);
70      // reader = new FileWriter(readFile);
71      // upload = new FileWriter(uploadFile);
72      // download = new FileWriter(downloadFile);
73      // } catch (IOException e) {
74      // throw new RuntimeException(e);
75      // }
76      // }
77  
78      /** Number of Blocks in one Cluster. */
79      public static final int BLOCK_IN_CLUSTER = 512;
80  
81      private static final int BUCKETS_TO_PREFETCH = 3;
82  
83      private static final boolean ENCRYPT = false;
84      private static final String ALGO = "AES";
85      private static byte[] keyValue = new byte[] { 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k', 'k' };
86      private static final Key KEY = new SecretKeySpec(keyValue, "AES");
87  
88      /** Number of Bytes in Bucket. */
89      public final static int SIZE_PER_BUCKET = BLOCK_IN_CLUSTER * VIRTUAL_BLOCK_SIZE;
90  
91      public final static String CONTAINERNAME = "bench53473ResourcegraveISCSI9284";
92  
93      private final long mNumberOfCluster;
94  
95      private final String mContainerName;
96  
97      private final BlobStore mStore;
98  
99      private final BlobStoreContext mContext;
100 
101     private final Cache<Integer , byte[]> mByteCache;
102 
103     private int lastIndexWritten;
104     private byte[] lastBlobWritten;
105 
106     private final CompletionService<Integer> mWriterService;
107     private final CompletionService<Map.Entry<Integer , byte[]>> mReaderService;
108     private final ConcurrentHashMap<Integer , Future<Integer>> mRunningWriteTasks;
109     private final ConcurrentHashMap<Integer , Future<Map.Entry<Integer , byte[]>>> mRunningReadTasks;
110 
111     /**
112      * Creates a new {@link JCloudsStorageModule} backed by the specified file. If no such file exists, a
113      * {@link FileNotFoundException} will be thrown.
114      * 
115      * @param pSizeInBlocks blocksize for this module
116      * @param pFile local storage, not used over here
117      * 
118      */
119     public JCloudsStorageModule (final long pSizeInBlocks, final File pFile) {
120         // number * 512 = size in bytes
121         // 4gig, bench for iozone and bonnie++
122         // mNumberOfCluster = 8388608 / BLOCK_IN_CLUSTER;
123         // 512m, bench for fio
124         mNumberOfCluster = 1048576 / BLOCK_IN_CLUSTER;
125         mContainerName = CONTAINERNAME;
126         String[] credentials = getCredentials();
127         if (credentials.length == 0) {
128             Properties properties = new Properties();
129             properties.setProperty(FilesystemConstants.PROPERTY_BASEDIR, pFile.getAbsolutePath());
130             mContext = ContextBuilder.newBuilder("filesystem").overrides(properties).credentials("testUser", "testPass").buildView(BlobStoreContext.class);
131         } else {
132             mContext = ContextBuilder.newBuilder("aws-s3").credentials(getCredentials()[0], getCredentials()[1]).buildView(BlobStoreContext.class);
133         }
134 
135         // Create Container
136         mStore = mContext.getBlobStore();
137         if (!mStore.containerExists(mContainerName)) {
138             Location locToSet = null;
139             for (Location loc : mStore.listAssignableLocations()) {
140                 if (loc.getId().equals("eu-west-1")) {
141                     locToSet = loc;
142                     break;
143                 }
144             }
145             // System.out.println(locToSet);
146             mStore.createContainerInLocation(locToSet, mContainerName);
147         }
148 
149         final ExecutorService writerService = Executors.newFixedThreadPool(20);
150         final ExecutorService readerService = Executors.newFixedThreadPool(20);
151         mRunningWriteTasks = new ConcurrentHashMap<Integer , Future<Integer>>();
152         mRunningReadTasks = new ConcurrentHashMap<Integer , Future<Map.Entry<Integer , byte[]>>>();
153 
154         mReaderService = new ExecutorCompletionService<Map.Entry<Integer , byte[]>>(readerService);
155         final Thread readHashmapCleaner = new Thread(new ReadFutureCleaner());
156         readHashmapCleaner.setDaemon(true);
157         readHashmapCleaner.start();
158 
159         mWriterService = new ExecutorCompletionService<Integer>(writerService);
160         final Thread writeHashmapCleaner = new Thread(new WriteFutureCleaner());
161         writeHashmapCleaner.setDaemon(true);
162         writeHashmapCleaner.start();
163 
164         mByteCache = CacheBuilder.newBuilder().maximumSize(100).build();
165         lastIndexWritten = -1;
166     }
167 
168     /**
169      * {@inheritDoc}
170      */
171     @Override
172     public int checkBounds (long logicalBlockAddress, int transferLengthInBlocks) {
173         if (logicalBlockAddress < 0 || logicalBlockAddress >= getSizeInBlocks()) {
174             return 1;
175         } else
176         // if the logical block address is in bounds but the transferlength
177         // either exceeds
178         // the device size or is faulty return 2
179         if (transferLengthInBlocks < 0 || logicalBlockAddress + transferLengthInBlocks > getSizeInBlocks()) {
180             return 2;
181         } else {
182             return 0;
183         }
184     }
185 
186     /**
187      * {@inheritDoc}
188      */
189     @Override
190     public long getSizeInBlocks () {
191         return mNumberOfCluster * BLOCK_IN_CLUSTER;
192     }
193 
194     /**
195      * {@inheritDoc}
196      */
197     @Override
198     public synchronized void read (byte[] bytes, long storageIndex) throws IOException {
199 
200         final int bucketIndex = (int) (storageIndex / SIZE_PER_BUCKET);
201         final int bucketOffset = (int) (storageIndex % SIZE_PER_BUCKET);
202         try {
203             storeBucket(-1, null);
204 
205             // // DEBUG CODE
206             // reader.write(bucketIndex + "," + storageIndex + "," +
207             // bucketOffset + "," + bytes.length +
208             // "\n");
209             // reader.flush();
210 
211             byte[] data = mByteCache.getIfPresent(bucketIndex);
212             if (data == null) {
213                 data = getAndprefetchBuckets(bucketIndex);
214             }
215 
216             final ByteArrayDataOutput output = ByteStreams.newDataOutput(bytes.length);
217             int length = -1;
218             if (bucketOffset + bytes.length > SIZE_PER_BUCKET) {
219                 length = SIZE_PER_BUCKET - bucketOffset;
220             } else {
221                 length = bytes.length;
222             }
223 
224             output.write(data, bucketOffset, length);
225 
226             if (bucketOffset + bytes.length > SIZE_PER_BUCKET) {
227                 data = mByteCache.getIfPresent(bucketIndex + 1);
228                 if (data == null) {
229                     data = getAndprefetchBuckets(bucketIndex + 1);
230                 }
231 
232                 output.write(data, 0, bytes.length - (SIZE_PER_BUCKET - bucketOffset));
233             }
234 
235             System.arraycopy(output.toByteArray(), 0, bytes, 0, bytes.length);
236         } catch (ExecutionException | InterruptedException exc) {
237             throw new IOException(exc);
238         }
239     }
240 
241     private final byte[] getAndprefetchBuckets (final int pBucketStartId) throws InterruptedException , ExecutionException {
242         byte[] returnval = null;
243         Future<Map.Entry<Integer , byte[]>> startTask = null;
244         for (int i = pBucketStartId; i < pBucketStartId + BUCKETS_TO_PREFETCH; i++) {
245             Future<Map.Entry<Integer , byte[]>> currentTask = mRunningReadTasks.remove(i);
246             if (currentTask == null) {
247                 currentTask = mReaderService.submit(new ReadTask(i));
248                 mRunningReadTasks.put(i, currentTask);
249             }
250             if (i == pBucketStartId) {
251                 startTask = currentTask;
252             }
253         }
254         returnval = startTask.get().getValue();
255         return returnval;
256 
257     }
258 
259     private final void storeBucket (int pBucketId, byte[] pData) throws InterruptedException , ExecutionException {
260         if (lastIndexWritten != pBucketId && lastBlobWritten != null) {
261             Future<Integer> writeTask = mRunningWriteTasks.remove(lastIndexWritten);
262             if (writeTask != null) {
263                 writeTask.cancel(false);
264             }
265             mRunningWriteTasks.put(lastIndexWritten, mWriterService.submit(new WriteTask(lastBlobWritten, lastIndexWritten)));
266         }
267         lastIndexWritten = pBucketId;
268         lastBlobWritten = pData;
269     }
270 
271     /**
272      * {@inheritDoc}
273      * 
274      * @throws Exception
275      */
276     @Override
277     public synchronized void write (byte[] bytes, long storageIndex) throws IOException {
278         final int bucketIndex = (int) (storageIndex / SIZE_PER_BUCKET);
279         final int bucketOffset = (int) (storageIndex % SIZE_PER_BUCKET);
280         try {
281 
282             // // DEBUG CODE
283             // writer.write(bucketIndex + "," + storageIndex + "," +
284             // bucketOffset + "," + bytes.length +
285             // "\n");
286             // writer.flush();
287 
288             byte[] data = mByteCache.getIfPresent(bucketIndex);
289             if (data == null) {
290                 data = getAndprefetchBuckets(bucketIndex);
291             }
292 
293             System.arraycopy(bytes, 0, data, bucketOffset, bytes.length + bucketOffset > SIZE_PER_BUCKET ? SIZE_PER_BUCKET - bucketOffset
294                     : bytes.length);
295             storeBucket(bucketIndex, data);
296             mByteCache.put(bucketIndex, data);
297 
298             if (bucketOffset + bytes.length > SIZE_PER_BUCKET) {
299                 data = mByteCache.getIfPresent(bucketIndex + 1);
300                 if (data == null) {
301                     data = getAndprefetchBuckets(bucketIndex + 1);
302                 }
303 
304                 System.arraycopy(bytes, SIZE_PER_BUCKET - bucketOffset, data, 0, bytes.length - (SIZE_PER_BUCKET - bucketOffset));
305                 storeBucket(bucketIndex + 1, data);
306                 mByteCache.put(bucketIndex + 1, data);
307             }
308         } catch (ExecutionException | InterruptedException exc) {
309             throw new IOException(exc);
310         }
311     }
312 
313     /**
314      * {@inheritDoc}
315      */
316     @Override
317     public void close () throws IOException {
318         mContext.close();
319     }
320 
321     /**
322      * Getting credentials for aws from homedir/.credentials
323      * 
324      * @return a two-dimensional String[] with login and password
325      */
326     private static String[] getCredentials () {
327         return new String[0];
328         // File userStore = new File(System.getProperty("user.home"),
329         // new StringBuilder(".credentials").append(File.separator)
330         // .append("aws.properties").toString());
331         // if (!userStore.exists()) {
332         // return new String[0];
333         // } else {
334         // Properties props = new Properties();
335         // try {
336         // props.load(new FileReader(userStore));
337         // return new String[] { props.getProperty("access"),
338         // props.getProperty("secret") };
339         //
340         // } catch (IOException exc) {
341         // throw new RuntimeException(exc);
342         // }
343         // }
344     }
345 
346     /**
347      * Single task to write data to the cloud.
348      * 
349      * @author Sebastian Graf, University of Konstanz
350      * 
351      */
352     class ReadTask implements Callable<Map.Entry<Integer , byte[]>> {
353 
354         final Cipher mCipher;
355 
356         /**
357          * Bucket ID to be read.
358          */
359         final int mBucketId;
360 
361         ReadTask (final int pBucketId) {
362             if (ENCRYPT) {
363                 try {
364                     mCipher = Cipher.getInstance(ALGO);
365                     mCipher.init(Cipher.DECRYPT_MODE, KEY);
366                 } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException e) {
367                     throw new RuntimeException(e);
368                 }
369             } else {
370                 mCipher = null;
371             }
372             this.mBucketId = pBucketId;
373         }
374 
375         @Override
376         public Map.Entry<Integer , byte[]> call () throws Exception {
377             byte[] data = mByteCache.getIfPresent(mBucketId);
378             if (data == null) {
379                 // long time = System.currentTimeMillis();
380                 Blob blob = mStore.getBlob(mContainerName, Integer.toString(mBucketId));
381                 if (blob == null) {
382                     data = new byte[SIZE_PER_BUCKET];
383                     // // DEBUG CODE
384                     // download.write(Integer.toString(mBucketId) + ", empty, "
385                     // + (System.currentTimeMillis() - time) + "\n");
386                     // download.flush();
387                 } else {
388                     data = ByteStreams.toByteArray(blob.getPayload().getInput());
389                     // download.write(Integer.toString(mBucketId) + "," +
390                     // data.length + " , "
391                     // + (System.currentTimeMillis() - time) + "\n");
392                     // download.flush();
393                     while (data.length < SIZE_PER_BUCKET) {
394                         blob = mStore.getBlob(mContainerName, Integer.toString(mBucketId));
395                         data = ByteStreams.toByteArray(blob.getPayload().getInput());
396                         // // DEBUG CODE
397                         // download.write(Integer.toString(mBucketId) + "," +
398                         // data.length + " , "
399                         // + (System.currentTimeMillis() - time) + "\n");
400                         // download.flush();
401                     }
402                     if (ENCRYPT) {
403                         data = mCipher.doFinal(data);
404                     }
405                 }
406             }
407             if (data.length < SIZE_PER_BUCKET) {
408                 System.out.println(data.length);
409                 // throw new IllegalStateEception("Bucket " + mBucketId
410                 // +" invalid");
411             }
412             mByteCache.put(mBucketId, data);
413             final byte[] finalizedData = data;
414             return new Map.Entry<Integer , byte[]>() {
415                 @Override
416                 public byte[] setValue (byte[] value) {
417                     throw new UnsupportedOperationException();
418                 }
419 
420                 @Override
421                 public byte[] getValue () {
422                     return finalizedData;
423                 }
424 
425                 @Override
426                 public Integer getKey () {
427                     return mBucketId;
428                 }
429             };
430         }
431     }
432 
433     /**
434      * Single task to write data to the cloud.
435      * 
436      * @author Sebastian Graf, University of Konstanz
437      * 
438      */
439     class WriteTask implements Callable<Integer> {
440         /**
441          * The bytes to buffer.
442          */
443         final byte[] mData;
444         final int mBucketIndex;
445         final Cipher mCipher;
446 
447         WriteTask (byte[] pData, int pBucketIndex) {
448             checkState(pData.length == SIZE_PER_BUCKET);
449             if (ENCRYPT) {
450                 try {
451                     mCipher = Cipher.getInstance(ALGO);
452                     mCipher.init(Cipher.ENCRYPT_MODE, KEY);
453                 } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException e) {
454                     throw new RuntimeException(e);
455                 }
456             } else {
457                 mCipher = null;
458             }
459             this.mData = pData;
460             this.mBucketIndex = pBucketIndex;
461         }
462 
463         @Override
464         public Integer call () throws Exception {
465             boolean finished = false;
466 
467             while (!finished) {
468                 try {
469                     // long time = System.currentTimeMillis();
470                     byte[] data = mData;
471                     if (ENCRYPT) {
472                         data = mCipher.doFinal(mData);
473                     }
474                     Blob blob = mStore.blobBuilder(Integer.toString(mBucketIndex)).build();
475                     blob.setPayload(data);
476                     mStore.putBlob(mContainerName, blob);
477                     // // DEBUG CODE
478                     // upload.write(Integer.toString(mBucketIndex) + ", " +
479                     // (System.currentTimeMillis() -
480                     // time)
481                     // + "\n");
482                     // upload.flush();
483                 } catch (Exception exc) {
484 
485                 }
486                 finished = true;
487             }
488 
489             return mBucketIndex;
490         }
491     }
492 
493     class ReadFutureCleaner extends Thread {
494 
495         public void run () {
496             while (true) {
497                 try {
498                     Future<Map.Entry<Integer , byte[]>> element = mReaderService.take();
499                     if (!element.isCancelled()) {
500                         mRunningReadTasks.remove(element.get().getKey());
501                     }
502                 } catch (Exception exc) {
503                     throw new RuntimeException(exc);
504                 }
505             }
506         }
507     }
508 
509     class WriteFutureCleaner extends Thread {
510 
511         public void run () {
512             while (true) {
513                 try {
514                     Future<Integer> element = mWriterService.take();
515                     if (!element.isCancelled()) {
516                         mRunningWriteTasks.remove(element.get());
517                     }
518                 } catch (Exception exc) {
519                     throw new RuntimeException(exc);
520                 }
521             }
522 
523         }
524     }
525 
526 }