1 module vibrato.api; 2 3 4 import core.thread; 5 import core.time; 6 7 import std.algorithm; 8 import std.array; 9 import std.base64; 10 import std.datetime; 11 import std.range; 12 import std.socket; 13 import std.string; 14 15 import vibe.core.core; 16 import vibe.core.sync; 17 import vibe.data.json; 18 import vibe.inet.url; 19 import vibe.http.client; 20 import vibe.stream.operations; 21 22 import vibrato.json; 23 24 25 class VibratoException : Throwable { 26 this(string msg, string file = __FILE__, size_t line = __LINE__) pure { 27 super(msg, file, line); 28 } 29 } 30 31 32 alias SendCallback = void delegate(); 33 alias ErrorCallback = void delegate(string error); 34 35 36 struct Settings { 37 string user; // librato.com user email 38 string token; // librato.com api token 39 string source; // this instance's source name - defaults to machine's hostname 40 string prefix; // metric name prefix 41 size_t intervalSecs = 10; // send/aggregate thread sleep cycle 42 SendCallback sendCallback; // this callback will be called from the sender thread right after waking up, and before aggregation 43 ErrorCallback errorCallback; // this callback will be called from the sender when a librato error occurs i.e. when the api is down 44 HTTPClientSettings httpSettings; // http client settings 45 string host; // librato.com's host 46 ushort port; // librato.com's port 47 string path; // librato.com's api path 48 } 49 50 51 private size_t hashOf(const(char)[] x) { 52 size_t hash = 5381; 53 foreach(i; 0..x.length) 54 hash = (hash * 33) ^ cast(size_t)(std.ascii.toLower(x.ptr[i])); 55 return hash; 56 } 57 58 59 enum GaugeFlags : ushort { 60 TimeSeconds = 1 << 0, // by default time is in msecs 61 NoAggregate = 1 << 1, // do not aggregate locally - send every sample and it's timestamp untouched 62 AggregateOnlyAvg = 1 << 2, // compute and send only the average 63 AggregateNoStdDev = 1 << 3, // compute and send everything (count, min, max, sum, ...), except sum_squares 64 Default = 0, 65 } 66 67 68 struct Annotation { 69 struct Link { 70 string label; 71 string rel; 72 string href; 73 } 74 75 string stream; 76 string title; 77 string description; 78 string source; 79 Link[] links; 80 SysTime start; 81 SysTime end; 82 } 83 84 85 void init(Settings settings) { 86 settings_ = settings; 87 88 if (settings_.host.empty) 89 settings_.host = defaultHostName; 90 if (settings_.path.empty) 91 settings_.path = defaultPath; 92 if (!settings_.port) 93 settings_.port = defaultPort; 94 if (!settings_.source) 95 settings_.source = Socket.hostName; 96 if (settings_.httpSettings is null) 97 settings_.httpSettings = new HTTPClientSettings; 98 if (settings_.errorCallback is null) 99 settings_.errorCallback = (string error) { 100 throw new Exception(format("Librato.com error: %s", error)); 101 }; 102 103 auth_ = "Basic " ~ cast(string)Base64.encode(cast(ubyte[])(settings_.user ~ ":" ~ settings_.token)); 104 105 metricsURL_ = settings_.path ~ "/metrics"; 106 annotationsURL_ = "https://" ~ settings_.host ~ "/" ~ settings_.path ~ "/annotations/"; 107 108 if (!settings_.prefix.empty) { 109 foreach(ref counter; hashCounters_) { 110 if (!counter.name.empty) 111 counter.prefixed = settings_.prefix.empty ? counter.name : (settings_.prefix ~ counter.name); 112 } 113 foreach(ref gauge; hashGauges_) { 114 if (!gauge.name.empty) 115 gauge.prefixed = settings_.prefix.empty ? gauge.name : (settings_.prefix ~ gauge.name); 116 } 117 } 118 119 mutex_ = new TaskMutex(); 120 } 121 122 123 void start() { 124 assert(!running_); 125 126 running_ = true; 127 runWorkerTask(&sender); 128 } 129 130 131 void shutdown() { 132 running_ = false; 133 134 if (mutex_ !is null) { 135 mutex_.destroy(); 136 mutex_ = null; 137 } 138 139 hashCounters_.clear; 140 hashGauges_.clear; 141 142 dataGauges_ = null; 143 dataCounters_ = null; 144 } 145 146 147 private string unescapeError(string x) { 148 if (!x.empty && (x[0] == '\"')) 149 x = x.dropOne.dropBack(1).replace("\\\"", "\"").replace("\\n", "\n").replace("\\t", "\t"); 150 return x; 151 } 152 153 154 private void libratoError(scope HTTPClientResponse res) { 155 if (settings_.errorCallback) { 156 string error = res.bodyReader.readAllUTF8; 157 158 if (auto ContentType = "Content-Type" in res.headers) { 159 if ((*ContentType).indexOf("/json") != -1) { 160 auto json = error.parseJsonString; 161 if (auto perrorsJson = "errors" in json) { 162 foreach(string type, errorJson; *perrorsJson) { 163 if (errorJson.type == Json.Type.array) { 164 error = null; 165 foreach(errorLine; errorJson) 166 error ~= (error.length ? "\n" : null) ~ errorLine.get!string.unescapeError; 167 } else if (errorJson.type == Json.Type..string) { 168 error = errorJson.get!string.unescapeError; 169 } 170 } 171 } 172 } 173 } 174 175 settings_.errorCallback(error); 176 } 177 } 178 179 180 void annotation(Annotation event) { 181 assert(!event.stream.empty, "must specify an annotation stream name"); 182 assert(!event.title.empty, "must specify a title for this event"); 183 184 auto json = jsonWriter(2048); 185 { 186 json.beginObject(); 187 scope(exit) json.endObject(); 188 189 json.field("name").value(event.stream); 190 json.field("title").value(event.title); 191 if (!event.description.empty) 192 json.field("title").value(event.title); 193 if (!event.description.empty) 194 json.field("description").value(event.description); 195 if (!event.source.empty) 196 json.field("source").value(event.source); 197 if (event.start.stdTime != 0) 198 json.field("start_time").value(event.start.toUTC.toUnixTime); 199 if (event.end.stdTime != 0) 200 json.field("end_time").value(event.end.toUTC.toUnixTime); 201 if (!event.links.empty) { 202 json.field("links"); 203 204 json.beginArray(); 205 scope(exit) json.endArray(); 206 207 foreach(l; event.links) { 208 json.beginObject(); 209 scope(exit) json.endObject(); 210 211 assert(!l.rel.empty); 212 assert(!l.href.empty); 213 json.field("rel").value(l.rel); 214 json.field("href").value(l.href); 215 if (!l.label.empty) 216 json.field("label").value(l.label); 217 } 218 } 219 } 220 221 auto url = annotationsURL_ ~ event.stream; 222 requestHTTP(url, (scope HTTPClientRequest req) { 223 req.method = HTTPMethod.POST; 224 req.requestURL = url; 225 req.headers["Content-Type"] = "application/json"; 226 req.headers["Authorization"] = auth_; 227 req.bodyWriter.write(json.json); 228 }, (scope HTTPClientResponse res) { 229 if ((res.statusCode != HTTPStatus.OK) && (res.statusCode != HTTPStatus.Created)) 230 libratoError(res); 231 res.dropBody(); 232 }); 233 } 234 235 236 void registerCounter(string name) { 237 assert(!running_, "cannot call register() once started"); 238 assert(name !in hashGauges_, "'" ~ name ~ "' is already registered as a gauge"); 239 240 auto info = hashCounters_.insert(name); 241 assert(info); 242 243 if (info.name.empty) { 244 info.name = name; 245 info.prefixed = settings_.prefix.empty ? name : (settings_.prefix ~ name); 246 info.slot = cast(ushort)(hashCounters_.length - 1); 247 248 dataCounters_[0] ~= 0.0; 249 dataCounters_[1] ~= 0.0; 250 } else { 251 assert(info.name == name); 252 info.prefixed = settings_.prefix.empty ? name : (settings_.prefix ~ name); 253 } 254 assert(dataCounters_[0].length == dataCounters_[1].length); 255 } 256 257 258 void registerGauge(string name, GaugeFlags flags = GaugeFlags.Default) { 259 assert(!running_, "cannot call register() once started"); 260 assert(name !in hashCounters_, "'" ~ name ~ "' is already registered as a counter"); 261 262 auto info = hashGauges_.insert(name); 263 assert(info); 264 265 if (info.name.empty) { 266 info.name = name; 267 info.prefixed = settings_.prefix.empty ? name : (settings_.prefix ~ name); 268 info.slot = cast(ushort)(hashGauges_.length - 1); 269 info.flags = flags; 270 271 ++dataGauges_[0].length; 272 ++dataGauges_[1].length; 273 } else { 274 assert(info.name == name); 275 info.prefixed = settings_.prefix.empty ? name : (settings_.prefix ~ name); 276 info.flags = flags; 277 } 278 assert(dataGauges_[0].length == dataGauges_[1].length); 279 } 280 281 282 // ok to access settings_ and ungarded as there is no interface to change settings 283 private void sender() { 284 HTTPClient http = connectHTTP(settings_.host, settings_.port, true, settings_.httpSettings); 285 286 auto json = jsonWriter(8192); 287 288 while (running_) { 289 sleep(settings_.intervalSecs.seconds); 290 291 if (settings_.sendCallback) 292 settings_.sendCallback(); 293 294 size_t buffer = buffer_; 295 synchronized(mutex_) { 296 buffer_ = (buffer_ + 1) & 1; 297 } 298 299 auto now = Clock.currTime(UTC()).toUnixTime; 300 301 { 302 json.clear(); 303 json.beginObject(); 304 scope(exit) json.endObject(); 305 306 json.field("counters"); 307 { 308 json.beginArray(); 309 scope(exit) json.endArray(); 310 311 foreach (ref counter; hashCounters_) { 312 auto pvalue = &dataCounters_.ptr[buffer].ptr[counter.slot]; 313 json.beginObject(); 314 scope(exit) json.endObject(); 315 316 json.field("name").value(counter.prefixed); 317 json.field("value").value(*pvalue); 318 } 319 } 320 321 json.field("gauges"); 322 { 323 json.beginArray(); 324 scope(exit) json.endArray(); 325 326 foreach(ref gauge; hashGauges_) { 327 auto pgauge = &dataGauges_.ptr[buffer].ptr[gauge.slot]; 328 auto flags = gauge.flags; 329 330 // note: these aggregations could well be SSE'd 331 if (!pgauge.values.empty) { 332 if ((flags & GaugeFlags.NoAggregate) == 0) { 333 json.beginObject(); 334 scope(exit) json.endObject(); 335 336 json.field("name").value(gauge.prefixed); 337 338 auto vsum = 0.0; 339 auto vsumSq = 0.0; 340 auto vmin = double.max; 341 auto vmax = -double.max; 342 auto count = pgauge.values.length; 343 344 if (flags & GaugeFlags.AggregateOnlyAvg) { 345 foreach(value; pgauge.values) 346 vsum += value; 347 json.field("value").value(vsum / cast(double)count); 348 } else if (flags & GaugeFlags.AggregateNoStdDev) { 349 foreach(value; pgauge.values) { 350 vsum += value; 351 vmin = min(vmin, value); 352 vmax = max(vmax, value); 353 } 354 json.field("count").value(count); 355 json.field("sum").value(vsum); 356 json.field("min").value(vmin); 357 json.field("max").value(vmax); 358 } else { 359 foreach(value; pgauge.values) { 360 vsum += value; 361 vsumSq += value * value; 362 vmin = min(vmin, value); 363 vmax = max(vmax, value); 364 } 365 json.field("count").value(count); 366 json.field("sum").value(vsum); 367 json.field("min").value(vmin); 368 json.field("max").value(vmax); 369 json.field("sum_squares").value(vsumSq); 370 } 371 } else { 372 foreach(i, value; pgauge.values) { 373 json.beginObject(); 374 scope(exit) json.endObject(); 375 376 json.field("name").value(gauge.prefixed); 377 json.field("value").value(value); 378 json.field("measure_time").value(pgauge.times[i].stdTimeToUnixTime); 379 } 380 } 381 pgauge.values.length = 0; 382 pgauge.times.length = 0; 383 } 384 } 385 } 386 387 json.field("source").value(settings_.source); 388 json.field("measure_time").value(now); 389 } 390 391 http.request((scope HTTPClientRequest req) { 392 req.method = HTTPMethod.POST; 393 req.requestURL = metricsURL_; 394 req.headers["Content-Type"] = "application/json"; 395 req.headers["Authorization"] = auth_; 396 req.bodyWriter.write(json.json); 397 }, (scope HTTPClientResponse res) { 398 if (res.statusCode != HTTPStatus.OK) 399 libratoError(res); 400 res.dropBody(); 401 }); 402 } 403 404 http.disconnect; 405 http.destroy; 406 running_ = false; 407 } 408 409 410 void increment(string name, double count = 1.0) { 411 assert(running_, "cannot increment before start() has been called"); 412 413 if (auto counter = hashCounters_.find(name)) { 414 synchronized(mutex_) { 415 dataCounters_.ptr[buffer_].ptr[counter.slot] += count; 416 } 417 } else { 418 assert(false, "unknown counter metric '" ~ name ~ "'"); 419 } 420 } 421 422 423 void counter(string name, double count) { 424 assert(running_, "cannot set counter before start() has been called"); 425 assert(!name.empty, "counter name must not be empty"); 426 427 if (auto counter = hashCounters_.find(name)) { 428 synchronized(mutex_) { 429 dataCounters_.ptr[buffer_].ptr[counter.slot] = count; 430 } 431 } else { 432 assert(false, "unknown counter metric '" ~ name ~ "'"); 433 } 434 } 435 436 437 private void gaugei(alias Filter)(string name, double value) { 438 assert(running_, "cannot add gauge sample before start() has been called"); 439 assert(!name.empty, "gauge name must not be empty"); 440 441 if (auto gauge = hashGauges_.find(name)) { 442 auto flags = gauge.flags; 443 synchronized(mutex_) { 444 auto pgauge = &dataGauges_.ptr[buffer_].ptr[gauge.slot]; 445 pgauge.values ~= Filter(flags, value); 446 if (flags & GaugeFlags.NoAggregate) 447 pgauge.times ~= Clock.currTime(UTC()).stdTime; // converted to unixtime by sender thread 448 } 449 } else { 450 assert(false, "unknown gauge metric '" ~ name ~ "'"); 451 } 452 } 453 454 455 void gauge(string name, double value) { 456 gaugei!((f, x) => x)(name, value); 457 } 458 459 void timed(string name, double milliseconds) { 460 gaugei!((flags, x) => (flags & GaugeFlags.TimeSeconds) ? (x * 0.001) : x)(name, milliseconds); 461 } 462 463 void timed(string name, TickDuration duration) { 464 gaugei!((flags, x) => (flags & GaugeFlags.TimeSeconds) ? (x * 0.001) : x)(name, cast(double)duration.msecs); 465 } 466 467 void timed(string name, Duration duration) { 468 gaugei!((flags, x) => (flags & GaugeFlags.TimeSeconds) ? (x * 0.001) : x)(name, cast(double)duration.total!"msecs"); 469 } 470 471 void timed(string name, SysTime start, SysTime end) { 472 gaugei!((flags, x) => (flags & GaugeFlags.TimeSeconds) ? (x * 0.001) : x)(name, cast(double)(end - start).total!"msecs"); 473 } 474 475 476 shared static ~this() { 477 shutdown(); 478 } 479 480 481 private enum defaultHostName = "metrics-api.librato.com"; 482 private enum defaultPath = "/v1"; 483 private enum defaultPort = 443; 484 485 486 private struct GaugeValue { 487 double[] values; 488 long[] times; 489 } 490 491 492 private struct MetricInfo { 493 ushort slot; 494 ushort flags; 495 string name; 496 string prefixed; 497 } 498 499 500 private __gshared { 501 Settings settings_; 502 503 string auth_; 504 string metricsURL_; 505 string annotationsURL_; 506 507 MetricMap!MetricInfo hashCounters_; 508 MetricMap!MetricInfo hashGauges_; 509 510 double[][2] dataCounters_; 511 GaugeValue[][2] dataGauges_; 512 513 TaskMutex mutex_; 514 515 shared bool running_; 516 shared size_t buffer_; 517 } 518 519 520 private struct MetricMap(T) { 521 T* find(string name) { 522 size_t probeCount = metrics_.length; 523 size_t mask = (probeCount - 1); 524 size_t index = hashOf(name) & mask; 525 size_t probe = 0; 526 527 while (probe != probeCount) { 528 auto metric = &metrics_[index & mask]; 529 if (!metric.name.empty) { 530 if (metric.name == name) 531 return metric; 532 } else { 533 return null; 534 } 535 index = (index + ++probe) & mask; 536 } 537 return null; 538 } 539 540 T* insert(string name) { 541 if (count_ >= ((3 * metrics_.length) >> 2)) // ensure at most 75% load 542 grow(); 543 544 size_t probeCount = metrics_.length; 545 size_t mask = (probeCount - 1); 546 size_t index = hashOf(name) & mask; 547 size_t probe = 0; 548 549 while (probe != probeCount) { 550 auto metric = &metrics_[index & mask]; 551 if (metric.name.empty) { 552 ++count_; 553 return metric; 554 } else if (metric.name == name) { 555 return metric; 556 } 557 558 index = (index + ++probe) & mask; 559 } 560 assert(0); 561 } 562 563 void grow() { 564 T[] metrics; 565 566 const size = max(32, metrics_.length) << 1; 567 const mask = size - 1; 568 metrics.length = size; 569 570 foreach(ref metric; metrics_) { 571 if (!metric.name.empty) 572 metrics[hashOf(metric.name) & mask] = metric; 573 } 574 575 swap(metrics_, metrics); 576 } 577 578 void clear() { 579 count_ = 0; 580 metrics_ = null; 581 } 582 583 @property auto length() const { 584 return count_; 585 } 586 587 @property auto capacity() const { 588 return metrics_.length; 589 } 590 591 T* opIn_r(in string name) { 592 return find(name); 593 } 594 595 int opApply(scope int delegate(ref T) dg) { 596 foreach(ref metric; metrics_) { 597 if (!metric.name.empty) { 598 if (auto res = dg(metric)) 599 return res; 600 } 601 } 602 return 0; 603 } 604 605 private T[] metrics_; 606 private size_t count_; 607 }