1 /**
2   Vibe-based AWS client
3  */
4 
5 module vibe.aws.aws;
6 
7 import std.algorithm;
8 import std.datetime;
9 import std.random;
10 import std.range;
11 import std.stdio;
12 import std.string;
13 import std.conv;
14 
15 import vibe.core.stream;
16 import vibe.core.core;
17 import vibe.core.log;
18 import vibe.data.json;
19 import vibe.http.client;
20 import vibe.inet.message;
21 import vibe.http.common;
22 
23 import std.digest.sha;
24 import vibe.aws.sigv4;
25 import std.math;
26 
27 import memutils.all;
28 import kxml.xml;
29 
30 public import vibe.aws.credentials;
31 
/**
  Exception carrying an AWS error type string together with a flag that
  tells retry loops whether another attempt is worthwhile.
 */
class AWSException : Exception
{
    /// Error type exactly as passed to the constructor.
    immutable string type;
    /// Whether the failure was flagged as retriable by whoever created it.
    immutable bool retriable;

    /**
      Creates the exception. The message handed to `Exception` is
      `type ~ ": " ~ message`.
     */
    this(string type, bool retriable, string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
    {
        super(type ~ ": " ~ message, file, line, next);
        this.type = type;
        this.retriable = retriable;
    }

    /**
      Returns the part of `type` following the first '#', e.g.
      'ThrottlingException' for 'com.amazon.coral.service#ThrottlingException'.
      A type without '#' is returned unchanged.
     */
    @property string simpleType() 
    {
        immutable hashPos = type.indexOf('#');
        return hashPos < 0 ? type : type[hashPos + 1 .. $];
    }
}
54 
/**
  Configuration for AWS clients
 */
struct ClientConfiguration
{
    /// Retry budget handed to ExponentialBackoff for each request (defaults to 3).
    uint maxErrorRetry = 3;
}
62 
/**
  Thrown when the signature/authorization information is wrong
 */
class AuthorizationException : AWSException
{
    /**
      Forwards to the AWSException constructor with `retriable` fixed to
      `false`; all other arguments are passed through unchanged.
     */
    this(string type, string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
    {
        super(type, false, message, file, line, next);
    }
}
73 
/**
  Retry driver with exponential backoff and random jitter.

  Meant to be used through `foreach (triesLeft; backoff) { ... }`: the loop
  body is one attempt and is expected to leave the loop (`return`/`break`)
  on success. An attempt that throws is retried after a random sleep, as
  long as retries are left and, for `AWSException`, the error is retriable.
  A body that runs to its end without leaving the loop is followed by a
  sleep and another attempt until the retry budget is used up.
 */
struct ExponentialBackoff
{
    /// Ceiling (in ms) for the automatically doubled sleep window. Without a
    /// ceiling the doubling eventually wraps `uint` around to 0, which makes
    /// `uniform(1, 0)` in `sleep` fail.
    enum uint sleepCapMs = 20_000;

    /// Number of retries allowed after the first attempt.
    immutable uint maxRetries;
    /// Number of attempts completed so far.
    uint tries = 0;
    /// Upper bound (inclusive, in ms) of the next random sleep.
    uint maxSleepMs = 10;

    this(uint maxRetries)
    {
        this.maxRetries = maxRetries;
    }

    /// True while a failed attempt may still be followed by another one.
    @property bool canRetry()
    {
        return tries < maxRetries;
    }

    /// True once the initial attempt plus all retries have been used.
    @property bool finished()
    {
        // Equivalent to `tries >= maxRetries + 1`, but does not wrap around
        // when maxRetries == uint.max.
        return tries > maxRetries;
    }

    /// Records one finished attempt and doubles the sleep window (capped).
    void inc()
    {
        tries++;
        // A window already at or above the cap (e.g. set by the caller) is
        // left untouched; below the cap the product cannot overflow.
        if (maxSleepMs < sleepCapMs)
        {
            immutable doubled = maxSleepMs * 2;
            maxSleepMs = doubled < sleepCapMs ? doubled : sleepCapMs;
        }
    }

    /// Sleeps for a random duration between 1 and `maxSleepMs` milliseconds.
    void sleep()
    {
        vibe.core.core.sleep(uniform!("[]")(1, maxSleepMs).msecs);
    }

    /**
      `foreach` support. `attempt` receives the number of retries left.

      Throws: the exception of the failed attempt once no retry is left, or
      immediately for an `AWSException` that is not retriable.
     */
    int opApply(scope int delegate(uint) attempt)
    {
        int result = 0;
        for (; !finished; inc())
        {
            try
            {
                result = attempt(maxRetries - tries);
                if (result)
                    return result;
            }
            catch (AWSException e)
            {
                // The text is passed as an argument, not as the format string:
                // messages may contain '%'.
                logWarn("%s occurred at %s:%s : %s", typeid(e).name, e.file, e.line, e.msg);
                // Retry if possible and retriable, otherwise give up.
                if (!canRetry || !e.retriable) 
                    throw e;
            }
            catch (Exception e) //ssl errors from ssl.d
            {
                logWarn("%s occurred at %s:%s : %s", typeid(e).name, e.file, e.line, e.msg);
                if (!canRetry)
                    throw e;
            }
            sleep();
        }
        return result;
    }
}
135 
136 abstract class RESTClient {
137     immutable string endpoint;
138     immutable string region;
139     immutable string service;
140 
141     private AWSCredentialSource m_credsSource;
142     private ClientConfiguration m_config;
143 
144     this(string endpoint, string region, string service, AWSCredentialSource credsSource, ClientConfiguration config=ClientConfiguration()) 
145     {
146         this.region = region;
147         this.endpoint = endpoint;
148         this.service = service;
149         this.m_credsSource = credsSource;
150         this.m_config = config;
151     }
152 
153     private static string buildQueryParameterString(string[string] queryParameters)
154     {
155         import vibe.textfilter.urlencode;
156 
157         auto stringBuilder = appender!string;
158         bool firstParameter = true;
159         foreach(parameter, value; queryParameters)
160         {
161             if (firstParameter)
162                 firstParameter = false;
163             else
164                 stringBuilder.put("&");
165 
166             stringBuilder.put(urlEncode(parameter));
167             if(value)
168             {
169                 stringBuilder.put("=");
170                 stringBuilder.put(urlEncode(value));
171             }
172         }
173         return stringBuilder.data;
174     }
175 
176     HTTPClientResponse doRequest(HTTPMethod method, string resource, string[string] queryParameters, in InetHeaderMap headers, in ubyte[] reqBody = null)
177     {
178         if (!resource.startsWith("/"))
179             resource = "/" ~ resource;
180 
181         //Initialize credentials
182         auto credScope = region ~ "/" ~ service;
183         auto creds = m_credsSource.credentials(credScope);
184 
185         auto queryString = buildQueryParameterString(queryParameters);
186 
187         auto retries = ExponentialBackoff(m_config.maxErrorRetry);
188         foreach(triesLeft; retries)
189         {
190             HTTPClientResponse resp;
191             scope(failure) 
192                 if (resp)
193                 {
194                     resp.dropBody();
195                     resp.destroy();
196                 }
197             auto url = "https://" ~ endpoint ~ resource ~ "?" ~ queryString;
198             resp = requestHTTP(url, (scope HTTPClientRequest req) {
199                 req.method = method;
200                 
201                 foreach(key, value; headers)
202                     req.headers[key] = value;
203 
204                 req.headers["host"] = endpoint;
205                 auto timeString = currentTimeString();
206                 req.headers["x-amz-date"] = timeString;
207                 req.headers["x-amz-content-sha256"] = sha256Of(reqBody).toHexString().toLower();
208                 if (creds.sessionToken && !creds.sessionToken.empty)
209                     req.headers["x-amz-security-token"] = creds.sessionToken;
210                 signRequest(req, queryParameters, reqBody, creds, timeString, region, service);
211                 if (reqBody)
212                     req.writeBody(reqBody);
213             });
214             checkForError(resp);
215             return resp;
216         }
217         assert(0);
218     }
219 
220     HTTPClientResponse doUpload(HTTPMethod method, string resource, string[string] queryParameters,
221                                 in InetHeaderMap headers, in string[] additionalSignedHeaders,
222                                 scope RandomAccessStream payload, ulong blockSize = 512*1024)
223     {
224         auto retries = ExponentialBackoff(m_config.maxErrorRetry);
225         foreach(triesLeft; retries)
226         {
227             payload.seek(0);
228             return doUpload(method, resource, queryParameters, headers, additionalSignedHeaders,
229                             payload, payload.size, blockSize);
230         }
231         assert(0);
232     }
233 
234     HTTPClientResponse doUpload(HTTPMethod method, string resource, string[string] queryParameters,
235                                 in InetHeaderMap headers, in string[] additionalSignedHeaders,
236                                 scope InputStream payload, ulong payloadSize, ulong blockSize = 512*1024)
237     {
238         //Calculate the body size upfront for the "Content-Length" header
239         logDebug("doUpload for resource %s", resource);
240         auto base16 = (ulong x) => ceil(log2(x)/4).to!ulong;
241         enum ulong signatureSize = ";chunk-signature=".length + 64;
242         immutable ulong numFullSizeBlocks = payloadSize / blockSize;
243         immutable ulong lastBlockSize = payloadSize % blockSize;
244         
245         immutable ulong bodySize =  numFullSizeBlocks * (base16(blockSize)  + signatureSize + 4 + blockSize) //Full-Sized blocks (4 = 2*"\r\n")
246                                  + (lastBlockSize  ? (base16(lastBlockSize) + signatureSize + 4 + lastBlockSize) : 0) //Part-Sized last block
247                                  + (1 + signatureSize + 4); //Finishing 0-sized block
248 
249 
250         if (!resource.startsWith("/"))
251             resource = "/" ~ resource;
252 
253         //Initialize credentials
254         auto credScope = region ~ "/" ~ service;
255         auto creds = m_credsSource.credentials(credScope);
256 
257         HTTPClientResponse resp;
258         scope(failure) 
259             if (resp)
260             {
261                 resp.dropBody();
262                 resp.destroy();
263             }
264 
265         auto url = "https://" ~ endpoint ~ resource;
266         if (queryParameters !is null)
267         {
268             url ~= "?" ~ buildQueryParameterString(queryParameters);
269         }
270 
271         resp = requestHTTP(url, (scope HTTPClientRequest req) {
272             req.method = method;
273             
274             //Initialize the headers
275             foreach(key, value; headers)
276                 req.headers[key] = value;
277 
278             //Since we might be doing retries, update the date
279             auto isoTimeString = currentTimeString();
280             req.headers["x-amz-date"] = isoTimeString;
281             auto date = isoTimeString.dateFromISOString;
282             auto time = isoTimeString.timeFromISOString;
283             
284             //Adjust the headers necessary for a chunked transmission
285             string newEncoding = "aws-chunked";
286             if ("Content-Encoding" in headers)
287                 newEncoding ~= "," ~headers["Content-Encoding"];
288             
289             if ("Content-Type" in headers)
290                 req.contentType = headers["Content-Type"];
291             else
292                 req.contentType = "application/octet-stream";
293             
294             req.headers["Content-Length"] = bodySize.to!string;
295             req.headers["Content-Encoding"] = newEncoding;
296             req.headers["x-amz-content-sha256"] = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD";
297             req.headers["x-amz-decoded-content-length"] = payloadSize.to!string;
298 
299             //Seems not to be working properly (S3 returns error if "Content-Length" is not used)
300 //                req.headers["Transfer-Encoding"] = "chunked";
301 //                if ("Content-Length" in headers)
302 //                    req.headers.remove("Content-Length");
303 
304             auto canonicalRequest = CanonicalRequest(
305                     method.to!string,
306                     resource,
307                     queryParameters,
308                     [
309                         "host":                         req.headers["host"],
310                         "content-encoding":             req.headers["Content-Encoding"],
311                         "content-length":               req.headers["Content-Length"],
312                         "x-amz-content-sha256":         req.headers["x-amz-content-sha256"],
313                         "x-amz-date":                   req.headers["x-amz-date"],
314                         "x-amz-decoded-content-length": req.headers["x-amz-decoded-content-length"],
315 //                        "transfer-encoding":            req.headers["Transfer-Encoding"],
316                     ],
317                     null
318                 );
319 
320             foreach (key; additionalSignedHeaders)
321                 canonicalRequest.headers[key] = req.headers[key];
322 
323             //Calculate the seed signature
324             auto signableRequest = SignableRequest(date, time, region, service, canonicalRequest);
325             auto key = signingKey(creds.accessKeySecret, date, region, service);
326             auto binarySignature = key.sign(cast(ubyte[])signableRequest.signableStringForStream);
327 
328             auto credScope = date ~ "/" ~ region ~ "/" ~ service;
329             auto authHeader = createSignatureHeader(creds.accessKeyID, credScope, canonicalRequest.headers, binarySignature);
330             req.headers["authorization"] = authHeader;
331 
332             //Write the data in chunks to the stream
333             auto outputStream = createChunkedOutputStream(req.bodyWriter);
334             outputStream.maxBufferSize = blockSize;
335 //            auto outputStream = cast(ChunkedOutputStream) req.bodyWriter;
336 //            enforce(outputStream !is null);
337 
338             string signature = binarySignature.toHexString().toLower();
339             outputStream.chunkExtensionCallback = (in ubyte[] data) @safe
340             {
341                 logDebug("doUpload: chunkExtensionCallback data is %s bytes", data.length);
342                 auto chunk = SignableChunk(date, time, region, service, signature, hash(data));
343                 signature = key.sign(chunk.signableString.representation).toHexString().toLower();
344                 return "chunk-signature=" ~ signature;
345             };
346             logDebug("doUpload: write payload");
347             payload.pipe(outputStream);
348             logDebug("doUpload: finalize ... ");
349             outputStream.finalize;
350             logDebug("doUpload: finalized.");
351         });
352         checkForError(resp);
353         return resp;
354     }
355 
356     XmlNode readXML(HTTPClientResponse response)
357     {
358         auto stringBuilder = appender!string;
359         auto reader = response.bodyReader;
360 
361         auto buffer = ThreadMem.alloc!(ubyte[])(1024);
362         scope(exit)
363             ThreadMem.free(buffer);
364 
365         while(reader.leastSize > 0)
366         {
367             auto size = min(reader.leastSize,buffer.length);
368             auto bytes = buffer[0..size];
369             reader.read(bytes);
370             stringBuilder.put(bytes);
371         }
372         return readDocument(stringBuilder.data,true);
373     }
374 
375     void checkForError(HTTPClientResponse response, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
376     {
377         if (response.statusCode < 400) 
378             return; // No error
379 
380         auto document = readXML(response);
381         auto code = document.parseXPath("/Error/Code")[0].getCData;
382         auto message = document.parseXPath("/Error/Message")[0].getCData;
383         logError(message);
384         throw makeException(code, response.statusCode / 100 == 5, message, file, line, next);
385     }
386 
387     AWSException makeException(string type, bool retriable, string message,
388         string file = __FILE__, size_t line = __LINE__, Throwable next = null)
389     {
390         if (type == "UnrecognizedClientException" 
391          || type == "InvalidSignatureException")
392             throw new AuthorizationException(type, message, file, line, next);
393         return new AWSException(type, retriable, message, file, line, next);
394     }
395 }
396 
397 class AWSClient {
398     protected static immutable exceptionPrefix = "com.amazon.coral.service#";
399 
400     immutable string endpoint;
401     immutable string region;
402     immutable string service;
403 
404     private AWSCredentialSource m_credsSource;
405     private ClientConfiguration m_config;
406 
407     this(string endpoint, string region, string service, AWSCredentialSource credsSource, ClientConfiguration config=ClientConfiguration()) 
408     {
409         this.region = region;
410         this.endpoint = endpoint;
411         this.service = service;
412         this.m_credsSource = credsSource;
413         this.m_config = config;
414     }
415 
416     AWSResponse doRequest(string operation, Json request)
417     {
418         auto backoff = ExponentialBackoff(m_config.maxErrorRetry);
419 
420         for (; !backoff.finished; backoff.inc())
421         {
422             auto credScope = region ~ "/" ~ service;
423             auto creds = m_credsSource.credentials(credScope);
424             HTTPClientResponse resp;
425             try
426             {
427                 // FIXME: Auto-retries for retriable errors
428                 // FIXME: Report credential errors and retry for failed credentials
429                  resp = requestHTTP("https://" ~ endpoint ~ "/", (scope req) {
430                     auto timeString = currentTimeString();
431                     auto jsonString = cast(ubyte[])request.toString();
432 
433                     req.method = HTTPMethod.POST;
434                     req.headers["x-amz-target"] = operation;
435                     req.headers["x-amz-date"] = currentTimeString();
436                     req.headers["host"] = endpoint;
437                     if (creds.sessionToken && !creds.sessionToken.empty)
438                         req.headers["x-amz-security-token"] = creds.sessionToken;
439                     req.contentType = "application/x-amz-json-1.1";
440                     signRequest(req, null, jsonString, creds, timeString, region, service);
441                     req.writeBody(jsonString);
442                 });
443 
444                 checkForError(resp);
445 
446                 return new AWSResponse(resp);
447             }
448             catch (AuthorizationException ex)
449             {
450                 logWarn(ex.msg);
451                 // Report credentials as invalid. Will retry if possible.
452                 m_credsSource.credentialsInvalid(credScope, creds, ex.msg);
453                 resp.dropBody();
454                 resp.destroy();
455                 if (!backoff.canRetry) throw ex;
456             }
457             catch (AWSException ex)
458             {
459                 logWarn(ex.msg);
460                 resp.dropBody();
461                 resp.destroy();
462                 // Retry if possible and retriable, otherwise give up.
463                 if (!backoff.canRetry || !ex.retriable) throw ex;
464             } 
465             catch (Throwable t) //ssl errors from ssl.d
466             {
467               if (!backoff.canRetry)
468               {
469                 vibe.core.log.logError("no retries left, failing request");
470                 throw(t);
471               }
472             }
473             backoff.sleep();
474         }
475         assert(0);
476     }
477 
478     void checkForError(HTTPClientResponse response)
479     {
480         if (response.statusCode < 400) return; // No error
481 
482         auto bod = response.readJson();
483 
484         throw makeException(bod["__type"].get!string, response.statusCode / 100 == 5, bod["message"].opt!string(""));
485     }
486 
487     AWSException makeException(string type, bool retriable, string message)
488     {
489         if (type == exceptionPrefix ~ "UnrecognizedClientException" || type == exceptionPrefix ~ "InvalidSignatureException")
490             throw new AuthorizationException(type, message);
491         return new AWSException(type, retriable, message);
492     }
493 }
494 
/**
  Returns the current UTC time as an ISO string with the fractional seconds
  removed (e.g. "20150830T123600Z").
 */
private auto currentTimeString()
{
    auto now = Clock.currTime(UTC());
    // Truncate to whole seconds before formatting.
    now.fracSecs = Duration.zero;
    return now.toISOString();
}
501 
/**
  Computes the signature for `req` and stores it in the request's
  "authorization" header.

  The canonical request is assembled from the request method, the request
  URL up to (not including) the first '?', the given query parameters, all
  headers currently set on `req`, and `requestBody`. `timeString` is split
  into its date and time parts with `dateFromISOString` /
  `timeFromISOString`.
 */
private void signRequest(HTTPClientRequest req, string[string] queryParameters,
                         in ubyte[] requestBody, AWSCredentials creds, 
                         string timeString, string region, string service)
{
    auto dateString = dateFromISOString(timeString);

    SignableRequest signable;
    signable.dateString = dateString;
    signable.timeStringUTC = timeFromISOString(timeString);
    signable.region = region;
    signable.service = service;

    // Canonical request: method, path without query string, parameters,
    // payload and a copy of every header set so far.
    auto url = req.requestURL;
    immutable queryStart = url.indexOf("?");
    signable.canonicalRequest.method = req.method.to!string();
    signable.canonicalRequest.uri = queryStart < 0 ? url : url[0 .. queryStart];
    signable.canonicalRequest.queryParameters = queryParameters;
    signable.canonicalRequest.payload = requestBody;
    foreach (header; req.headers.toRepresentation)
        signable.canonicalRequest.headers[header.key] = header.value;

    // Derive the signing key and sign the string-to-sign.
    ubyte[] signKey = signingKey(creds.accessKeySecret, dateString, region, service);
    ubyte[] stringToSign = cast(ubyte[])signableString(signable);
    auto signature = sign(signKey, stringToSign);

    auto credScope = dateString ~ "/" ~ region ~ "/" ~ service;
    req.headers["authorization"] = createSignatureHeader(creds.accessKeyID, credScope, signable.canonicalRequest.headers, signature);
}
536 
/**
  Holds the JSON body of a response. The HTTP response object itself is not
  kept: it is consumed and destroyed in the constructor.
 */
class AWSResponse
{
  
    // Parsed JSON body of the response.
    private Json m_body;

    /**
      Reads the body of `response` as JSON, then drops any remaining body
      data and destroys the response object.
     */
    this(HTTPClientResponse response)
    {
        //m_response = response;
        m_body = response.readJson();
        response.dropBody();
        response.destroy();
    }
    
    /// Returns the string form of the JSON body.
    override string toString()
    {
      return m_body.toString();
    }

    /// The parsed JSON body.
    @property Json responseBody() { return m_body; }
}
557