1 module vibe.aws.s3;
2 
3 import vibe.d;
4 import vibe.core.stream;
5 
6 import vibe.aws.aws;
7 import vibe.aws.credentials;
8 import vibe.aws.sigv4;
9 
10 import std.typecons: Tuple, tuple;
11 
/++
Storage classes understood by this client.

Each member's name equals its string value, so both `to!string` (used when
sending the `x-amz-storage-class` header) and `to!StorageClass` (used when
parsing the `StorageClass` element of a bucket listing) round-trip the exact
identifier used on the wire.

The first three members are the original set; the rest are appended so that
listings containing objects stored in other classes can be parsed instead of
failing the string-to-enum conversion.
+/
enum StorageClass : string
{
    STANDARD = "STANDARD",
    REDUCED_REDUNDANCY = "REDUCED_REDUNDANCY",
    GLACIER = "GLACIER",
    STANDARD_IA = "STANDARD_IA",
    ONEZONE_IA = "ONEZONE_IA",
    INTELLIGENT_TIERING = "INTELLIGENT_TIERING",
    DEEP_ARCHIVE = "DEEP_ARCHIVE",
    OUTPOSTS = "OUTPOSTS",
    GLACIER_IR = "GLACIER_IR",
    SNOW = "SNOW",
    EXPRESS_ONEZONE = "EXPRESS_ONEZONE",
}
18 
/++
One page of a bucket listing, as produced by `S3.list` from the
`ListBucketResult` XML document.
+/
struct BucketListResult
{
    /// A single `Contents` entry of the listing.
    static struct S3Resource
    {
        /// Owner information attached to a listed object.
        static struct Owner
        {
            string id;
            string displayName;
        }

        /// Text of the entry's `Key` element.
        string key;
        /// Text of the entry's `LastModified` element.
        /// The misspelled name is kept so existing callers keep compiling.
        string lastModfied;
        /// Correctly spelled name for `lastModfied`; both refer to the same field.
        alias lastModified = lastModfied;
        /// Text of the entry's `ETag` element.
        string etag;
        /// Value of the entry's `Size` element.
        ulong size;
        /// Owner of the object.
        Owner owner;
        /// Value of the entry's `StorageClass` element.
        StorageClass storageClass;
    }

    /// Text of the `Name` element.
    string name;
    /// Text of the `Prefix` element.
    string prefix;
    /// Text of the `Marker` element.
    string marker;
    /// Marker to pass to the next `list` call; only set when `isTruncated` is true.
    string nextMarker;
    /// All `Contents` entries of this page.
    S3Resource[] resources;
    /// Texts of all `CommonPrefixes/Prefix` elements of this page.
    string[] commonPrefixes;
    /// Value of the `MaxKeys` element.
    uint maxKeys;
    /// Value of the `IsTruncated` element.
    bool isTruncated;
}
46 
/++
Builds an `S3Resources` range over the objects under `path`.

No delimiter is passed to the listing, so keys are not grouped into
common prefixes.

Params:
    client = bucket client used for the listing requests
    path = key prefix; a non-empty value without a trailing `/` gets one appended
+/
auto listFilesRecursive(S3 client, string path = null)
{
    immutable needsSlash = path.length != 0 && path[$ - 1] != '/';
    return S3Resources(client, null, needsSlash ? path ~ "/" : path);
}
53 
/++
Builds an `S3Resources` range over the objects under `path`, using `/`
as the listing delimiter.

Params:
    client = bucket client used for the listing requests
    path = key prefix; a non-empty value without a trailing `/` gets one appended
+/
auto listFiles(S3 client, string path = null)
{
    immutable needsSlash = path.length != 0 && path[$ - 1] != '/';
    return S3Resources(client, "/", needsSlash ? path ~ "/" : path);
}
60 
/++
Builds an `S3Prefixes` range over the common prefixes under `path`,
using `/` as the listing delimiter.

Params:
    client = bucket client used for the listing requests
    path = key prefix; a non-empty value without a trailing `/` gets one appended
+/
auto listFolders(S3 client, string path = null)
{
    immutable needsSlash = path.length != 0 && path[$ - 1] != '/';
    return S3Prefixes(client, "/", needsSlash ? path ~ "/" : path);
}
67 
/++
Range over the `BucketListResult.S3Resource` entries of a bucket listing.

The current page lives in `res` (provided by `_S3Common`); when it runs out
and the listing is truncated, further pages are requested on demand.
+/
struct S3Resources
{
    import std.range.primitives;
    mixin _S3Common;

    /// First not-yet-consumed resource of the current page.
    auto front() @property
    {
        assert(!empty);
        return res.resources.front;
    }

    /// True when the current page holds no more resources.
    auto empty() const @property
    {
        return res.resources.empty;
    }

    /// Drops the current resource, loading further pages if required.
    auto popFront()
    {
        assert(!empty);
        res.resources.popFront;
        // A truncated page may carry no resources at all (for example only
        // common prefixes), so keep fetching until something shows up or the
        // listing ends. The marker check prevents re-requesting the first
        // page forever if no continuation marker is available.
        while(empty && res.isTruncated && res.nextMarker.length)
        {
            next();
        }
    }

}
95 
/++
Range over the common prefixes of a bucket listing.

The current page lives in `res` (provided by `_S3Common`); when it runs out
and the listing is truncated, further pages are requested on demand.
+/
struct S3Prefixes
{
    import std.range.primitives;
    mixin _S3Common;

    /// First not-yet-consumed common prefix of the current page.
    auto front() @property
    {
        assert(!empty);
        return res.commonPrefixes.front;
    }

    /// True when the current page holds no more common prefixes.
    auto empty() const @property
    {
        return res.commonPrefixes.empty;
    }

    /// Drops the current prefix, loading further pages if required.
    auto popFront()
    {
        assert(!empty);
        res.commonPrefixes.popFront;
        // A truncated page may carry no common prefixes at all (only plain
        // objects), so keep fetching until one shows up or the listing ends.
        // The marker check prevents re-requesting the first page forever if
        // no continuation marker is available.
        while(empty && res.isTruncated && res.nextMarker.length)
        {
            next();
        }
    }
}
122 
/++
State and paging logic shared by the listing ranges.

The host struct must provide an `empty` property that inspects `res`;
it is used to decide whether a freshly fetched page is usable.
+/
private mixin template _S3Common()
{
    private S3 client;
    private BucketListResult res;
    private string delimiter;
    private string prefix;
    private uint maxKeys;

    /++
    Loads the next page into `res`.

    Pages that contain nothing of interest to the host range but are marked
    as truncated are skipped, so that after this call the range is either
    non-empty or the listing is really exhausted. The loop stops if no
    continuation marker is available, because repeating the request would
    only return the first page again.
    +/
    private void next()
    {
        do
        {
            res = client.list(delimiter, prefix, res.nextMarker, maxKeys);
        }
        while(empty && res.isTruncated && res.nextMarker.length);
    }

    @disable this();

    /++
    Stores the listing parameters and eagerly fetches the first page.

    Params:
        client = bucket client used for the listing requests
        delimiter = delimiter forwarded to `S3.list`
        prefix = key prefix forwarded to `S3.list`
        maxKeys = page size forwarded to `S3.list` (0 leaves it unspecified)
    +/
    this(S3 client, string delimiter, string prefix, uint maxKeys = 0)
    {
        this.client = client;
        this.delimiter = delimiter;
        this.prefix = prefix;
        this.maxKeys = maxKeys;
        next();
    }

    /// Returns a copy of the range.
    auto save() @property
    {
        return this;
    }
}
152 
/++
REST client bound to a single S3 bucket.

All requests go through `doRequest` / `doUpload` inherited from `RESTClient`.
+/
class S3 : RESTClient
{
    private string bucket;

    /++
    Params:
        bucket = bucket name; becomes the first label of the endpoint host
        region = AWS region, must not be empty
        credsSource = credential source handed to `RESTClient`
        config = client configuration handed to `RESTClient`

    Throws: `Exception` if `region` is empty.
    +/
    this(string bucket, string region, AWSCredentialSource credsSource, ClientConfiguration config = ClientConfiguration())
    {
        enforce(region.length, "AWS region should be defined.");
        this.bucket = bucket;
        // Dot-style regional endpoint: the dash-style "s3-<region>" form is a
        // legacy alias that is not available in newer regions.
        super(bucket ~ ".s3." ~ region ~ ".amazonaws.com", region, "s3", credsSource, config);
    }

    /++
    Fetches one page of the bucket listing.

    Params:
        delimiter = sent as the `delimiter` query parameter unless null
        prefix = sent as the `prefix` query parameter unless null
        marker = sent as the `marker` query parameter unless null
        maxKeys = sent as `max-keys` unless 0; must not exceed 1000

    Returns: the parsed `BucketListResult`. When the page is truncated,
        `nextMarker` is taken from the `NextMarker` element or, if the
        response has none, from the key of the last listed object.
    +/
    auto list(string delimiter = null, string prefix = null, string marker = null, uint maxKeys = 0)
    {
        assert(maxKeys <= 1000);

        import memutils.all;
        import std.conv;

        InetHeaderMap headers;
        string[string] queryParameters;

        if (delimiter !is null)
            queryParameters["delimiter"] = delimiter;

        if (prefix !is null)
            queryParameters["prefix"] = prefix;

        if (marker !is null)
            queryParameters["marker"] = marker;

        if (maxKeys)
            queryParameters["max-keys"] = maxKeys.to!string;

        auto resp = doRequest(HTTPMethod.GET, "/", queryParameters, headers);
        // Release the response even if reading or parsing the XML throws.
        scope(exit)
        {
            resp.dropBody();
            resp.destroy();
        }
        auto response = readXML(resp);

        BucketListResult result;
        result.name = response.parseXPath("/ListBucketResult/Name")[0].getCData;
        result.prefix = response.parseXPath("/ListBucketResult/Prefix")[0].getCData;
        result.marker = response.parseXPath("/ListBucketResult/Marker")[0].getCData;
        result.maxKeys = response.parseXPath("/ListBucketResult/MaxKeys")[0].getCData.to!uint;
        result.isTruncated = response.parseXPath("/ListBucketResult/IsTruncated")[0].getCData.toLower.to!bool;

        if (result.isTruncated)
        {
            // NextMarker is only part of the response when a delimiter was
            // requested, so its absence must not be treated as an error.
            auto nextMarkerNodes = response.parseXPath("/ListBucketResult/NextMarker");
            if (nextMarkerNodes.length)
                result.nextMarker = nextMarkerNodes[0].getCData;
        }

        auto entries = response.parseXPath("/ListBucketResult/Contents");

        result.resources.reserve(entries.length);
        foreach(node; entries)
        {
            BucketListResult.S3Resource entry;

            entry.key = node.parseXPath("Key")[0].getCData;
            entry.lastModfied = node.parseXPath("LastModified")[0].getCData;
            entry.etag = node.parseXPath("ETag")[0].getCData;
            entry.size = node.parseXPath("Size")[0].getCData.to!ulong;
            entry.storageClass = node.parseXPath("StorageClass")[0].getCData.to!StorageClass;

            // Owner is optional, fill it only when the elements are present.
            auto ownerId = node.parseXPath("Owner/ID");
            if (ownerId.length)
                entry.owner.id = ownerId[0].getCData;
            auto ownerName = node.parseXPath("Owner/DisplayName");
            if (ownerName.length)
                entry.owner.displayName = ownerName[0].getCData;

            result.resources ~= entry;
        }

        // Without NextMarker the last returned key is the continuation point.
        if (result.isTruncated && !result.nextMarker.length && result.resources.length)
            result.nextMarker = result.resources[$ - 1].key;

        entries = response.parseXPath("/ListBucketResult/CommonPrefixes/Prefix");
        result.commonPrefixes.reserve(entries.length);
        foreach(node; entries)
            result.commonPrefixes ~= node.getCData;

        return result;
    }

    /++
    Uploads `input` to `resource` with a single PUT request.

    Params:
        resource = request path of the object
        input = data to send
        contentType = value of the `Content-Type` header
        storageClass = value of the `x-amz-storage-class` header (also signed)
        chunkSize = chunk size handed to `doUpload`
    +/
    void upload(
        string resource,
        RandomAccessStream input,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        size_t chunkSize = 512*1024,
        )
    {
        InetHeaderMap headers;
        headers["Content-Type"] = contentType;
        headers["x-amz-storage-class"] = storageClass.to!string;
        string[] signedHeaders = ["x-amz-storage-class"];
        auto httpResp = doUpload(HTTPMethod.PUT,
            resource, null, headers, signedHeaders, input, chunkSize);
        httpResp.dropBody();
        httpResp.destroy();
    }

    /++
    Uploads `input` as a multipart upload: starts the upload, sends the data
    in parts of at most `partSize` bytes and finally completes the upload.

    Params:
        resource = request path of the object
        input = data to send; read until `leastSize` reports 0
        headers = extra headers for the initiating request
        contentType = content type for the initiating request and every part
        storageClass = storage class for the initiating request
        expires = optional expiry time for the initiating request
        chunkSize = chunk size handed to `uploadPart`
        partSize = size of the part buffer, at least 5 MB

    Throws: `Exception` if `partSize` is below 5 MB.

    On_failure: aborts multipart upload.
    +/
    void multipartUpload(
        string resource,
        scope InputStream input,
        InetHeaderMap headers = InetHeaderMap.init,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        SysTime expires = SysTime.init,
        size_t chunkSize = 512*1024,
        size_t partSize = 5*1024*1024,
        )
    {
        import std.array: appender, uninitializedArray;
        import std.algorithm.comparison: min;
        logDebug("multipartUpload for %s ...", resource);
        enforce(partSize >= 5 * 1024 * 1024, "multipartUpload: minimal allowed part size is 5 MB.");
        auto id = startMultipartUpload(resource, headers, contentType, storageClass, expires);
        scope(failure)
        {
            logWarn("aborting multipart upload for resource=%s, uploadId=%s", resource, id);
            try
            {
                abortMultipartUpload(resource, id);
            }
            catch(Exception e)
            {
                // Best effort: the original failure is the one that propagates.
                logWarn(e.msg);
            }
        }

        auto buf = uninitializedArray!(ubyte[])(partSize);
        auto etags = appender!(Tuple!(string, size_t)[]);

        size_t least = input.leastSize;
        for(size_t part = 1;;part++)
        {
            // Fill the part buffer from the stream.
            size_t length;
            do
            {
                // Clamp to the free space; unlike `least + length` this
                // cannot overflow when the stream reports a huge size.
                immutable chunk = min(least, buf.length - length);
                input.read(buf[length .. length + chunk]);
                length += chunk;
                least = input.leastSize;
            }
            while(least && length < buf.length);
            logDebug("buf.length = %s", buf.length);
            logDebug("least = %s", least);
            logDebug("multipartUpload: sending %s bytes for part %s ...", length, part);
            auto etag = uploadPart(resource, id, part, createMemoryStream(buf[0 .. length], false), contentType, chunkSize);
            etags.put(tuple(etag, part));
            if(least == 0)
                break;
        }
        enforce(etags.data, "At least one part should be uploaded.");
        completeMultipartUpload(resource, id, etags.data);
    }

    /++
    Uploads one part of a multipart upload.

    Params:
        resource = request path of the object
        id = upload id returned by `startMultipartUpload`
        part = part number, sent as the `partNumber` query parameter
        input = data of this part
        contentType = value of the `Content-Type` header
        chunkSize = chunk size handed to `doUpload`

    Returns: value of the `ETag` response header.
    +/
    string uploadPart(
        string resource,
        string id,
        size_t part,
        RandomAccessStream input,
        string contentType = "application/octet-stream",
        size_t chunkSize = 512*1024,
        )
    {
        string[string] queryParameters = [
            "partNumber": part.to!string,
            "uploadId": id,
        ];
        InetHeaderMap headers;
        headers["Content-Type"] = contentType;
        logDebug("uploadPart: doUpload ...");
        auto httpResp = doUpload(HTTPMethod.PUT, resource, queryParameters, headers, null, input, chunkSize);
        // Release the response even if the ETag header is missing.
        scope(exit)
        {
            httpResp.dropBody();
            httpResp.destroy();
        }
        logDebug("uploadPart: doUpload finished.");
        // Copy the value so it does not depend on the response object.
        // NOTE(review): header strings may be owned by the response - confirm.
        auto etag = httpResp.headers["ETag"].idup;
        logDebug("uploadPart: finished.");
        return etag;
    }

    /++
    Initiates a multipart upload.

    Params:
        resource = request path of the object
        headers = extra request headers; `Content-Type`,
            `x-amz-storage-class` and optionally `Expires` are set on top
        contentType = value of the `Content-Type` header
        storageClass = value of the `x-amz-storage-class` header
        expires = if not `SysTime.init`, sent as the `Expires` header

    Returns: text of the `UploadId` element of the response.
    +/
    string startMultipartUpload(
        string resource,
        InetHeaderMap headers = InetHeaderMap.init,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        SysTime expires = SysTime.init,
        )
    {
        import vibe.inet.message : toRFC822DateTimeString;

        headers["Content-Type"] = contentType;
        headers["x-amz-storage-class"] = storageClass.to!string;
        if(expires != SysTime.init)
        {
            // HTTP dates use the RFC 822/1123 layout in GMT, not ISO 8601.
            headers["Expires"] = toRFC822DateTimeString(expires.toUTC());
        }
        auto httpResp = doRequest(HTTPMethod.POST, resource, ["uploads":null], headers);
        scope(exit)
        {
            httpResp.dropBody();
            httpResp.destroy();
        }
        auto document = readXML(httpResp);
        auto id = document.parseXPath("/InitiateMultipartUploadResult/UploadId")[0].getCData;
        return id;
    }

    /++
    Completes a multipart upload by posting the list of uploaded parts.

    Params:
        resource = request path of the object
        id = upload id returned by `startMultipartUpload`
        parts = (ETag, part number) pairs, written in the given order
        headers = extra request headers
    +/
    void completeMultipartUpload(
        string resource,
        string id,
        in Tuple!(string, size_t)[] parts,
        InetHeaderMap headers = InetHeaderMap.init,
        )
    {
        import std.format;
        import std.array: appender;
        auto app = appender!(char[]);
        app.put(`<CompleteMultipartUpload>`);
        FormatSpec!char fmt;
        foreach(ref part; parts)
        {
            app.put(`<Part><PartNumber>`);
            app.formatValue(part[1], fmt);
            app.put(`</PartNumber><ETag>`);
            app.put(part[0]);
            app.put(`</ETag></Part>`);
        }
        app.put(`</CompleteMultipartUpload>`);
        auto httpResp = doRequest(HTTPMethod.POST, resource, ["uploadId":id], headers, cast(ubyte[])app.data);
        httpResp.dropBody();
        httpResp.destroy();
    }

    /++
    Aborts a multipart upload.

    Params:
        resource = request path of the object
        id = upload id returned by `startMultipartUpload`
    +/
    void abortMultipartUpload(string resource, string id)
    {
        auto httpResp = doRequest(HTTPMethod.DELETE, resource, ["uploadId":id], InetHeaderMap.init);
        httpResp.dropBody();
        httpResp.destroy();
    }

    /++
    Sends a HEAD request for `resource` and hands the response to `del`.
    The response is released after `del` returns or throws.
    +/
    void info(string resource, scope void delegate(scope HTTPClientResponse) del,
                string[string] queryParameters = null, InetHeaderMap headers = InetHeaderMap.init)
    {
        auto httpResp = doRequest(HTTPMethod.HEAD, resource, queryParameters, headers);
        scope(exit)
        {
            httpResp.dropBody();
            httpResp.destroy();
        }
        del(httpResp);
    }

    /++
    Sends a GET request for `resource` and hands the response to `del`.
    The response is released after `del` returns or throws.
    +/
    void download(string resource, scope void delegate(scope HTTPClientResponse) del,
                string[string] queryParameters = null, InetHeaderMap headers = InetHeaderMap.init)
    {
        auto httpResp = doRequest(HTTPMethod.GET, resource, queryParameters, headers);
        scope(exit)
        {
            httpResp.dropBody();
            httpResp.destroy();
        }
        del(httpResp);
    }

    /++
    Downloads `resource` and passes its body reader to `del`.

    Returns:
        Response headers list, which has type  DictionaryList!(string,false,12L,false)
    +/
    auto download(string resource, scope void delegate(scope InputStreamProxy) del,
                string[string] queryParameters = null, InetHeaderMap headers = InetHeaderMap.init)
    {
        typeof(HTTPClientResponse.headers) ret;
        download(resource, (scope HTTPClientResponse resp) {
            // Capture the headers before the response is released.
            ret = resp.headers;
            del(resp.bodyReader);
        }, queryParameters, headers);
        return ret;
    }

    /// ditto
    auto download(OutputStream)(string resource, scope OutputStream stream,
                string[string] queryParameters = null, InetHeaderMap headers = InetHeaderMap.init)
    {
        return download(resource, (scope InputStreamProxy input) { input.pipe(stream); }, queryParameters, headers);
    }

    /// ditto
    auto download(string resource, string saveTo,
                string[string] queryParameters = null, InetHeaderMap headers = InetHeaderMap.init)
    {
        // createTrunc: an existing file at `saveTo` is overwritten.
        auto file = openFile(saveTo, FileMode.createTrunc);
        scope(exit)
            file.close();
        return download(resource, file, queryParameters, headers);
    }
}