forked from taskcluster/taskcluster
-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.js
1003 lines (855 loc) · 30.2 KB
/
task.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Handler of individual tasks beginning after the task is claimed and ending
* with posting task results.
*/
const Debug = require('debug');
const DockerProc = require('dockerode-process');
const { PassThrough } = require('stream');
const States = require('./states');
const taskcluster = require('taskcluster-client');
const promiseRetry = require('promise-retry');
const os = require('os');
const features = require('./features');
const getHostname = require('./util/hostname');
const { fmtLog, fmtErrorLog } = require('./log');
const scopes = require('./scopes');
const { validatePayload } = require('./util/validate_schema');
const waitForEvent = require('./wait_for_event');
const _ = require('lodash');
const EventEmitter = require('events');
const libUrls = require('taskcluster-lib-urls');
const { version } = require('../package.json');
let debug = new Debug('runTask');
// TODO probably a terrible error message, look at making it better later
const CANCEL_ERROR = 'Task was canceled by another entity. This can happen using ' +
'a taskcluster client or by cancelling a task within Treeherder.';
/*
@example
taskEnvToDockerEnv({ FOO: true });
// => ['FOO=true']

@private
@param {Object} env key=value pair for environment variables.
@return {Array} the docker array format for variables
*/
function taskEnvToDockerEnv(env) {
  // Pass through anything that is not a plain key/value object (including
  // null/undefined) unchanged.
  if (!env || typeof env !== 'object') {
    return env;
  }
  return Object.entries(env).map(([name, value]) => `${name}=${value}`);
}
/**
Convert the feature flags into a state handler.

@param {Object} runtime global runtime (provides per-worker feature config).
@param {Object} task definition.
@param {Monitor} monitor object implementing record/measure methods
@return {States} state machine composed of one handler per enabled feature.
@throws {Error} when the task requests unknown or disabled features.
*/
function buildStateHandlers(runtime, task, monitor) {
  const featureFlags = task.payload.features || {};
  const requested = Object.keys(featureFlags);

  // Set difference (requested - supported): features the worker does not know about.
  let diff = requested.filter(name => !Object.prototype.hasOwnProperty.call(features, name));
  if (diff.length) {
    throw new Error(`${diff.join()} ${diff.length > 1 ? 'are' : 'is'} not part of valid features`);
  }

  // Features that exist but are disabled in this worker's configuration.
  diff = requested.filter(name => !runtime.features[name].enabled);
  if (diff.length) {
    throw new Error(`${diff.join()} ${diff.length > 1 ? 'are' : 'is'} not enabled on this worker`);
  }

  const handlers = [];
  for (const flag of Object.keys(features || {})) {
    // An explicit flag in the payload wins; otherwise fall back to the
    // worker-level enabled state combined with the feature's default.
    const enabled = (flag in featureFlags)
      ? featureFlags[flag]
      : (runtime.features[flag].enabled && features[flag].defaults);
    if (enabled) {
      handlers.push(new (features[flag].module)());
      debug(flag);
      monitor.count(`task.feature.${flag}`);
    }
  }
  return new States(handlers);
}
/**
Create a list of cached volumes that will be mounted within the docker container,
first verifying the task's expanded scopes cover every requested cache.

@param {Object} runtime global runtime (provides workerPool).
@param {Object} taskVolumeBindings mapping of cache name -> container mount path.
@param {Object} volumeCache volume cache manager.
@param {Array} expandedScopes the task's expanded scopes.
@return {Array} pair of [cache keys, docker bind strings].
@throws {Error} when scopes are insufficient for the requested caches.
*/
async function buildVolumeBindings(runtime, taskVolumeBindings, volumeCache, expandedScopes) {
  let scopeExpression = {
    AllOf: Object.keys(taskVolumeBindings || {}).map((taskVolumeBinding) => ({
      AnyOf: [
        `docker-worker:cache:${taskVolumeBinding}`,
        `docker-worker:cache:${taskVolumeBinding}:${runtime.workerPool}`,
      ],
    })),
  };
  const satisfyingScopes = scopes.scopesSatisfying(expandedScopes, scopeExpression);
  if (!satisfyingScopes) {
    let unsatisfied = scopes.removeGivenScopes(expandedScopes, scopeExpression);
    // JSON.stringify so the structures render usefully in the task log
    // instead of '[object Object]'.
    throw new Error([
      'Insufficient scopes to attach cache volumes.',
      'The task is missing the following scopes:',
      '',
      '```',
      `${JSON.stringify(unsatisfied, null, 2)}`,
      '```',
      'The requested caches require the task scopes to satisfy the following scope expression:',
      '```',
      `${JSON.stringify(scopeExpression, null, 2)}`,
      '```',
    ].join('\n'));
  }
  let bindings = [];
  let caches = [];
  for (let volumeName of Object.keys(taskVolumeBindings || {})) {
    let cacheInstance = await volumeCache.get(volumeName);
    // Docker bind format: <host path>:<container path>
    let binding = cacheInstance.path + ':' + taskVolumeBindings[volumeName];
    bindings.push(binding);
    caches.push(cacheInstance.key);
  }
  return [caches, bindings];
}
/**
Determine whether this task may run with docker's `privileged` capability.

@param {Object} runtime global runtime (provides workerPool).
@param {Object} task definition (scopes + payload.capabilities).
@param {Boolean} allowPrivilegedTasks whether this worker permits privileged tasks.
@return {Boolean} true when the task requested and is allowed privileged mode.
@throws {Error} when the task requests privileged mode without the scopes or
  on a worker that does not allow it.
*/
function runAsPrivileged(runtime, task, allowPrivilegedTasks) {
  const capabilities = task.payload.capabilities || {};
  const wantsPrivileged = capabilities.privileged || false;

  // Task did not ask for privileged mode: nothing to check.
  if (!wantsPrivileged) {
    return false;
  }

  const acceptedScopeSets = [
    ['docker-worker:capability:privileged'],
    [`docker-worker:capability:privileged:${runtime.workerPool}`],
  ];
  if (!scopes.scopeMatch(task.scopes, acceptedScopeSets)) {
    throw new Error(
      'Insufficient scopes to run task in privileged mode. Try ' +
      `adding docker-worker:capability:privileged:${runtime.workerPool} to the .scopes array`,
    );
  }

  if (!allowPrivilegedTasks) {
    throw new Error(
      'Cannot run task using docker privileged mode. Worker ' +
      'must be enabled to allow running of privileged tasks.',
    );
  }

  return true;
}
/**
Build docker device bindings (and any extra bind mounts) for the devices a
task requested, first verifying the task's expanded scopes cover each device.

@param {Object} runtime global runtime (provides workerPool).
@param {Object} devices mapping of device type -> device descriptor
  (with `mountPoints` and optional `binds`).
@param {Array} expandedScopes the task's expanded scopes.
@return {Object} { deviceBindings, bindMounts } for the docker HostConfig.
@throws {Error} when scopes are insufficient for the requested devices.
*/
async function buildDeviceBindings(runtime, devices, expandedScopes) {
  let scopeExpression = {
    AllOf: Object.keys(devices || {}).map((device) => ({
      AnyOf: [
        `docker-worker:capability:device:${device}`,
        `docker-worker:capability:device:${device}:${runtime.workerPool}`,
      ],
    })),
  };
  const satisfyingScopes = scopes.scopesSatisfying(expandedScopes, scopeExpression);
  if (!satisfyingScopes) {
    let unsatisfied = scopes.removeGivenScopes(expandedScopes, scopeExpression);
    // JSON.stringify so the structures render usefully in the task log
    // instead of '[object Object]'.
    throw new Error([
      'Insufficient scopes to attach devices to task container.',
      'The task is missing the following scopes:',
      '',
      '```',
      `${JSON.stringify(unsatisfied, null, 2)}`,
      '```',
      'The requested devices require the task scopes to satisfy the following scope expression:',
      '',
      '```',
      `${JSON.stringify(scopeExpression, null, 2)}`,
      '```',
    ].join('\n'));
  }
  let deviceBindings = [];
  let bindMounts = [];
  for (let deviceType of Object.keys(devices || {})) {
    let device = devices[deviceType];
    device.mountPoints.forEach((mountPoint) => {
      deviceBindings.push(
        {
          PathInContainer: mountPoint,
          PathOnHost: mountPoint,
          CgroupPermissions: 'rwm',
        },
      );
    });
    if (device.binds) {
      bindMounts = _.union(bindMounts, device.binds);
    }
  }
  return { deviceBindings, bindMounts };
}
/**
 * Periodically reclaims a task's claim so the queue does not consider the run
 * abandoned while the task is still executing. A single Reclaimer owns one
 * claim and reschedules itself after each successful reclaim.
 */
class Reclaimer {
  /**
   * @param {Object} runtime global runtime (provides log and task config).
   * @param {Task} task the task instance that owns this claim.
   * @param {Object} claim current claim (status, runId, takenUntil, credentials).
   */
  constructor(runtime, task, claim) {
    this.runtime = runtime;
    this.task = task;
    this.claim = claim;
    this.stopped = false;
    // start things off
    this.scheduleReclaim();
  }

  /**
   * Stop reclaiming. If a reclaim is already in progress, it will complete.
   */
  stop() {
    this.stopped = true;
    this.clearClaimTimeout();
  }

  /**
   * Determine when the right time is to issue another reclaim then schedule it
   * via setTimeout.
   */
  scheduleReclaim() {
    let claim = this.claim;

    // Figure out when to issue the next claim: a fraction of the time
    // remaining until takenUntil, controlled by reclaimDivisor.
    let takenFor = (new Date(claim.takenUntil) - new Date());
    let nextClaim = takenFor / this.runtime.task.reclaimDivisor;

    // This is tricky ensure we have logs...
    this.log('next claim', { time: nextClaim });

    // Clear any pending timer before arming a new one so at most one reclaim
    // is ever scheduled.
    this.clearClaimTimeout();
    // NOTE(review): the promise returned by reclaimTask is not awaited here;
    // reclaimTask handles its own errors internally.
    this.claimTimeoutId = setTimeout(async () => await this.reclaimTask(), nextClaim);
  }

  /**
   * Clear next reclaim if one is pending...
   */
  clearClaimTimeout() {
    if (this.claimTimeoutId) {
      clearTimeout(this.claimTimeoutId);
      this.claimTimeoutId = null;
    }
  }

  /**
   * Reclaim the current task and schedule the next reclaim...
   */
  async reclaimTask() {
    let task = this.claim.task;

    // stop() may have been called while the timer was pending.
    if (this.stopped) {
      return;
    }

    this.log('reclaiming task');

    try {
      let queue = this.task.createQueue(this.claim.credentials);
      this.claim = await queue.reclaimTask(
        this.claim.status.taskId, this.claim.runId);
      // reclaim does not return the task, so carry that forward from the previous
      // claim
      this.claim.task = task;
    } catch (e) {
      let errorMessage = `Could not reclaim task. ${e.stack || e}`;
      this.log('error reclaiming task', { error: errorMessage });

      // If status code is 409, assume that the run has already been resolved
      // and/or the deadline-exceeded.  Task run should be handled as though it were
      // canceled.
      if (e.statusCode === 409) {
        this.task.cancel('canceled', errorMessage);
      } else {
        // if this is a permissions error, then this call is unlikely to work,
        // but let's give it a shot!
        this.task.abort('internal-error');
      }

      return;
    }

    // Refresh the task's queue client and broadcast the new credentials so
    // features holding credentials can update them.
    this.task.queue = this.task.createQueue(this.claim.credentials);
    this.task.emit('credentials', this.claim.credentials);
    this.log('reclaimed task');
    await this.scheduleReclaim();
  }

  /**
   * Log with the claim's taskId/runId/takenUntil attached for context.
   */
  log(msg, options) {
    this.runtime.log(msg, _.defaults({}, options || {}, {
      taskId: this.claim.status.taskId,
      runId: this.claim.runId,
      takenUntil: this.claim.takenUntil,
    }));
  }
}
/**
 * A single claimed task run: configures and launches the docker container,
 * drives the feature state hooks (link/created/started/stopped/killed),
 * maintains the claim via a Reclaimer, and reports the final result to the
 * queue. Emits 'credentials' when the claim is renewed.
 */
class Task extends EventEmitter {
  /**
  @param {Object} runtime global runtime.
  @param {Object} task for this instance.
  @param {Object} claim details for this instance
  @param {Object} options
  @param {Number} [options.cpusetCpus] cpu(s) to use for this container/task.
  */
  constructor(runtime, task, claim, options) {
    super();
    this.runtime = runtime;
    this.task = task;
    this.claim = claim;
    this.status = this.claim.status;
    this.runId = this.claim.runId;
    this.taskState = 'pending';
    this.options = options;

    this.queue = this.createQueue(this.claim.credentials);

    // Primary log of all actions for the task.
    this.stream = new PassThrough();

    try {
      // states actions.
      this.states = buildStateHandlers(this.runtime, this.task, this.runtime.monitor);
    } catch (err) {
      // NOTE(review): abortRun returns a promise that cannot be awaited in a
      // constructor; the thrown err is what callers observe.
      this.abortRun(fmtErrorLog(err));
      throw err;
    }
  }

  /**
  Build the docker container configuration for this task.

  @param {String} imageId docker image id to run.
  @param {Object} linkInfo container links / binds / base env gathered by the
    feature link hooks; task env overrides linkInfo.env.
  @return {Object} { start, create } configuration for DockerProc.
  */
  async dockerConfig(imageId, linkInfo) {
    let config = this.task.payload;

    // Allow task specific environment vars to overwrite those provided by hooks
    let env = _.defaults({}, config.env || {}, linkInfo.env || {});

    // Universally useful environment variables describing the current task.
    // Note: these environment variables cannot be overwritten by anyone, we
    // rely on these for self-validating tasks.
    env.TASK_ID = this.status.taskId;
    env.RUN_ID = this.runId;
    env.TASKCLUSTER_WORKER_TYPE = this.runtime.workerType;
    env.TASKCLUSTER_WORKER_POOL = this.runtime.workerPool;
    env.TASKCLUSTER_INSTANCE_TYPE = this.runtime.workerNodeType;
    env.TASKCLUSTER_WORKER_GROUP = this.runtime.workerGroup;
    env.TASKCLUSTER_PUBLIC_IP = this.runtime.publicIp;
    env.TASKCLUSTER_ROOT_URL = this.runtime.rootUrl;
    env.TASKCLUSTER_WORKER_LOCATION = this.runtime.workerLocation;

    // Throws if privileged mode is requested but not allowed by scopes/worker.
    let privilegedTask = runAsPrivileged(
      this.runtime, this.task, this.runtime.dockerConfig.allowPrivileged,
    );

    let procConfig = {
      start: {},
      create: {
        Image: imageId,
        Cmd: config.command,
        Hostname: '',
        User: '',
        AttachStdin: false,
        AttachStdout: true,
        AttachStderr: true,
        Tty: true,
        OpenStdin: false,
        StdinOnce: false,
        Env: taskEnvToDockerEnv(env),
        HostConfig: {
          Privileged: privilegedTask,
          ShmSize: 1800000000,
          ExtraHosts: [
            'localhost.localdomain:127.0.0.1', // Bug 1488148
          ],
        },
      },
    };

    // Zero is a valid option so only check for existence.
    if ('cpusetCpus' in this.options) {
      procConfig.create.HostConfig.CpusetCpus = this.options.cpusetCpus;
    }

    // expand the task's scopes for access checks
    let auth = new taskcluster.Auth({
      rootUrl: this.runtime.rootUrl,
      credentials: this.runtime.taskcluster,
    });
    let expandedScopes = (await auth.expandScopes({ scopes: this.task.scopes })).scopes;

    if (linkInfo.links) {
      procConfig.create.HostConfig.Links = linkInfo.links.map(link => {
        return link.name + ':' + link.alias;
      });
    }

    // Bindings from linkInfo
    let binds = linkInfo.binds.map(b => {
      let binding = `${b.source}:${b.target}`;
      if (b.readOnly) {
        binding += ':ro';
      }
      return binding;
    });

    if (this.options.devices) {
      let bindings = await buildDeviceBindings(this.runtime, this.options.devices, expandedScopes);
      procConfig.create.HostConfig['Devices'] = bindings.deviceBindings;
      binds = _.union(binds, bindings.bindMounts);
    }

    if (this.task.payload.cache) {
      let bindings = await buildVolumeBindings(this.runtime, this.task.payload.cache,
        this.runtime.volumeCache, expandedScopes);
      // Remember cache keys so they can be released/purged after the run.
      this.volumeCaches = bindings[0];
      binds = _.union(binds, bindings[1]);
    }

    // If we have any binds, add them to HostConfig
    if (binds.length > 0) {
      procConfig.create.HostConfig.Binds = binds;
    }

    if(this.task.payload.features && this.task.payload.features.interactive) {
      //TODO: test with things that aren't undefined
      // Wrap the image's entrypoint so the interactive feature can intercept
      // the container's shell.
      let oldEntrypoint = (await this.runtime.docker.getImage(imageId).inspect()).Entrypoint;
      if(typeof oldEntrypoint === 'string') {
        oldEntrypoint = ['/bin/sh', '-c', oldEntrypoint];
      } else if(oldEntrypoint === undefined) {
        oldEntrypoint = [];
      }
      procConfig.create.Entrypoint = ['/.taskclusterutils/busybox',
        'sh',
        '-e',
        '/.taskclusterutils/interactive_wrapper_run.sh']
        .concat(oldEntrypoint);
    }

    if(this.task.payload.features && this.task.payload.features.allowPtrace) {
      procConfig.create.HostConfig.CapAdd = ['SYS_PTRACE'];
    }

    return procConfig;
  }

  /**
   * Write the identifying header (task/worker ids, version, caches in use)
   * to the task log stream.
   */
  writeLogHeader() {
    let header = [
      `Task ID: ${this.status.taskId}`,
      `Worker ID: ${this.runtime.workerId}`,
      `Worker Group: ${this.runtime.workerGroup}`,
      `Worker Node Type: ${this.runtime.workerNodeType}`,
      `Worker Pool: ${this.runtime.workerPool}`,
      `Worker Version: ${version}`,
      `Public IP: ${this.runtime.publicIp}`,
      `Hostname: ${os.hostname()}`,
    ];
    //
    // List caches if used...
    if (this.task.payload.cache) {
      // NOTE(review): the `|| {}` is redundant — Object.keys always returns
      // an array.
      for (let key of Object.keys(this.task.payload.cache) || {}) {
        let path = this.task.payload.cache[key];
        header.push(`using cache "${key}" -> ${path}`);
      }
    }

    for (let line of header) {
      this.stream.write(fmtLog(line));
    }

    this.stream.write('\r\n');
  }

  /**
   * Format the closing log line summarizing success, exit code and duration.
   *
   * @param {Boolean} success whether the run succeeded.
   * @param {Number} exitCode container exit code (-1 for infra errors).
   * @param {Date} start run start time.
   * @param {Date} finish run finish time.
   * @return {String} formatted footer line.
   */
  logFooter(success, exitCode, start, finish) {
    // Human readable success/failure thing...
    let humanSuccess = success ?
      'Successful' :
      'Unsuccessful';

    // Yes, date subtraction yields a Number.
    let duration = (finish - start) / 1000;

    return fmtLog(
      '%s task run with exit code: %d completed in %d seconds',
      humanSuccess, exitCode, duration,
    );
  }

  /**
   * Format JSON-schema validation errors for the task log.
   *
   * @param {String} prefix label for what failed validation.
   * @param {Array} errors schema validation errors.
   * @return {String} formatted error log line.
   */
  logSchemaErrors(prefix, errors) {
    return fmtErrorLog(
      '%s format is invalid json schema errors:\n %s',
      prefix, JSON.stringify(errors, null, 2),
    );
  }

  /**
  * Aborts a run that is currently running or being prepared to run (pulling images,
  * establishing states, etc).  This will optionally write an error to the stream
  * and then write the footer and kill states.
  *
  * @param {String} error - Option error to write to the stream prior to aborting
  */
  async abortRun(error = '') {
    if (!this.isCanceled()) {
      this.taskState = 'aborted';
    }

    this.runtime.monitor.count('task.state.abort');

    this.runtime.log('task aborted', {
      taskId: this.status.taskId,
      runId: this.runId,
      exception: this.taskException || '',
      error: error.stack || error,
    });

    if (error) {this.stream.write(error);}

    // Ensure that the stream has completely finished.
    // NOTE(review): stream.end does not return a promise; the await here is
    // best-effort and does not actually wait for the 'finish' event.
    await this.stream.end(this.logFooter(
      false, // unsuccessful task
      -1, // negative exit code indicates infrastructure errors usually.
      this.taskStart, // duration details...
      new Date(),
    ));

    try {
      // Run killed hook
      await this.states.killed(this);
    }
    catch (e) {
      // Do not throw, killing the states is a best effort here when aborting
      //
      this.runtime.log('error killing states', {
        error: `Could not kill states properly. ${e.stack}`,
      });
    }

    // Only report to the queue when aborted (not canceled — a canceled run
    // was already resolved by an external entity).
    if (this.isAborted()) {
      let queue = this.queue;
      let reporter = this.taskException ? queue.reportException : queue.reportFailed;
      let reportDetails = [this.status.taskId, this.runId];
      if (this.taskException) {reportDetails.push({ reason: this.taskException });}

      await reporter.apply(queue, reportDetails);
    }

    this.runtime.log('task resolved', {
      taskId: this.status.taskId,
      runId: this.runId,
      taskState: this.taskState,
    });

    return false;
  }

  /**
  * Resolves a run that has completed and reports to the proper exchange.
  *
  * If a task has been canceled or aborted, abortRun() should be used since the
  * run did not complete.
  *
  * @param {Boolean} success
  */
  async completeRun(success) {
    let queue = this.queue;
    let reportDetails = [this.status.taskId, this.runId];
    let reporter;
    let taskState;

    if (success) {
      reporter = queue.reportCompleted;
      taskState = 'completed';
    } else if (!this.task.payload.onExitStatus) {
      reporter = queue.reportFailed;
      taskState = 'failed';
    } else {
      // onExitStatus.retry lets a task declare exit codes that should be
      // treated as intermittent and retried rather than failed.
      let retry = this.task.payload.onExitStatus.retry;
      if (retry && retry.includes(this.exitCode)) {
        taskState = 'retry';
        reportDetails.push({ reason: 'intermittent-task' });
        reporter = queue.reportException;
      } else {
        reporter = queue.reportFailed;
        taskState = 'failed';
      }

      // onExitStatus.purgeCaches: exit codes that should invalidate the
      // caches this run used.
      let purgeStatuses = this.task.payload.onExitStatus && this.task.payload.onExitStatus.purgeCaches;
      if (purgeStatuses && purgeStatuses.includes(this.exitCode)) {
        for (let cacheKey of this.volumeCaches) {
          this.runtime.volumeCache.purgeInstance(cacheKey);
        }
      }
    }

    await reporter.apply(queue, reportDetails);

    this.runtime.log('task resolved', {
      taskId: this.status.taskId,
      runId: this.runId,
      taskState: taskState,
    });
  }

  /**
  * Schedule reclaims of each claim
  */
  scheduleReclaims() {
    this.reclaimer = new Reclaimer(this.runtime, this, this.claim);
  }

  /**
   * Stop the reclaimer and drop the reference to it.
   */
  stopReclaims() {
    this.reclaimer.stop();
    this.reclaimer = null;
  }

  /**
   * Arm a timer that force-kills the container when maxRunTime is exceeded.
   *
   * @param {Number} maxRuntime maximum run time in seconds.
   * @return {Object} timeout id to pass to clearTimeout on normal completion.
   */
  setRuntimeTimeout(maxRuntime) {
    let maxRuntimeMS = maxRuntime * 1000;
    let runtimeTimeoutId = setTimeout(function() {
      this.taskState = 'aborted';
      this.runtime.monitor.count('task.runtimeExceeded');

      this.runtime.log('task max runtime timeout', {
        maxRunTime: this.task.payload.maxRunTime,
        taskId: this.status.taskId,
        runId: this.runId,
      });

      // we don't wait for the promise to resolve just trigger kill here which
      // will cause run to stop processing the task and give us an error
      // exit code.
      this.dockerProcess.kill();
      this.stream.write(fmtErrorLog(
        'Task timeout after %d seconds. Force killing container.',
        this.task.payload.maxRunTime,
      ));
    }.bind(this), maxRuntimeMS);
    return runtimeTimeoutId;
  }

  /**
   * Top-level entry point: run the task, always cleaning up reclaims and
   * containers, then report the result unless the run was canceled/aborted.
   */
  async start() {
    this.runtime.log('task start', {
      taskId: this.status.taskId,
      runId: this.runId,
    });

    // Task has already been claimed, schedule reclaiming
    await this.scheduleReclaims();

    let success;
    try {
      success = await this.run();
    } catch (e) {
      // TODO: Reconsider if we should mark the task as failed or something else
      //       at this point... I intentionally did not mark the task completed
      //       here as to allow for a retry by another worker, etc...
      this.stopReclaims();

      // If task ends prematurely, make sure the container and volume caches get
      // flagged to be cleaned up.
      if (this.dockerProcess && this.dockerProcess.container) {
        this.runtime.gc.removeContainer(this.dockerProcess.container.id, this.volumeCaches);
      }

      throw e;
    }
    // Called again outside so we don't run this twice in the same try/catch
    // segment potentially causing a loop...
    this.stopReclaims();

    if (this.dockerProcess && this.dockerProcess.container) {
      this.runtime.gc.removeContainer(this.dockerProcess.container.id, this.volumeCaches);
    }

    // Mark the task appropriately now that all internal state is cleaned up.
    if (!this.isCanceled() && !this.isAborted()) {await this.completeRun(success);}
  }

  /** @return {Boolean} true when the run was aborted by this worker. */
  isAborted() {
    return this.taskState === 'aborted';
  }

  /** @return {Boolean} true when the run was canceled by an external entity. */
  isCanceled() {
    return this.taskState === 'canceled';
  }

  /**
  * Aborts the running of the task.  This is similar to cancelling a task, but
  * will allow time to upload artifacts and report the run as an exception instead.
  *
  * @param {String} reason - Reason for aborting the test run (Example: worker-shutdown)
  */
  abort(reason) {
    debug(`aborting task ${this.status.taskId} with reason ${reason}`);
    this.stopReclaims();
    this.taskState = 'aborted';
    this.taskException = reason;
    if (this.dockerProcess) {this.dockerProcess.kill();}
    this.stream.write(
      fmtErrorLog(`Task has been aborted prematurely. Reason: ${reason}`),
    );
  }

  /**
  * Cancel the running of the task.  Task cancellation was performed by an external
  * entity and has already been published to task-exception exchange.  This will
  * kill the docker container that might be running, attempt to release resources
  * that were linked, as well as prevent artifacts from uploading, which cannot
  * be done after a run is resolved.
  *
  * @param {String} exception - Exception reason for cancellation
  * @param {String} errorMessage - Optional error message to provide.
  */
  cancel(exception, errorMessage = CANCEL_ERROR) {
    this.taskState = 'canceled';
    this.taskException = exception;

    this.runtime.log('cancel task', {
      taskId: this.status.taskId,
      runId: this.runId,
      message: errorMessage,
    });

    if (this.dockerProcess) {this.dockerProcess.kill();}

    this.stream.write(fmtErrorLog(errorMessage));
  }

  /**
  Primary handler for all docker related task activities this handles the
  launching/configuration of all tasks as well as the features for the given
  tasks.

  @return {Boolean} success true/false for complete run.
  */
  async run() {
    this.taskState = 'running';
    this.taskStart = new Date();
    let monitor = this.runtime.monitor;
    this.hostname = getHostname(
      this.runtime,
      new Date(Date.now() + this.task.payload.maxRunTime * 1000),
    );

    // Cork all writes to the stream until we are done setting up logs.
    this.stream.cork();
    // Task log header.
    this.writeLogHeader();
    let linkInfo = {};
    try {
      const retryOptions = {
        retries: 3,
        minTimeout: 2000,
        randomize: true,
      };
      // Build the list of container links... and base environment variables
      if (this.states) {
        linkInfo = await promiseRetry(retry => {
          return this.states.link(this).catch(retry);
        }, retryOptions);
      }
      // Hooks prior to running the task.
      await promiseRetry(retry => {
        return this.states.created(this).catch(retry);
      }, retryOptions);
    } catch (e) {
      return await this.abortRun(
        fmtErrorLog(
          'Task was aborted because states could not be created ' +
          `successfully. ${e.message}`,
        ),
      );
    }

    // Everything should have attached to the stream by now...
    this.stream.uncork();

    if (this.isCanceled() || this.isAborted()) {
      return await this.abortRun();
    }

    const schema = libUrls.schema(this.runtime.rootUrl, 'docker-worker', 'v1/payload.json#');
    let payloadErrors = validatePayload(this.runtime.validator, this.task.payload, this.status, schema);

    if (payloadErrors.length) {
      // Inform the user that this task has failed due to some configuration
      // error on their part.
      this.taskException = 'malformed-payload';
      monitor.count('task.validationFailure');
      return await this.abortRun(
        this.logSchemaErrors('`task.payload`', payloadErrors),
      );
    }

    // Download the docker image needed for this task... This may fail in
    // unexpected ways and should be handled gracefully to indicate to the user
    // that their task has failed due to a image specific problem rather then
    // some general bug in taskcluster or the worker code.
    if (this.isCanceled() || this.isAborted()) {
      return await this.abortRun();
    }

    let imageId;
    try {
      let im = this.runtime.imageManager;
      imageId = await promiseRetry(retry => {
        return im.ensureImage(
          this.task.payload.image,
          this.stream,
          this,
          this.task.scopes).catch(retry);
      }, {
        maxTimeout: 1000,
        minTimeout: 10,
        factor: 1.2,
        randomize: true,
        retries: 3,
      });
      this.imageHash = imageId;
      // Mark the image as in-use so the garbage collector keeps it.
      this.runtime.gc.markImage(imageId);
    } catch (e) {
      monitor.count('task.image.pullFailed');
      return await this.abortRun(
        fmtErrorLog('Pulling docker image has failed.') +
        fmtErrorLog(`Error: ${e.message}`),
      );
    }

    if (this.isCanceled() || this.isAborted()) {
      return await this.abortRun();
    }

    let dockerConfig;
    try {
      dockerConfig = await this.dockerConfig(imageId, linkInfo);
    } catch (e) {
      let error = fmtErrorLog('Docker configuration could not be ' +
        'created.  This may indicate an authentication error when validating ' +
        'scopes necessary for running the task. \n %s', e.stack);

      return await this.abortRun(error);
    }

    if (this.isCanceled() || this.isAborted()) {
      return await this.abortRun();
    }

    let dockerProc = this.dockerProcess = new DockerProc(
      this.runtime.docker, dockerConfig);

    // Now that we know the stream is ready pipe data into it...
    dockerProc.stdout.pipe(this.stream, {
      end: false,
    });

    let runtimeTimeoutId = this.setRuntimeTimeout(this.task.payload.maxRunTime);

    if (this.isCanceled() || this.isAborted()) {
      return await this.abortRun();
    }

    // Call started hook when container is started
    dockerProc.once('container start', async (container) => {
      try {
        await this.states.started(this);
      } catch (e) {
        return await this.abortRun(
          fmtErrorLog(
            'Task was aborted because states could not be started ' +
            `successfully. ${e}`,
          ),
        );
      }
    });

    this.runtime.log('task run');
    this.stream.write(fmtLog('=== Task Starting ==='));
    try {
      let taskStart = new Date();
      this.exitCode = await dockerProc.run({
        // Do not pull the image as part of the docker run we handle it +
        // authentication above...
        pull: false,
      });
      monitor.measure('task.runtime', Date.now() - taskStart);
    } catch(error) {
      // Catch any errors starting the docker container.  This can be form an invalid
      // command being specified in the task payload, or a docker related issue.
      // XXX Look into determining if this was an issue starting the container because
      // of the command specified or an internal error.  Hard part, 500 error can
      // either mean the command caused the container to not start or docker had
      // an internal error such as not being able to a directory (aufs issues).
      this.runtime.log('error starting container', {
        taskId: this.status.taskId,
        runId: this.runId,
        error: error.stack || error,
      });
      this.stream.write(fmtErrorLog('Failure to properly start execution environment.'));
      this.stream.write(fmtErrorLog(error.message));
      this.exitCode = -1;
    }
    this.stream.write(fmtLog('=== Task Finished ==='));

    let success = this.exitCode === 0;

    clearTimeout(runtimeTimeoutId);

    // XXX: Semi-hack to ensure all consumers of the docker proc stdout get the
    // entire contents. Ideally we could just wait for the end cb but that does
    // not seem to help in this case...
    if (!dockerProc.stdout._readableState.endEmitted) {
      // We wait _before_ extractResult so those states hooks can add items
      // to the stream.
      await waitForEvent(dockerProc.stdout, 'end');
    }

    if (this.isCanceled() || this.isAborted()) {
      return await this.abortRun();
    }

    // Extract any results from the hooks.
    try {
      await this.states.stopped(this);
    } catch (e) {
      // If task finished successfully, mark it as unsuccessful.
      // Otherwise artifact uploading most likely will be expected
      // to fail if the task did not finish successfully.
      if (success) {
        success = false;
        this.exitCode = -1;
      }
      this.stream.write(fmtErrorLog(e.message));
    }

    this.stream.write(this.logFooter(success, this.exitCode, this.taskStart, new Date()));

    // Wait for the stream to end entirely before killing remaining containers.
    // NOTE(review): stream.end does not return a promise; best-effort await.
    await this.stream.end();

    try {
      await this.states.killed(this);
    } catch(e) {
      // If killing states was unsucessful, mark task as failed. If error is not
      // caught the task remains in limbo until claim expires.
      success = false;
      // Unfortunately in the current implementation, logging is bundled with killing
      // states.  At this point the task log stream is ended, and possible the log already
      // uploaded.  This is a best effort of capturing an error.
      this.runtime.log('error killing states', {
        error: `Could not kill states properly. ${e.stack || e}`,
      });
    }

    // If the results validation failed we consider this task failure.
    if (this.isCanceled() || this.isAborted()) {
      return await this.abortRun();
    }

    return success;
  }

  /**
  Create a new queue using temp credentials.

  @param {Object} credentials Temporary credentials.
  @return {taskcluster.Queue} New queue.
  */
  createQueue(credentials) {
    return new taskcluster.Queue({
      rootUrl: this.runtime.rootUrl,
      credentials: credentials,
    });
  }
}
module.exports = {