/**
  Vibe-based AWS client
 */

module vibe.aws.aws;

import std.algorithm;
import std.datetime;
import std.random;
import std.range;
import std.stdio;
import std.string;
import std.conv;

import vibe.core.stream;
import vibe.core.core;
import vibe.core.log;
import vibe.data.json;
import vibe.http.client;
import vibe.inet.message;
import vibe.http.common;

import std.digest.sha;
import vibe.aws.sigv4;
import std.math;

import memutils.all;
import kxml.xml;

public import vibe.aws.credentials;

/**
  Exception carrying the error type reported by the service and a flag
  telling the retry logic whether another attempt is worthwhile.
 */
class AWSException : Exception
{
    /// Error type string as passed to the constructor.
    immutable string type;
    /// Whether the retry loops in this module may try the request again.
    immutable bool retriable;

    /**
      Params:
        type = error type; prepended to `message` to form the exception message
        retriable = whether the failed operation may be retried
        message = human readable description
        file = source file reported by the exception
        line = source line reported by the exception
        next = chained throwable, if any
     */
    this(string type, bool retriable, string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
    {
        super(type ~ ": " ~ message, file, line, next);
        this.type = type;
        this.retriable = retriable;
    }

    /**
      Returns the 'ThrottlingException' from 'com.amazon.coral.service#ThrottlingException'

      If `type` contains no '#', the whole type string is returned.
     */
    @property string simpleType()
    {
        auto h = type.indexOf('#');
        if (h == -1) return type;
        return type[h+1..$];
    }
}

/**
  Configuration for AWS clients
 */
struct ClientConfiguration
{
    /// Number of retries after the first failed attempt.
    uint maxErrorRetry = 3;
}

/**
  Thrown when the signature/authorization information is wrong

  Always constructed as non-retriable.
 */
class AuthorizationException : AWSException
{
    this(string type, string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
    {
        super(type, false, message, file, line, next);
    }
}

/**
  Retry helper: allows `maxRetries + 1` attempts in total and doubles the
  upper bound of a randomized sleep after every attempt.

  Can be driven manually (`finished`, `canRetry`, `inc`, `sleep`) or through
  `foreach`, which passes the number of retries left to the loop body.
 */
struct ExponentialBackoff
{
    /// Number of retries allowed after the first attempt.
    immutable uint maxRetries;
    /// Number of attempts already completed.
    uint tries = 0;
    /// Current inclusive upper bound of the random sleep, in milliseconds.
    uint maxSleepMs = 10;

    this(uint maxRetries)
    {
        this.maxRetries = maxRetries;
    }

    /// True while at least one more attempt is allowed after the current one.
    @property bool canRetry()
    {
        return tries < maxRetries;
    }

    /// True once all `maxRetries + 1` attempts have been used up.
    @property bool finished()
    {
        return tries >= maxRetries + 1;
    }

    /// Records a completed attempt and doubles the sleep upper bound.
    void inc()
    {
        tries++;
        maxSleepMs *= 2;
    }

    /// Sleeps for a random duration between 1 and `maxSleepMs` milliseconds (inclusive).
    void sleep()
    {
        vibe.core.core.sleep(uniform!("[]")(1, maxSleepMs).msecs);
    }

    /**
      `foreach` support. The body receives the number of retries left.

      Leaving the body via `return`/`break` ends the iteration. An
      `AWSException` is rethrown when it is not retriable or no retry is left;
      any other `Exception` is rethrown only when no retry is left. Otherwise
      the loop sleeps and runs the body again.
     */
    int opApply(scope int delegate(uint) attempt)
    {
        int result = 0;
        for (; !finished; inc())
        {
            try
            {
                result = attempt(maxRetries - tries);
                if (result)
                    return result;
            }
            catch (AWSException e)
            {
                // Pass dynamic text as an argument so a '%' in it is not taken as a format specifier.
                logWarn("%s occurred at %s:%s : %s", typeid(e).name, e.file, e.line, e.msg);
                // Retry if possible and retriable, otherwise give up.
                if (!canRetry || !e.retriable)
                    throw e;
            }
            catch (Exception e) //ssl errors from ssl.d
            {
                logWarn("%s occurred at %s:%s : %s", typeid(e).name, e.file, e.line, e.msg);
                if (!canRetry)
                    throw e;
            }
            sleep();
        }
        return result;
    }
}

/**
  Number of hexadecimal digits needed to print `value` without leading zeros
  (`0` needs one digit). Integer-only, so exact powers of 16 are counted
  correctly.
 */
private ulong hexDigitCount(ulong value) pure nothrow @nogc @safe
{
    ulong digits = 1;
    while (value >= 16)
    {
        value >>= 4;
        digits++;
    }
    return digits;
}

/**
  Best-effort release of a response: drops the remaining body and destroys
  the object. A null response is ignored; a failure while dropping the body
  is only logged so that it cannot mask the error being handled.
 */
private void releaseResponse(HTTPClientResponse resp)
{
    if (resp is null)
        return;
    try
        resp.dropBody();
    catch (Exception e)
        logDebug("Failed to drop response body: %s", e.msg);
    resp.destroy();
}

/**
  Base class for clients that send signed HTTPS requests to
  `https://<endpoint>/<resource>` and parse error responses as XML.
 */
abstract class RESTClient {
    immutable string endpoint;
    immutable string region;
    immutable string service;

    private AWSCredentialSource m_credsSource;
    private ClientConfiguration m_config;

    this(string endpoint, string region, string service, AWSCredentialSource credsSource, ClientConfiguration config=ClientConfiguration())
    {
        this.region = region;
        this.endpoint = endpoint;
        this.service = service;
        this.m_credsSource = credsSource;
        this.m_config = config;
    }

    /**
      Builds a URL-encoded `a=b&c=d` query string. A parameter whose value is
      a null string is emitted as the bare key without '='.
     */
    private static string buildQueryParameterString(string[string] queryParameters)
    {
        import vibe.textfilter.urlencode;

        auto stringBuilder = appender!string;
        bool firstParameter = true;
        foreach(parameter, value; queryParameters)
        {
            if (firstParameter)
                firstParameter = false;
            else
                stringBuilder.put("&");

            stringBuilder.put(urlEncode(parameter));
            if(value)
            {
                stringBuilder.put("=");
                stringBuilder.put(urlEncode(value));
            }
        }
        return stringBuilder.data;
    }

    /**
      Sends a signed request and returns the response, retrying according to
      the configured `maxErrorRetry`.

      Params:
        method = HTTP method
        resource = path below the endpoint; a leading '/' is added if missing
        queryParameters = query parameters, appended to the URL and signed
        headers = headers copied onto the request before signing
        reqBody = optional request body

      Throws: the exception raised by `checkForError` (or by the HTTP layer)
        once it is not retriable or no retry is left.
     */
    HTTPClientResponse doRequest(HTTPMethod method, string resource, string[string] queryParameters, in InetHeaderMap headers, in ubyte[] reqBody = null)
    {
        if (!resource.startsWith("/"))
            resource = "/" ~ resource;

        //Initialize credentials
        auto credScope = region ~ "/" ~ service;
        auto creds = m_credsSource.credentials(credScope);

        // The URL does not change between attempts, so build it once.
        // Only append '?' when there is something to put after it.
        auto queryString = buildQueryParameterString(queryParameters);
        auto url = "https://" ~ endpoint ~ resource;
        if (!queryString.empty)
            url ~= "?" ~ queryString;

        auto retries = ExponentialBackoff(m_config.maxErrorRetry);
        foreach(triesLeft; retries)
        {
            HTTPClientResponse resp;
            scope(failure)
                if (resp)
                {
                    resp.dropBody();
                    resp.destroy();
                }
            resp = requestHTTP(url, (scope HTTPClientRequest req) {
                req.method = method;

                foreach(key, value; headers)
                    req.headers[key] = value;

                req.headers["host"] = endpoint;
                // One timestamp for both the header and the signature.
                auto timeString = currentTimeString();
                req.headers["x-amz-date"] = timeString;
                req.headers["x-amz-content-sha256"] = sha256Of(reqBody).toHexString().toLower();
                if (creds.sessionToken && !creds.sessionToken.empty)
                    req.headers["x-amz-security-token"] = creds.sessionToken;
                signRequest(req, queryParameters, reqBody, creds, timeString, region, service);
                if (reqBody)
                    req.writeBody(reqBody);
            });
            checkForError(resp);
            return resp;
        }
        assert(0);
    }

    /**
      Uploads a seekable payload as a signed chunked request with retries.
      The stream is rewound to position 0 before every attempt and its full
      size is used as the payload size.

      Throws: `Exception` if `blockSize` is zero; otherwise see the
        `InputStream` overload.
     */
    HTTPClientResponse doUpload(HTTPMethod method, string resource, string[string] queryParameters,
                                in InetHeaderMap headers, in string[] additionalSignedHeaders,
                                scope RandomAccessStream payload, ulong blockSize = 512*1024)
    {
        import std.exception : enforce;

        // Reject the invalid argument up front instead of retrying it.
        enforce(blockSize > 0, "blockSize must be greater than zero");

        auto retries = ExponentialBackoff(m_config.maxErrorRetry);
        foreach(triesLeft; retries)
        {
            payload.seek(0);
            return doUpload(method, resource, queryParameters, headers, additionalSignedHeaders,
                            payload, payload.size, blockSize);
        }
        assert(0);
    }

    /**
      Uploads `payloadSize` bytes from `payload` in a single attempt, using
      the "aws-chunked" content encoding with one signature per chunk.

      Params:
        method = HTTP method
        resource = path below the endpoint; a leading '/' is added if missing
        queryParameters = query parameters; appended to the URL when not null
        headers = headers copied onto the request
        additionalSignedHeaders = names of request headers to sign in addition
          to the fixed set listed in the canonical request below
        payload = stream to read the data from
        payloadSize = number of payload bytes, used to pre-compute "Content-Length"
        blockSize = chunk size in bytes; must be greater than zero

      Throws: `Exception` if `blockSize` is zero; whatever `checkForError`
        throws for an error response.
     */
    HTTPClientResponse doUpload(HTTPMethod method, string resource, string[string] queryParameters,
                                in InetHeaderMap headers, in string[] additionalSignedHeaders,
                                scope InputStream payload, ulong payloadSize, ulong blockSize = 512*1024)
    {
        import std.exception : enforce;

        enforce(blockSize > 0, "blockSize must be greater than zero");

        //Calculate the body size upfront for the "Content-Length" header
        logDebug("doUpload for resource %s", resource);
        enum ulong signatureSize = ";chunk-signature=".length + 64;
        immutable ulong numFullSizeBlocks = payloadSize / blockSize;
        immutable ulong lastBlockSize = payloadSize % blockSize;

        // Each chunk is "<hex size>;chunk-signature=<64 hex chars>\r\n<data>\r\n".
        // The hex size length is counted with integer arithmetic: a
        // logarithm-based count is one digit short for exact powers of 16.
        immutable ulong bodySize = numFullSizeBlocks * (hexDigitCount(blockSize) + signatureSize + 4 + blockSize) //Full-Sized blocks (4 = 2*"\r\n")
                                 + (lastBlockSize ? (hexDigitCount(lastBlockSize) + signatureSize + 4 + lastBlockSize) : 0) //Part-Sized last block
                                 + (1 + signatureSize + 4); //Finishing 0-sized block

        if (!resource.startsWith("/"))
            resource = "/" ~ resource;

        //Initialize credentials
        auto credScope = region ~ "/" ~ service;
        auto creds = m_credsSource.credentials(credScope);

        HTTPClientResponse resp;
        scope(failure)
            if (resp)
            {
                resp.dropBody();
                resp.destroy();
            }

        auto url = "https://" ~ endpoint ~ resource;
        if (queryParameters !is null)
        {
            url ~= "?" ~ buildQueryParameterString(queryParameters);
        }

        resp = requestHTTP(url, (scope HTTPClientRequest req) {
            req.method = method;

            //Initialize the headers
            foreach(key, value; headers)
                req.headers[key] = value;

            //Since we might be doing retries, update the date
            auto isoTimeString = currentTimeString();
            req.headers["x-amz-date"] = isoTimeString;
            auto date = isoTimeString.dateFromISOString;
            auto time = isoTimeString.timeFromISOString;

            //Adjust the headers necessary for a chunked transmission
            string newEncoding = "aws-chunked";
            if ("Content-Encoding" in headers)
                newEncoding ~= "," ~headers["Content-Encoding"];

            if ("Content-Type" in headers)
                req.contentType = headers["Content-Type"];
            else
                req.contentType = "application/octet-stream";

            req.headers["Content-Length"] = bodySize.to!string;
            req.headers["Content-Encoding"] = newEncoding;
            req.headers["x-amz-content-sha256"] = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD";
            req.headers["x-amz-decoded-content-length"] = payloadSize.to!string;

            //Seems not to be working properly (S3 returns error if "Content-Length" is not used)
            // req.headers["Transfer-Encoding"] = "chunked";
            // if ("Content-Length" in headers)
            //     req.headers.remove("Content-Length");

            // NOTE(review): "host" is read here but not set in this method;
            // presumably the HTTP client has already filled it in - confirm.
            auto canonicalRequest = CanonicalRequest(
                    method.to!string,
                    resource,
                    queryParameters,
                    [
                        "host":                         req.headers["host"],
                        "content-encoding":             req.headers["Content-Encoding"],
                        "content-length":               req.headers["Content-Length"],
                        "x-amz-content-sha256":         req.headers["x-amz-content-sha256"],
                        "x-amz-date":                   req.headers["x-amz-date"],
                        "x-amz-decoded-content-length": req.headers["x-amz-decoded-content-length"],
                        // "transfer-encoding":            req.headers["Transfer-Encoding"],
                    ],
                    null
                );

            foreach (headerName; additionalSignedHeaders)
                canonicalRequest.headers[headerName] = req.headers[headerName];

            //Calculate the seed signature
            auto signableRequest = SignableRequest(date, time, region, service, canonicalRequest);
            auto key = signingKey(creds.accessKeySecret, date, region, service);
            auto binarySignature = key.sign(cast(ubyte[])signableRequest.signableStringForStream);

            auto credScope = date ~ "/" ~ region ~ "/" ~ service;
            auto authHeader = createSignatureHeader(creds.accessKeyID, credScope, canonicalRequest.headers, binarySignature);
            req.headers["authorization"] = authHeader;

            //Write the data in chunks to the stream
            auto outputStream = createChunkedOutputStream(req.bodyWriter);
            outputStream.maxBufferSize = blockSize;
            // auto outputStream = cast(ChunkedOutputStream) req.bodyWriter;
            // enforce(outputStream !is null);

            // Each chunk signature is computed from the previous one, starting with the seed signature.
            string signature = binarySignature.toHexString().toLower();
            outputStream.chunkExtensionCallback = (in ubyte[] data) @safe
            {
                logDebug("doUpload: chunkExtensionCallback data is %s bytes", data.length);
                auto chunk = SignableChunk(date, time, region, service, signature, hash(data));
                signature = key.sign(chunk.signableString.representation).toHexString().toLower();
                return "chunk-signature=" ~ signature;
            };
            logDebug("doUpload: write payload");
            payload.pipe(outputStream);
            logDebug("doUpload: finalize ... ");
            outputStream.finalize;
            logDebug("doUpload: finalized.");
        });
        checkForError(resp);
        return resp;
    }

    /**
      Reads the whole response body through a temporary 1024-byte buffer and
      parses it as an XML document.
     */
    XmlNode readXML(HTTPClientResponse response)
    {
        auto stringBuilder = appender!string;
        auto reader = response.bodyReader;

        auto buffer = ThreadMem.alloc!(ubyte[])(1024);
        scope(exit)
            ThreadMem.free(buffer);

        while(reader.leastSize > 0)
        {
            auto size = min(reader.leastSize,buffer.length);
            auto bytes = buffer[0..size];
            reader.read(bytes);
            stringBuilder.put(bytes);
        }
        return readDocument(stringBuilder.data,true);
    }

    /**
      Throws the exception produced by `makeException` when the response
      status code is 400 or above; status codes in the 5xx range are marked
      retriable. Error code and message are taken from the `/Error/Code` and
      `/Error/Message` nodes of the XML body.
     */
    void checkForError(HTTPClientResponse response, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
    {
        if (response.statusCode < 400)
            return; // No error

        auto document = readXML(response);
        auto codeNodes = document.parseXPath("/Error/Code");
        auto messageNodes = document.parseXPath("/Error/Message");

        // The body may lack these nodes; fall back to generic values instead
        // of indexing an empty result.
        string code = codeNodes.length ? codeNodes[0].getCData : "UnknownError";
        string message = messageNodes.length
            ? messageNodes[0].getCData
            : "HTTP status " ~ response.statusCode.to!string;

        logError("%s", message);
        throw makeException(code, response.statusCode / 100 == 5, message, file, line, next);
    }

    /**
      Creates the exception object for an error type: an
      `AuthorizationException` for "UnrecognizedClientException" and
      "InvalidSignatureException", a plain `AWSException` otherwise.
      The exception is returned, not thrown.
     */
    AWSException makeException(string type, bool retriable, string message,
                               string file = __FILE__, size_t line = __LINE__, Throwable next = null)
    {
        if (type == "UnrecognizedClientException"
            || type == "InvalidSignatureException")
            return new AuthorizationException(type, message, file, line, next);
        return new AWSException(type, retriable, message, file, line, next);
    }
}

/**
  Client that POSTs signed JSON requests ("application/x-amz-json-1.1") to
  `https://<endpoint>/` and parses error responses as JSON.
 */
class AWSClient {
    protected static immutable exceptionPrefix = "com.amazon.coral.service#";

    immutable string endpoint;
    immutable string region;
    immutable string service;

    private AWSCredentialSource m_credsSource;
    private ClientConfiguration m_config;

    this(string endpoint, string region, string service, AWSCredentialSource credsSource, ClientConfiguration config=ClientConfiguration())
    {
        this.region = region;
        this.endpoint = endpoint;
        this.service = service;
        this.m_credsSource = credsSource;
        this.m_config = config;
    }

    /**
      Sends `request` as the body of a POST with `operation` in the
      "x-amz-target" header and returns the parsed response.

      Credentials are fetched anew for every attempt. An authorization
      failure is reported to the credential source and retried while retries
      are left; other `AWSException`s are retried only when marked retriable;
      any other `Exception` is retried while retries are left.
     */
    AWSResponse doRequest(string operation, Json request)
    {
        auto backoff = ExponentialBackoff(m_config.maxErrorRetry);

        for (; !backoff.finished; backoff.inc())
        {
            auto credScope = region ~ "/" ~ service;
            auto creds = m_credsSource.credentials(credScope);
            HTTPClientResponse resp;
            try
            {
                resp = requestHTTP("https://" ~ endpoint ~ "/", (scope req) {
                    auto timeString = currentTimeString();
                    auto jsonString = cast(ubyte[])request.toString();

                    req.method = HTTPMethod.POST;
                    req.headers["x-amz-target"] = operation;
                    // Must be the same timestamp that is used for signing; a
                    // second clock read could land in the next second.
                    req.headers["x-amz-date"] = timeString;
                    req.headers["host"] = endpoint;
                    if (creds.sessionToken && !creds.sessionToken.empty)
                        req.headers["x-amz-security-token"] = creds.sessionToken;
                    req.contentType = "application/x-amz-json-1.1";
                    signRequest(req, null, jsonString, creds, timeString, region, service);
                    req.writeBody(jsonString);
                });

                checkForError(resp);

                return new AWSResponse(resp);
            }
            catch (AuthorizationException ex)
            {
                logWarn("%s", ex.msg);
                // Report credentials as invalid. Will retry if possible.
                m_credsSource.credentialsInvalid(credScope, creds, ex.msg);
                releaseResponse(resp);
                if (!backoff.canRetry) throw ex;
            }
            catch (AWSException ex)
            {
                logWarn("%s", ex.msg);
                releaseResponse(resp);
                // Retry if possible and retriable, otherwise give up.
                if (!backoff.canRetry || !ex.retriable) throw ex;
            }
            catch (Exception ex) //ssl errors from ssl.d
            {
                // Only Exceptions are retried; Errors are left to propagate.
                logWarn("%s", ex.msg);
                releaseResponse(resp);
                if (!backoff.canRetry)
                {
                    vibe.core.log.logError("no retries left, failing request");
                    throw ex;
                }
            }
            backoff.sleep();
        }
        assert(0);
    }

    /**
      Throws the exception produced by `makeException` when the response
      status code is 400 or above, using the "__type" and (optional)
      "message" fields of the JSON body; 5xx status codes are marked retriable.
     */
    void checkForError(HTTPClientResponse response)
    {
        if (response.statusCode < 400) return; // No error

        auto bod = response.readJson();

        throw makeException(bod["__type"].get!string, response.statusCode / 100 == 5, bod["message"].opt!string(""));
    }

    /**
      Creates the exception object for an error type: an
      `AuthorizationException` for the prefixed "UnrecognizedClientException"
      and "InvalidSignatureException" types, a plain `AWSException` otherwise.
      The exception is returned, not thrown.
     */
    AWSException makeException(string type, bool retriable, string message)
    {
        if (type == exceptionPrefix ~ "UnrecognizedClientException" || type == exceptionPrefix ~ "InvalidSignatureException")
            return new AuthorizationException(type, message);
        return new AWSException(type, retriable, message);
    }
}

/// Current UTC time, truncated to whole seconds, as an ISO string.
private auto currentTimeString()
{
    auto t = Clock.currTime(UTC());
    t.fracSecs = 0.seconds;
    return t.toISOString();
}

/**
  Computes the signature for `req` from its method, the path part of its
  request URL, `queryParameters`, all headers currently set on the request
  and `requestBody`, then stores the result in the "authorization" header.

  Headers that must be covered by the signature therefore have to be set
  before this function is called.
 */
private void signRequest(HTTPClientRequest req, string[string] queryParameters,
                         in ubyte[] requestBody, AWSCredentials creds,
                         string timeString, string region, string service)
{
    auto dateString = dateFromISOString(timeString);
    auto credScope = dateString ~ "/" ~ region ~ "/" ~ service;

    SignableRequest signable;
    signable.dateString = dateString;
    signable.timeStringUTC = timeFromISOString(timeString);
    signable.region = region;
    signable.service = service;
    signable.canonicalRequest.method = req.method.to!string();

    // The canonical URI is the request URL without its query part.
    auto pos = req.requestURL.indexOf("?");
    if (pos < 0)
        pos = req.requestURL.length;
    signable.canonicalRequest.uri = req.requestURL[0..pos];

    signable.canonicalRequest.queryParameters = queryParameters;

    auto reqHeaders = req.headers.toRepresentation;
    foreach (x; reqHeaders) {
        signable.canonicalRequest.headers[x.key] = x.value;
    }
    signable.canonicalRequest.payload = requestBody;

    ubyte[] signKey = signingKey(creds.accessKeySecret, dateString, region, service);
    ubyte[] stringToSign = cast(ubyte[])signableString(signable);
    auto signature = sign(signKey, stringToSign);

    auto authHeader = createSignatureHeader(creds.accessKeyID, credScope, signable.canonicalRequest.headers, signature);
    req.headers["authorization"] = authHeader;
}

/**
  Holds the JSON body of a response. The constructor reads the body and
  then releases the HTTP response, so the response must not be used
  afterwards.
 */
class AWSResponse
{

    private Json m_body;

    this(HTTPClientResponse response)
    {
        //m_response = response;
        m_body = response.readJson();
        response.dropBody();
        response.destroy();
    }

    override string toString()
    {
        return m_body.toString();
    }

    /// The parsed JSON body.
    @property Json responseBody() { return m_body; }
}