module vibe.aws.s3;

import vibe.d;
import vibe.core.stream;

import vibe.aws.aws;
import vibe.aws.credentials;
import vibe.aws.sigv4;

import std.typecons: Tuple, tuple;

/++
    Storage classes understood by this client.

    The member names are identical to the wire values, which matters because
    `std.conv.to` converts between strings and enum members by $(I name):
    `list` parses the `StorageClass` element with `to!StorageClass`, and the
    upload methods emit the header value with `to!string`.
+/
enum StorageClass: string
{
    STANDARD = "STANDARD",
    REDUCED_REDUNDANCY = "REDUCED_REDUNDANCY",
    GLACIER = "GLACIER",
    // Classes added to S3 after the original three. Without them `list`
    // throws a ConvException as soon as a bucket contains such an object.
    STANDARD_IA = "STANDARD_IA",
    ONEZONE_IA = "ONEZONE_IA",
    INTELLIGENT_TIERING = "INTELLIGENT_TIERING",
    DEEP_ARCHIVE = "DEEP_ARCHIVE",
    OUTPOSTS = "OUTPOSTS",
    GLACIER_IR = "GLACIER_IR",
    SNOW = "SNOW",
    EXPRESS_ONEZONE = "EXPRESS_ONEZONE"
}

/++
    One page of a bucket listing, as filled in by `S3.list`.
+/
struct BucketListResult
{
    /// A single `Contents` entry of the listing.
    static struct S3Resource
    {
        /// Content of the `Owner` element; left empty when the element is absent.
        static struct Owner
        {
            string id;
            string displayName;
        }

        string key;
        /// Raw text of the `LastModified` element. The misspelled name is kept
        /// for source compatibility; prefer the `lastModified` alias.
        string lastModfied;
        /// Correctly spelled alias of `lastModfied`.
        alias lastModified = lastModfied;
        string etag;
        ulong size;
        Owner owner;
        StorageClass storageClass;
    }

    string name;
    string prefix;
    string marker;
    /// Marker to pass to `S3.list` to fetch the following page.
    /// Only set when `isTruncated` is true.
    string nextMarker;
    S3Resource[] resources;
    string[] commonPrefixes;
    uint maxKeys;
    bool isTruncated;
}

/// Appends a trailing "/" to a non-empty path that lacks one; a null or empty
/// path is returned unchanged.
private string asFolderPrefix(string path)
{
    if(!path.empty && !path.endsWith("/"))
        path ~= "/";
    return path;
}

/++
    Returns a range over every resource whose key starts with `path`
    (no delimiter is sent, so nested keys are included).
+/
auto listFilesRecursive(S3 client, string path = null)
{
    return S3Resources(client, null, asFolderPrefix(path));
}

/++
    Returns a range over the resources directly under `path`, using "/" as
    the delimiter.
+/
auto listFiles(S3 client, string path = null)
{
    return S3Resources(client, "/", asFolderPrefix(path));
}

/++
    Returns a range over the common prefixes ("folders") directly under
    `path`, using "/" as the delimiter.
+/
auto listFolders(S3 client, string path = null)
{
    return S3Prefixes(client, "/", asFolderPrefix(path));
}

/++
    Range of `BucketListResult.S3Resource` that fetches further listing pages
    on demand.
+/
struct S3Resources
{
    import std.range.primitives;
    mixin _S3Common;

    auto front() @property
    {
        assert(!empty);
        return res.resources.front;
    }

    auto empty() const @property
    {
        return res.resources.empty;
    }

    auto popFront()
    {
        assert(!empty);
        res.resources.popFront;
        // Current page exhausted: move on to the next one, if any.
        if(empty && hasMorePages)
        {
            next;
        }
    }

}

/++
    Range of common-prefix strings that fetches further listing pages on
    demand.
+/
struct S3Prefixes
{
    import std.range.primitives;
    mixin _S3Common;

    auto front() @property
    {
        assert(!empty);
        return res.commonPrefixes.front;
    }

    auto empty() const @property
    {
        return res.commonPrefixes.empty;
    }

    auto popFront()
    {
        assert(!empty);
        res.commonPrefixes.popFront;
        // Current page exhausted: move on to the next one, if any.
        if(empty && hasMorePages)
        {
            next;
        }
    }
}

/++
    State and paging logic shared by `S3Resources` and `S3Prefixes`.

    The struct that mixes this in must provide an `empty` property; `next`
    uses it to decide whether the freshly fetched page is usable.
+/
private mixin template _S3Common()
{
    private S3 client;
    private BucketListResult res;
    private string delimiter;
    private string prefix;
    private uint maxKeys;

    /// True when the last page was truncated and carries a marker to continue
    /// from. Without a marker the next request would restart the listing from
    /// the beginning, so iteration stops instead.
    private bool hasMorePages() const
    {
        return res.isTruncated && res.nextMarker.length != 0;
    }

    /// Fetches the next page. A truncated page may contain no item of the
    /// kind this range iterates (e.g. only plain keys while iterating
    /// prefixes), so keep fetching until there is something to show or the
    /// listing is finished.
    private void next()
    {
        do
        {
            res = client.list(delimiter, prefix, res.nextMarker, maxKeys);
        }
        while(empty && hasMorePages);
    }

    @disable this();

    /// Stores the listing parameters and eagerly fetches the first page.
    this(S3 client, string delimiter, string prefix, uint maxKeys = 0)
    {
        this.client = client;
        this.delimiter = delimiter;
        this.prefix = prefix;
        this.maxKeys = maxKeys;
        next();
    }

    auto save() @property
    {
        return this;
    }
}

/++
    Client for a single S3 bucket, built on `RESTClient`.
+/
class S3 : RESTClient
{
    private string bucket;

    /++
        Params:
            bucket = bucket name; becomes part of the endpoint host name
            region = AWS region, must not be empty
            credsSource = credential source forwarded to `RESTClient`
            config = client configuration forwarded to `RESTClient`
    +/
    this(string bucket, string region, AWSCredentialSource credsSource, ClientConfiguration config = ClientConfiguration())
    {
        this.bucket = bucket;
        enforce(region.length, "AWS region should be defined.");
        // NOTE(review): this is the dash-style "s3-<region>" host name; confirm it is
        // accepted for every region this client has to reach.
        super(bucket ~ ".s3-" ~ region ~ ".amazonaws.com", region, "s3", credsSource, config);
    }

    /++
        Fetches one page of the bucket listing.

        Params:
            delimiter = sent as the `delimiter` query parameter unless null
            prefix = sent as the `prefix` query parameter unless null
            marker = sent as the `marker` query parameter unless null
            maxKeys = sent as `max-keys` unless 0; must not exceed 1000

        Returns:
            The parsed page. When the page is truncated, `nextMarker` holds the
            value to pass as `marker` for the following page.
    +/
    auto list(string delimiter = null, string prefix = null, string marker = null, uint maxKeys = 0)
    {
        assert(maxKeys <= 1000);

        import memutils.all;
        import std.conv;

        InetHeaderMap headers;
        string[string] queryParameters;

        if (delimiter !is null)
            queryParameters["delimiter"] = delimiter;

        if (prefix !is null)
            queryParameters["prefix"] = prefix;

        if (marker !is null)
            queryParameters["marker"] = marker;

        if (maxKeys)
            queryParameters["max-keys"] = maxKeys.to!string;

        auto resp = doRequest(HTTPMethod.GET, "/", queryParameters, headers);
        // Release the response even when parsing below throws.
        scope(exit)
        {
            resp.dropBody();
            resp.destroy();
        }
        auto response = readXML(resp);

        BucketListResult result;
        result.name = response.parseXPath("/ListBucketResult/Name")[0].getCData;
        result.prefix = response.parseXPath("/ListBucketResult/Prefix")[0].getCData;
        result.marker = response.parseXPath("/ListBucketResult/Marker")[0].getCData;
        result.maxKeys = response.parseXPath("/ListBucketResult/MaxKeys")[0].getCData.to!uint;
        result.isTruncated = response.parseXPath("/ListBucketResult/IsTruncated")[0].getCData.toLower.to!bool;

        auto entries = response.parseXPath("/ListBucketResult/Contents");

        result.resources.reserve(entries.length);
        foreach(node; entries)
        {
            BucketListResult.S3Resource entry;

            entry.key = node.parseXPath("Key")[0].getCData;
            entry.lastModfied = node.parseXPath("LastModified")[0].getCData;
            entry.etag = node.parseXPath("ETag")[0].getCData;
            entry.size = node.parseXPath("Size")[0].getCData.to!ulong;
            entry.storageClass = node.parseXPath("StorageClass")[0].getCData.to!StorageClass;

            // The owner is optional, so only read what is actually there.
            auto ownerId = node.parseXPath("Owner/ID");
            if (ownerId.length)
                entry.owner.id = ownerId[0].getCData;
            auto ownerName = node.parseXPath("Owner/DisplayName");
            if (ownerName.length)
                entry.owner.displayName = ownerName[0].getCData;

            result.resources ~= entry;
        }

        auto prefixNodes = response.parseXPath("/ListBucketResult/CommonPrefixes/Prefix");
        result.commonPrefixes.reserve(prefixNodes.length);
        foreach(node; prefixNodes)
            result.commonPrefixes ~= node.getCData;

        if (result.isTruncated)
        {
            // S3 only returns NextMarker when a delimiter was requested. Without it
            // the last key of this page is the marker for the next request.
            auto nextMarkerNodes = response.parseXPath("/ListBucketResult/NextMarker");
            if (nextMarkerNodes.length)
                result.nextMarker = nextMarkerNodes[0].getCData;
            else if (result.resources.length)
                result.nextMarker = result.resources[$ - 1].key;
        }

        return result;
    }

    /++
        Uploads `input` to `resource` with a single PUT request.

        Params:
            resource = request path of the object
            input = data to upload
            contentType = value of the `Content-Type` header
            storageClass = value of the `x-amz-storage-class` header
            chunkSize = forwarded to `doUpload`
    +/
    void upload(
        string resource,
        RandomAccessStream input,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        size_t chunkSize = 512*1024,
        )
    {
        InetHeaderMap headers;
        headers["Content-Type"] = contentType;
        headers["x-amz-storage-class"] = storageClass.to!string;
        string[] signedHeaders = ["x-amz-storage-class"];
        auto httpResp = doUpload(HTTPMethod.PUT,
            resource, null, headers, signedHeaders, input, chunkSize);
        httpResp.dropBody();
        httpResp.destroy();
    }

    /++
        Uploads `input` to `resource` as a multipart upload: starts the upload,
        sends the stream in parts of up to `partSize` bytes and completes it.

        Params:
            resource = request path of the object
            input = stream to read the data from
            headers = extra headers for the initiating request
            contentType = value of the `Content-Type` header
            storageClass = value of the `x-amz-storage-class` header
            expires = sets the `Expires` header unless equal to `SysTime.init`
            chunkSize = forwarded to `uploadPart`
            partSize = size of the part buffer; at least 5 MB

        On_failure: aborts multipart upload.
    +/
    void multipartUpload(
        string resource,
        scope InputStream input,
        InetHeaderMap headers = InetHeaderMap.init,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        SysTime expires = SysTime.init,
        size_t chunkSize = 512*1024,
        size_t partSize = 5*1024*1024,
        )
    {
        import std.array: appender, uninitializedArray;
        import std.algorithm.comparison: min;
        logDebug("multipartUpload for %s ...", resource);
        enforce(partSize >= 5 * 1024 * 1024, "multipartUpload: minimal allowed part size is 5 MB.");
        auto id = startMultipartUpload(resource, headers, contentType, storageClass, expires);
        scope(failure)
        {
            logWarn("aborting multipart upload for resource=%s, uploadId=%s", resource, id);
            try
            {
                abortMultipartUpload(resource, id);
            }
            catch(Exception e)
            {
                // Best effort: the original failure is the one worth propagating.
                logWarn(e.msg);
            }
        }

        auto buf = uninitializedArray!(ubyte[])(partSize);
        auto etags = appender!(Tuple!(string, size_t)[]);

        size_t least = input.leastSize;
        for(size_t part = 1;;part++)
        {
            // Fill the part buffer until it is full or the stream reports no more data.
            size_t length;
            do
            {
                // Clamp against the remaining space first so that a very large
                // leastSize cannot overflow the addition.
                auto newLength = length + min(least, buf.length - length);
                input.read(buf[length .. newLength]);
                length = newLength;
                least = input.leastSize;
            }
            while(least && length < buf.length);
            logDebug("buf.length = %s", buf.length);
            logDebug("least = %s", least);
            logDebug("multipartUpload: sending %s bytes for part %s ...", length, part);
            auto etag = uploadPart(resource, id, part, createMemoryStream(buf[0 .. length], false), contentType, chunkSize);
            etags.put(tuple(etag, part));
            if(least == 0)
                break;
        }
        enforce(etags.data.length, "At least one part should be uploaded.");
        completeMultipartUpload(resource, id, etags.data);
    }

    /++
        Uploads one part of a multipart upload.

        Params:
            resource = request path of the object
            id = upload id returned by `startMultipartUpload`
            part = part number, sent as `partNumber`
            input = data of the part
            contentType = value of the `Content-Type` header
            chunkSize = forwarded to `doUpload`

        Returns:
            The value of the response's `ETag` header.
    +/
    string uploadPart(
        string resource,
        string id,
        size_t part,
        RandomAccessStream input,
        string contentType = "application/octet-stream",
        size_t chunkSize = 512*1024,
        )
    {
        string[string] queryParameters = [
            "partNumber": part.to!string,
            "uploadId": id,
        ];
        InetHeaderMap headers;
        headers["Content-Type"] = contentType;
        logDebug("uploadPart: doUpload ...");
        auto httpResp = doUpload(HTTPMethod.PUT, resource, queryParameters, headers, null, input, chunkSize);
        // Destroy the response even when the ETag header is missing.
        scope(exit)
            httpResp.destroy();
        logDebug("uploadPart: doUpload finished.");
        httpResp.dropBody();
        // Copied because the value is kept until completeMultipartUpload;
        // NOTE(review): confirm whether header strings outlive the response.
        auto etag = httpResp.headers["ETag"].idup;
        logDebug("uploadPart: finished.");
        return etag;
    }

    /++
        Initiates a multipart upload.

        Params:
            resource = request path of the object
            headers = extra request headers
            contentType = value of the `Content-Type` header
            storageClass = value of the `x-amz-storage-class` header
            expires = sets the `Expires` header unless equal to `SysTime.init`

        Returns:
            The `UploadId` from the response.
    +/
    string startMultipartUpload(
        string resource,
        InetHeaderMap headers = InetHeaderMap.init,
        string contentType = "application/octet-stream",
        StorageClass storageClass = StorageClass.STANDARD,
        SysTime expires = SysTime.init,
        )
    {
        headers["Content-Type"] = contentType;
        headers["x-amz-storage-class"] = storageClass.to!string;
        if(expires != SysTime.init)
        {
            // Drop the fractional seconds before formatting.
            expires.fracSecs = expires.fracSecs.init;
            // NOTE(review): this is ISO 8601 basic format, not the HTTP date format;
            // verify that the service stores and returns it as intended.
            headers["Expires"] = expires.toISOString;
        }
        auto httpResp = doRequest(HTTPMethod.POST, resource, ["uploads":null], headers);
        scope(exit)
        {
            httpResp.dropBody();
            httpResp.destroy();
        }
        auto document = readXML(httpResp);
        auto id = document.parseXPath("/InitiateMultipartUploadResult/UploadId")[0].getCData;
        return id;
    }

    /++
        Completes a multipart upload by posting the list of uploaded parts.

        Params:
            resource = request path of the object
            id = upload id returned by `startMultipartUpload`
            parts = (ETag, part number) pairs, written in the given order
            headers = extra request headers
    +/
    void completeMultipartUpload(
        string resource,
        string id,
        in Tuple!(string, size_t)[] parts,
        InetHeaderMap headers = InetHeaderMap.init,
        )
    {
        import std.format;
        import std.array: appender;
        auto app = appender!(char[]);
        app.put(`<CompleteMultipartUpload>`);
        FormatSpec!char fmt;
        foreach(ref part; parts)
        {
            app.put(`<Part><PartNumber>`);
            app.formatValue(part[1], fmt);
            app.put(`</PartNumber><ETag>`);
            app.put(part[0]);
            app.put(`</ETag></Part>`);
        }
        app.put(`</CompleteMultipartUpload>`);
        auto httpResp = doRequest(HTTPMethod.POST, resource, ["uploadId":id], headers, cast(ubyte[])app.data);
        httpResp.dropBody();
        httpResp.destroy();
    }

    /// Aborts the multipart upload `id` of `resource`.
    void abortMultipartUpload(string resource, string id)
    {
        auto httpResp = doRequest(HTTPMethod.DELETE, resource, ["uploadId":id], InetHeaderMap.init);
        httpResp.dropBody();
        httpResp.destroy();
    }

    /++
        Sends a HEAD request for `resource` and hands the response to `del`.
        The response is released after `del` returns or throws.
    +/
    void info(string resource, scope void delegate(scope HTTPClientResponse) del,
        string[string] queryParameters = null, InetHeaderMap headers = InetHeaderMap.init)
    {
        auto httpResp = doRequest(HTTPMethod.HEAD, resource, queryParameters, headers);
        scope(exit)
        {
            httpResp.dropBody();
            httpResp.destroy();
        }
        del(httpResp);
    }

    /++
        Sends a GET request for `resource` and hands the response to `del`.
        The response is released after `del` returns or throws.
    +/
    void download(string resource, scope void delegate(scope HTTPClientResponse) del,
        string[string] queryParameters = null, InetHeaderMap headers = InetHeaderMap.init)
    {
        auto httpResp = doRequest(HTTPMethod.GET, resource, queryParameters, headers);
        scope(exit)
        {
            httpResp.dropBody();
            httpResp.destroy();
        }
        del(httpResp);
    }

    /++
        Downloads `resource` and passes the body reader to `del`.

        Returns:
            Response headers list, which has type DictionaryList!(string,false,12L,false)
    +/
    auto download(string resource, scope void delegate(scope InputStreamProxy) del,
        string[string] queryParameters = null, InetHeaderMap headers = InetHeaderMap.init)
    {
        typeof(HTTPClientResponse.headers) ret;
        download(resource, (scope HTTPClientResponse resp) {
            ret = resp.headers;
            del(resp.bodyReader);
        }, queryParameters, headers);
        return ret;
    }

    /// ditto
    auto download(OutputStream)(string resource, scope OutputStream stream,
        string[string] queryParameters = null, InetHeaderMap headers = InetHeaderMap.init)
    {
        return download(resource, (scope InputStreamProxy input) { input.pipe(stream); }, queryParameters, headers);
    }

    /// ditto
    auto download(string resource, string saveTo,
        string[string] queryParameters = null, InetHeaderMap headers = InetHeaderMap.init)
    {
        // The target file is created, or truncated if it already exists.
        auto file = openFile(saveTo, FileMode.createTrunc);
        scope(exit)
            file.close();
        return download(resource, file, queryParameters, headers);
    }
}