1 module vibrato.api;
2 
3 
4 import core.thread;
5 import core.time;
6 
7 import std.algorithm;
8 import std.array;
9 import std.base64;
10 import std.datetime;
11 import std.range;
12 import std.socket;
13 import std.string;
14 
15 import vibe.core.core;
16 import vibe.core.sync;
17 import vibe.data.json;
18 import vibe.inet.url;
19 import vibe.http.client;
20 import vibe.stream.operations;
21 
22 import vibrato.json;
23 
24 
/**
 * Exception type for errors raised by this module.
 *
 * Derives from Exception (not directly from Throwable) so that ordinary
 * `catch (Exception e)` handlers can catch it; it is still a Throwable, so
 * existing `catch (Throwable)` / `catch (VibratoException)` code keeps working.
 *
 * Params:
 *   msg  = human readable error message
 *   file = source file of the throw site (defaults to the caller's file)
 *   line = source line of the throw site (defaults to the caller's line)
 *   next = optional exception to chain behind this one
 */
class VibratoException : Exception {
	this(string msg, string file = __FILE__, size_t line = __LINE__, Throwable next = null) pure {
		super(msg, file, line, next);
	}
}
30 
31 
// Callback the sender invokes at the start of every send cycle (see Settings.sendCallback).
alias SendCallback = void delegate();
// Callback that receives the error text when a request to the API fails (see Settings.errorCallback).
alias ErrorCallback	= void delegate(string error);
34 
35 
// Configuration passed to init(). Fields left at their default value are filled in by init().
struct Settings {
	string user;				// librato.com user email
	string token;				// librato.com api token
	string source;				// this instance's source name - init() falls back to the machine's hostname
	string prefix;				// metric name prefix, prepended verbatim to every registered metric name
	size_t intervalSecs = 10;	// number of seconds the sender sleeps between two send/aggregate cycles
	SendCallback sendCallback;	// called by the sender right after waking up, and before aggregation
	ErrorCallback errorCallback; // called with the error text when a librato request fails, i.e. when the api is down; init() installs a throwing default
	HTTPClientSettings httpSettings;	// http client settings - init() allocates a default instance when null
	string host;				// librato.com's host - init() falls back to defaultHostName
	ushort port;				// librato.com's port - init() falls back to defaultPort when 0
	string path;				// librato.com's api path - init() falls back to defaultPath
}
49 
50 
/**
 * Case-insensitive (ASCII) string hash used by MetricMap.
 *
 * Starts from 5381 and, for every code unit, multiplies the running hash by 33
 * and XORs in the lower-cased code unit.
 *
 * Params:
 *   x = the characters to hash; may be empty
 * Returns: the hash value (5381 for an empty input)
 */
private size_t hashOf(const(char)[] x) {
	// Imported locally: the module does not import std.ascii at file level, so the
	// fully qualified call only resolved through another module's imports.
	import std.ascii : toLower;

	size_t hash = 5381;
	foreach (char c; x)
		hash = (hash * 33) ^ cast(size_t)(toLower(c));
	return hash;
}
57 
58 
// Per-gauge behaviour flags, passed to registerGauge() and stored in MetricInfo.flags.
enum GaugeFlags : ushort {
	TimeSeconds			= 1 << 0, // timed() samples are scaled by 0.001 (msecs -> secs); by default time is in msecs
	NoAggregate			= 1 << 1, // do not aggregate locally - send every sample and its timestamp untouched
	AggregateOnlyAvg	= 1 << 2, // compute and send only the average
	AggregateNoStdDev	= 1 << 3, // compute and send everything (count, min, max, sum, ...), except sum_squares
	Default = 0, // aggregate locally and send count, sum, min, max and sum_squares
}
66 
67 
// Describes one annotation event to be posted by annotation().
struct Annotation {
	// A link attached to the annotation; annotation() requires rel and href, label is optional.
	struct Link {
		string label;
		string rel;
		string href;
	}

	string stream;			// annotation stream name (required); also appended to the annotations URL
	string title;			// event title (required)
	string description;		// optional - omitted from the request when empty
	string source;			// optional - omitted from the request when empty
	Link[] links;			// optional - omitted from the request when empty
	SysTime start;			// optional - sent as "start_time" unless its stdTime is 0
	SysTime end;			// optional - sent as "end_time" unless its stdTime is 0
}
83 
84 
/**
 * Stores the settings, fills in defaults and prepares the derived state
 * (authorization header, request URLs, metric prefixes and the lock).
 *
 * Defaults applied when a field is empty/zero/null: host, path, port, source
 * (machine hostname), httpSettings (fresh instance) and errorCallback (throws
 * an Exception carrying the error text).
 *
 * Params:
 *   settings = user supplied configuration; copied into module state
 */
void init(Settings settings) {
	settings_ = settings;

	if (settings_.host.empty)
		settings_.host = defaultHostName;
	if (settings_.path.empty)
		settings_.path = defaultPath;
	if (!settings_.port)
		settings_.port = defaultPort;
	// test for emptiness rather than null so that "" also falls back to the hostname
	if (settings_.source.empty)
		settings_.source = Socket.hostName;
	if (settings_.httpSettings is null)
		settings_.httpSettings = new HTTPClientSettings;
	if (settings_.errorCallback is null)
		settings_.errorCallback = (string error) {
			throw new Exception(format("Librato.com error: %s", error));
		};

	// Both URLs below are built by plain concatenation, so make sure the path
	// carries exactly the leading slash they rely on.
	if (!settings_.path.startsWith("/"))
		settings_.path = "/" ~ settings_.path;

	auth_ = "Basic " ~ cast(string)Base64.encode(cast(ubyte[])(settings_.user ~ ":" ~ settings_.token));

	metricsURL_ = settings_.path ~ "/metrics";

	// The path already starts with '/', so no separator is inserted after the host
	// (the previous code produced "host//v1/..."). A non-default port is spelled
	// out so annotations go to the same endpoint the sender connects to.
	string authority = settings_.host;
	if (settings_.port != defaultPort)
		authority ~= format(":%d", settings_.port);
	annotationsURL_ = "https://" ~ authority ~ settings_.path ~ "/annotations/";

	// Re-derive the prefixed names of metrics registered before init(); done
	// unconditionally so that an empty prefix also clears a previously applied one.
	foreach(ref counter; hashCounters_)
		counter.prefixed = settings_.prefix.empty ? counter.name : (settings_.prefix ~ counter.name);
	foreach(ref gauge; hashGauges_)
		gauge.prefixed = settings_.prefix.empty ? gauge.name : (settings_.prefix ~ gauge.name);

	mutex_ = new TaskMutex();
}
121 
122 
/**
 * Marks the module as running and launches sender() as a worker task.
 *
 * Must be called after init() and at most once until the sender has stopped.
 */
void start() {
	assert(!running_, "start() has already been called");
	// init() creates the lock and fills in host/port; without it the sender cannot work
	assert(mutex_ !is null, "init() must be called before start()");

	running_ = true;
	runWorkerTask(&sender);
}
129 
130 
/**
 * Clears the running flag and releases the module state: the lock, both
 * metric registries and both halves of the counter/gauge data buffers.
 */
void shutdown() {
	// the sender loop is conditioned on this flag
	running_ = false;

	// tear down the lock if init() ever created one
	if (mutex_ !is null) {
		mutex_.destroy();
		mutex_ = null;
	}

	// forget every registered metric ...
	hashCounters_.clear();
	hashGauges_.clear();

	// ... and drop the sample storage that went with them
	dataCounters_ = null;
	dataGauges_ = null;
}
145 
146 
/**
 * Turns a quoted, escaped error string into plain text.
 *
 * When `x` is wrapped in double quotes, the quotes are removed and the escape
 * sequences `\"`, `\n` and `\t` are replaced by the characters they stand for.
 * Any other input is returned unchanged. In particular a string that merely
 * starts with a quote (e.g. `"name" is invalid`) is left alone instead of
 * losing its first and last character.
 *
 * Params:
 *   x = error text as received
 * Returns: the unquoted/unescaped text, or `x` itself when it is not quoted
 */
private string unescapeError(string x) {
	const quoted = (x.length >= 2) && (x[0] == '\"') && (x[$ - 1] == '\"');
	if (!quoted)
		return x;

	return x[1 .. $ - 1].replace("\\\"", "\"").replace("\\n", "\n").replace("\\t", "\t");
}
152 
153 
/**
 * Reports a failed API response through settings_.errorCallback.
 *
 * The response body is used as the error text. When the response declares a
 * JSON content type and the body has an "errors" member, the string values
 * found there (either directly or inside arrays) are unescaped and joined
 * with newlines, and that replaces the raw body as the error text.
 *
 * Does nothing (and does not read the body) when no error callback is set.
 *
 * Params:
 *   res = the response whose status indicated a failure
 */
private void libratoError(scope HTTPClientResponse res) {
	if (!settings_.errorCallback)
		return;

	string error = res.bodyReader.readAllUTF8;

	if (auto contentType = "Content-Type" in res.headers) {
		if ((*contentType).indexOf("/json") != -1) {
			auto json = error.parseJsonString;
			if (auto perrorsJson = "errors" in json) {
				// collect the messages of every category instead of keeping only the last one
				string[] messages;
				foreach(string type, errorJson; *perrorsJson) {
					if (errorJson.type == Json.Type.array) {
						foreach(errorLine; errorJson)
							messages ~= errorLine.get!string.unescapeError;
					} else if (errorJson.type == Json.Type.string) {
						messages ~= errorJson.get!string.unescapeError;
					}
				}
				// keep the raw body when nothing usable was found
				if (!messages.empty)
					error = messages.join("\n");
			}
		}
	}

	settings_.errorCallback(error);
}
178 
179 
/**
 * Posts an annotation event to the stream named by `event.stream`.
 *
 * Builds the JSON body from the non-empty fields of `event` and sends it with
 * a POST request to annotationsURL_ followed by the stream name. A response
 * status other than OK or Created is reported through libratoError().
 *
 * Params:
 *   event = the annotation to send; `stream` and `title` must be non-empty, and
 *           every link must carry a `rel` and an `href`
 */
void annotation(Annotation event) {
	assert(!event.stream.empty, "must specify an annotation stream name");
	assert(!event.title.empty, "must specify a title for this event");

	auto json = jsonWriter(2048);
	{
		json.beginObject();
		scope(exit) json.endObject();

		json.field("name").value(event.stream);
		// written exactly once; it used to be repeated whenever a description was present
		json.field("title").value(event.title);
		if (!event.description.empty)
			json.field("description").value(event.description);
		if (!event.source.empty)
			json.field("source").value(event.source);
		// a stdTime of 0 is treated as "not set"
		if (event.start.stdTime != 0)
			json.field("start_time").value(event.start.toUTC.toUnixTime);
		if (event.end.stdTime != 0)
			json.field("end_time").value(event.end.toUTC.toUnixTime);
		if (!event.links.empty) {
			json.field("links");

			json.beginArray();
			scope(exit) json.endArray();

			foreach(l; event.links) {
				json.beginObject();
				scope(exit) json.endObject();

				assert(!l.rel.empty);
				assert(!l.href.empty);
				json.field("rel").value(l.rel);
				json.field("href").value(l.href);
				if (!l.label.empty)
					json.field("label").value(l.label);
			}
		}
	}

	auto url = annotationsURL_ ~ event.stream;
	requestHTTP(url, (scope HTTPClientRequest req) {
		req.method = HTTPMethod.POST;
		req.requestURL = url;
		req.headers["Content-Type"] = "application/json";
		req.headers["Authorization"] = auth_;
		req.bodyWriter.write(json.json);
	}, (scope HTTPClientResponse res) {
		if ((res.statusCode != HTTPStatus.OK) && (res.statusCode != HTTPStatus.Created))
			libratoError(res);
		res.dropBody();
	});
}
234 
235 
/**
 * Registers a counter metric under `name`; only allowed before start().
 *
 * A new name gets the next free slot and a zero-initialised value in both
 * data buffers. Registering an existing name again only refreshes its
 * prefixed name.
 *
 * Params:
 *   name = metric name without prefix; must not already be a gauge
 */
void registerCounter(string name) {
	assert(!running_, "cannot call register() once started");
	assert(name !in hashGauges_, "'" ~ name ~ "' is already registered as a gauge");

	auto entry = hashCounters_.insert(name);
	assert(entry);

	// an entry handed out for the first time still has an empty name
	const isNew = entry.name.empty;
	if (!isNew)
		assert(entry.name == name);

	// the prefixed name is (re)computed for new and known entries alike
	entry.prefixed = settings_.prefix.empty ? name : (settings_.prefix ~ name);

	if (isNew) {
		entry.name = name;
		entry.slot = cast(ushort)(hashCounters_.length - 1);

		// one value per counter in each of the two buffers
		foreach (ref buf; dataCounters_)
			buf ~= 0.0;
	}
	assert(dataCounters_[0].length == dataCounters_[1].length);
}
256 
257 
/**
 * Registers a gauge metric under `name`; only allowed before start().
 *
 * A new name gets the next free slot and an empty sample list in both data
 * buffers. Registering an existing name again refreshes its prefixed name
 * and its flags.
 *
 * Params:
 *   name  = metric name without prefix; must not already be a counter
 *   flags = aggregation/time-unit behaviour for this gauge
 */
void registerGauge(string name, GaugeFlags flags = GaugeFlags.Default) {
	assert(!running_, "cannot call register() once started");
	assert(name !in hashCounters_, "'" ~ name ~ "' is already registered as a counter");

	auto entry = hashGauges_.insert(name);
	assert(entry);

	// an entry handed out for the first time still has an empty name
	const isNew = entry.name.empty;
	if (!isNew)
		assert(entry.name == name);

	// prefixed name and flags are (re)assigned for new and known entries alike
	entry.prefixed = settings_.prefix.empty ? name : (settings_.prefix ~ name);
	entry.flags = flags;

	if (isNew) {
		entry.name = name;
		entry.slot = cast(ushort)(hashGauges_.length - 1);

		// one sample list per gauge in each of the two buffers
		foreach (ref buf; dataGauges_)
			buf.length = buf.length + 1;
	}
	assert(dataGauges_[0].length == dataGauges_[1].length);
}
280 
281 
// ok to access settings_ unguarded here, as there is no interface to change settings
/**
 * Body of the worker task launched by start().
 *
 * Every settings_.intervalSecs seconds it calls the optional sendCallback,
 * flips buffer_ so that new samples go to the other buffer, serialises the
 * buffer that was active until then (counters and gauges) to JSON and POSTs
 * it to metricsURL_. Gauges without samples are skipped; gauge samples are
 * cleared once written. A response status other than OK is reported through
 * libratoError().
 *
 * The loop ends when running_ becomes false; the connection is then closed.
 */
private void sender() {
	// NOTE(review): the third argument presumably enables TLS - confirm against vibe.d's connectHTTP
	HTTPClient http = connectHTTP(settings_.host, settings_.port, true, settings_.httpSettings);

	auto json = jsonWriter(8192);

	while (running_) {
		sleep(settings_.intervalSecs.seconds);

		// shutdown() may have run while this task was asleep; it nulls mutex_ and the
		// data buffers, so none of the code below may run in that case
		if (!running_)
			break;

		if (settings_.sendCallback)
			settings_.sendCallback();

		// remember the buffer that was filled during the last cycle, then redirect
		// new samples to the other one
		size_t buffer = buffer_;
		synchronized(mutex_) {
			buffer_ = (buffer_ + 1) & 1;
		}

		auto now = Clock.currTime(UTC()).toUnixTime;

		{
			json.clear();
			json.beginObject();
			scope(exit) json.endObject();

			json.field("counters");
			{
				json.beginArray();
				scope(exit) json.endArray();

				foreach (ref counter; hashCounters_) {
					auto pvalue = &dataCounters_.ptr[buffer].ptr[counter.slot];
					json.beginObject();
					scope(exit) json.endObject();

					json.field("name").value(counter.prefixed);
					json.field("value").value(*pvalue);
				}
			}

			json.field("gauges");
			{
				json.beginArray();
				scope(exit) json.endArray();

				foreach(ref gauge; hashGauges_) {
					auto pgauge = &dataGauges_.ptr[buffer].ptr[gauge.slot];
					auto flags = gauge.flags;

					// note: these aggregations could well be SSE'd
					if (!pgauge.values.empty) {
						if ((flags & GaugeFlags.NoAggregate) == 0) {
							// aggregated: a single object per gauge
							json.beginObject();
							scope(exit) json.endObject();

							json.field("name").value(gauge.prefixed);

							auto vsum = 0.0;
							auto vsumSq = 0.0;
							auto vmin = double.max;
							auto vmax = -double.max;
							auto count = pgauge.values.length;

							if (flags & GaugeFlags.AggregateOnlyAvg) {
								foreach(value; pgauge.values)
									vsum += value;
								json.field("value").value(vsum / cast(double)count);
							} else if (flags & GaugeFlags.AggregateNoStdDev) {
								foreach(value; pgauge.values) {
									vsum += value;
									vmin = min(vmin, value);
									vmax = max(vmax, value);
								}
								json.field("count").value(count);
								json.field("sum").value(vsum);
								json.field("min").value(vmin);
								json.field("max").value(vmax);
							} else {
								foreach(value; pgauge.values) {
									vsum += value;
									vsumSq += value * value;
									vmin = min(vmin, value);
									vmax = max(vmax, value);
								}
								json.field("count").value(count);
								json.field("sum").value(vsum);
								json.field("min").value(vmin);
								json.field("max").value(vmax);
								json.field("sum_squares").value(vsumSq);
							}
						} else {
							// not aggregated: one object per sample, each with its own timestamp
							foreach(i, value; pgauge.values) {
								json.beginObject();
								scope(exit) json.endObject();

								json.field("name").value(gauge.prefixed);
								json.field("value").value(value);
								json.field("measure_time").value(pgauge.times[i].stdTimeToUnixTime);
							}
						}
						// samples have been written; start the next cycle empty
						pgauge.values.length = 0;
						pgauge.times.length = 0;
					}
				}
			}

			json.field("source").value(settings_.source);
			json.field("measure_time").value(now);
		}

		http.request((scope HTTPClientRequest req) {
			req.method = HTTPMethod.POST;
			req.requestURL = metricsURL_;
			req.headers["Content-Type"] = "application/json";
			req.headers["Authorization"] = auth_;
			req.bodyWriter.write(json.json);
		}, (scope HTTPClientResponse res) {
			if (res.statusCode != HTTPStatus.OK)
				libratoError(res);
			res.dropBody();
		});
	}

	http.disconnect;
	http.destroy;
	running_ = false;
}
408 
409 
/**
 * Adds `count` to the current-buffer value of the counter called `name`.
 *
 * Params:
 *   name  = a name previously passed to registerCounter()
 *   count = amount to add (defaults to 1.0)
 */
void increment(string name, double count = 1.0) {
	assert(running_, "cannot increment before start() has been called");

	auto entry = hashCounters_.find(name);
	if (entry is null)
		assert(false, "unknown counter metric '" ~ name ~ "'");

	// buffer_ is read inside the lock so the write goes to the active buffer
	synchronized(mutex_) {
		dataCounters_.ptr[buffer_].ptr[entry.slot] += count;
	}
}
421 
422 
/**
 * Overwrites the current-buffer value of the counter called `name` with `count`.
 *
 * Params:
 *   name  = a non-empty name previously passed to registerCounter()
 *   count = the new value
 */
void counter(string name, double count) {
	assert(running_, "cannot set counter before start() has been called");
	assert(!name.empty, "counter name must not be empty");

	auto entry = hashCounters_.find(name);
	if (entry is null)
		assert(false, "unknown counter metric '" ~ name ~ "'");

	// buffer_ is read inside the lock so the write goes to the active buffer
	synchronized(mutex_) {
		dataCounters_.ptr[buffer_].ptr[entry.slot] = count;
	}
}
435 
436 
/**
 * Shared implementation behind gauge() and timed(): appends one sample to the
 * current buffer of the gauge called `name`.
 *
 * Params:
 *   Filter = callable taking (flags, value) and returning the value to store
 *   name   = a non-empty name previously passed to registerGauge()
 *   value  = the raw sample, passed through Filter before being stored
 */
private void gaugei(alias Filter)(string name, double value) {
	assert(running_, "cannot add gauge sample before start() has been called");
	assert(!name.empty, "gauge name must not be empty");

	auto entry = hashGauges_.find(name);
	if (entry is null)
		assert(false, "unknown gauge metric '" ~ name ~ "'");

	auto flags = entry.flags;
	synchronized(mutex_) {
		auto samples = &dataGauges_.ptr[buffer_].ptr[entry.slot];
		samples.values ~= Filter(flags, value);
		// timestamps are only kept for gauges whose samples are sent individually
		if (flags & GaugeFlags.NoAggregate)
			samples.times ~= Clock.currTime(UTC()).stdTime; // converted to unixtime by sender thread
	}
}
453 
454 
/// Records `value` unchanged as a sample of the gauge called `name`.
void gauge(string name, double value) {
	gaugei!((flags, sample) { return sample; })(name, value);
}
458 
/**
 * Records a duration given in milliseconds as a sample of the gauge `name`.
 * Gauges registered with GaugeFlags.TimeSeconds store the value scaled by 0.001.
 */
void timed(string name, double milliseconds) {
	gaugei!((flags, ms) {
		if (flags & GaugeFlags.TimeSeconds)
			return ms * 0.001;
		return ms;
	})(name, milliseconds);
}
462 
/**
 * Records a TickDuration as a sample of the gauge `name`, using its whole
 * millisecond count; the TimeSeconds scaling is applied by the `double` overload.
 */
void timed(string name, TickDuration duration) {
	const elapsedMs = cast(double)duration.msecs;
	timed(name, elapsedMs);
}
466 
/**
 * Records a Duration as a sample of the gauge `name`, using its total whole
 * milliseconds; the TimeSeconds scaling is applied by the `double` overload.
 */
void timed(string name, Duration duration) {
	const elapsedMs = cast(double)duration.total!"msecs";
	timed(name, elapsedMs);
}
470 
/**
 * Records the time between `start` and `end` as a sample of the gauge `name`,
 * using the total whole milliseconds of `end - start`; the TimeSeconds scaling
 * is applied by the `double` overload.
 */
void timed(string name, SysTime start, SysTime end) {
	const elapsed = end - start;
	timed(name, cast(double)elapsed.total!"msecs");
}
474 
475 
// Module destructor: releases the module state via shutdown().
shared static ~this() {
	shutdown();
}
479 
480 
// Fallbacks applied by init() when the matching Settings field is empty or zero.
private enum defaultHostName = "metrics-api.librato.com";
private enum defaultPath = "/v1";
private enum defaultPort = 443;
484 
485 
// Samples collected for one gauge in one buffer.
private struct GaugeValue {
	double[] values;	// the recorded samples, cleared by the sender after each send
	long[] times;		// SysTime.stdTime of each sample; only filled for NoAggregate gauges
}
490 
491 
// Registry entry describing one registered counter or gauge.
private struct MetricInfo {
	ushort slot;		// index of this metric's data in dataCounters_ / dataGauges_
	ushort flags;		// GaugeFlags bits; assigned by registerGauge()
	string name;		// name as registered; an empty name marks an unused MetricMap slot
	string prefixed;	// settings_.prefix ~ name, the name that is sent to the api
}
498 
499 
// Module state, shared between the caller's threads and the sender task.
private __gshared {
	Settings settings_;				// effective settings, as stored and completed by init()

	string auth_;					// value of the Authorization header
	string metricsURL_;				// request path used by the sender
	string annotationsURL_;			// url prefix used by annotation(); the stream name is appended

	MetricMap!MetricInfo hashCounters_;	// registered counters
	MetricMap!MetricInfo hashGauges_;	// registered gauges

	double[][2] dataCounters_;		// counter values, one array per buffer, indexed by MetricInfo.slot
	GaugeValue[][2] dataGauges_;	// gauge samples, one array per buffer, indexed by MetricInfo.slot

	TaskMutex mutex_;				// guards buffer_ flips and writes into the active buffer

	shared bool running_;			// true between start() and the end of the sender / shutdown()
	shared size_t buffer_;			// index (0 or 1) of the buffer that currently receives samples
}
518 
519 
/**
 * Open-addressing hash table keyed by metric name.
 *
 * `T` must have a `string name` member; an entry whose name is empty marks an
 * unused slot. The table size is always a power of two and collisions are
 * resolved by probing with an increasing step (1, 2, 3, ...). Entries are
 * never removed individually, only all at once through clear().
 */
private struct MetricMap(T) {
	/**
	 * Looks up the entry stored under `name`.
	 *
	 * Returns: a pointer to the entry, or null when there is none.
	 */
	T* find(string name) {
		size_t probeCount = metrics_.length;
		size_t mask = (probeCount - 1);
		size_t index = hashOf(name) & mask;
		size_t probe = 0;

		// never entered for an empty table (probeCount == 0)
		while (probe != probeCount) {
			auto metric = &metrics_[index & mask];
			if (!metric.name.empty) {
				if (metric.name == name)
					return metric;
			} else {
				// an unused slot ends the probe sequence: the name is not present
				return null;
			}
			index = (index + ++probe) & mask;
		}
		return null;
	}

	/**
	 * Returns the entry stored under `name`, or claims an unused slot for it.
	 *
	 * A freshly claimed slot still has an empty `name`; the caller is expected
	 * to assign it. The table grows first when it is at least 75% full.
	 */
	T* insert(string name) {
		if (count_ >= ((3 * metrics_.length) >> 2)) // ensure at most 75% load
			grow();

		size_t probeCount = metrics_.length;
		size_t mask = (probeCount - 1);
		size_t index = hashOf(name) & mask;
		size_t probe = 0;

		while (probe != probeCount) {
			auto metric = &metrics_[index & mask];
			if (metric.name.empty) {
				++count_;
				return metric;
			} else if (metric.name == name) {
				return metric;
			}

			index = (index + ++probe) & mask;
		}
		assert(0);
	}

	/**
	 * Doubles the table (at least 64 slots) and re-inserts every used entry.
	 *
	 * Entries are placed with the same probe sequence find()/insert() use, so
	 * two names that collide in the new table both survive and stay findable.
	 */
	void grow() {
		const size = max(32, metrics_.length) << 1;
		const mask = size - 1;
		auto metrics = new T[size];

		foreach(ref metric; metrics_) {
			if (metric.name.empty)
				continue;

			size_t index = hashOf(metric.name) & mask;
			size_t probe = 0;
			// the new table is less than half full, so an unused slot always exists
			while (!metrics[index].name.empty)
				index = (index + ++probe) & mask;
			metrics[index] = metric;
		}

		metrics_ = metrics;
	}

	/// Removes all entries and releases the table.
	void clear() {
		count_ = 0;
		metrics_ = null;
	}

	/// Number of slots handed out by insert().
	@property auto length() const {
		return count_;
	}

	/// Number of slots in the table, used or not.
	@property auto capacity() const {
		return metrics_.length;
	}

	/// Supports `name in map` and `name !in map`; same result as find().
	T* opBinaryRight(string op)(in string name) if (op == "in") {
		return find(name);
	}

	/// Iterates over the used entries by reference, in table order.
	int opApply(scope int delegate(ref T) dg) {
		foreach(ref metric; metrics_) {
			if (!metric.name.empty) {
				if (auto res = dg(metric))
					return res;
			}
		}
		return 0;
	}

	private T[] metrics_;	// the slots; length is 0 or a power of two
	private size_t count_;	// number of slots handed out by insert()
}