Configuration
configuration not found
Configuration
configuration not found
Job
use Sparky::JobApi;
use HTTP::Tiny;
use YAMLish;
use JSON::Fast;
class Pipeline does Sparky::JobApi::Role {
# Pipeline parameters. Each attribute is initialised from the job tags
# returned by tags(); the literal after the last `||` is used when the tag
# is missing or false.

# name of the task to execute (matched against <name> in stage-run)
has Str $.task = tags()<task> || "";
# optional path to an alternative tasks file, uploaded as sparrow.yaml
has Str $.tasks_config = tags()<tasks_config> || "";
# owner name; used to build the vault path and to gate the docker socket mount
has Str $.owner = tags()<owner> || "";
# docker image override; when empty the image list comes from sparrow.yaml
has Str $.image = tags()<image> || "";
has Str $.project = tags()<project> || tags()<SPARKY_PROJECT> || "";
# repository URL passed to git-scm
has Str $.scm = tags()<scm> || tags()<SCM_URL> || "";
# rw: stage-run assigns it after unpacking the source archive
has Str $.source_dir is default(tags()<source_dir> || "") is rw;
# rw: get-storage-api assigns it when a new storage job is allocated
has Str $.storage_job_id is default(tags()<storage_job_id> || "") is rw;
# "on" makes stage-prepare (re)start the sparrow-worker container
has Str $.docker_bootstrap = tags()<docker_bootstrap> || "on";
# "on" is translated to sparrowdo bootstrap => True when queueing the run job
has Str $.sparrowdo_bootstrap = tags()<sparrowdo_bootstrap> || "off";
# non-empty for reporter pipelines; those do not publish reports themselves
has Str $.is_reporter = tags()<is_reporter> || "";
# NOTE(review): not referenced anywhere else in the visible source
my $notify-job;
# filled by get-jobs-list (in traversal order) and reset by stage-prepare
my @jobs;
# Returns the job handle of the SparrowCIStorage project that holds the
# shared files (source archive, sparrow.yaml, artifacts).
#
# :$docker - when true the Sparky API is addressed through the
#            host.docker.internal host name instead of the loopback address.
#
# If $.storage_job_id is already set, the existing storage job is reused;
# otherwise a new job is allocated and its id is stored in $.storage_job_id.
method !get-storage-api (:$docker = False) {
  say ">>> get-storage-api. docker_mode=$docker";

  # "host.docker.internal:host-gateway" is the value of docker's --add-host
  # mapping, not a host name, so it must not appear inside the URL.
  # NOTE(review): assumes the container is started with host.docker.internal
  # mapped to the host - confirm against the docker-cli plugin.
  my $api = $docker ?? 'http://host.docker.internal:4000' !! 'http://127.0.0.1:4000';

  my $sapi;
  if $.storage_job_id {
    # return existing storage api job
    $sapi = self.new-job(
      job-id  => $.storage_job_id,
      project => <SparrowCIStorage>,
      api     => $api,
    );
  } else {
    # allocate new storage api job and remember its id
    $sapi = self.new-job(
      project => <SparrowCIStorage>,
      api     => $api,
    );
    $.storage_job_id = $sapi.info()<job-id>;
  }
  return $sapi;
}
# Loads sparrow.yaml from the storage job and returns the parsed structure.
# Every "{{ CWD }}" placeholder (inner whitespace optional) is replaced by
# $.source_dir before the text is parsed.
#
# :$docker - forwarded to get-storage-api to select the API endpoint.
method !tasks-config (:$docker = False) {
  say ">>> load sparrow.yaml from storage, docker_mode=$docker";
  my $storage  = self!get-storage-api(:$docker);
  my $raw-yaml = $storage.get-file("sparrow.yaml", :text);
  my $expanded = $raw-yaml.subst(/'{{' \s* 'CWD' \s* '}}'/, $.source_dir, :g);
  return load-yaml($expanded);
}
# Posts a build report to the local report service and returns the decoded
# JSON response.
#
# :$stash - report data; this method adds project, job-id, with-sparrowci,
#           date, worker-status, scm and elapsed to it before sending.
#
# A 599 response is retried after a 60 second pause, up to three retries.
# Any other non-200 status, or a 599 on the last attempt, raises an
# exception carrying the status and the response body.
method !build-report(:$stash) {
  say "build web report ...";
  my %headers = content-type => 'application/json';
  my $this-job = Sparky::JobApi.new: :mine;
  my $elapsed = now - INIT now;

  my @extra-fields =
    project        => $.project,
    job-id         => $this-job.info()<job-id>,
    with-sparrowci => True,
    date           => "{DateTime.now}",
    worker-status  => "OK",
    scm            => $.scm,
    elapsed        => $elapsed.Int;
  for @extra-fields -> $field {
    $stash{$field.key} = $field.value;
  }

  my $report;
  my $retries = 0;
  loop {
    my $resp = HTTP::Tiny.post: "http://127.0.0.1:2222/build",
      headers => %headers,
      content => to-json($stash);
    if $resp<status> == 200 {
      $report = from-json($resp<content>.decode);
      last;
    }
    # only a 599 is worth retrying, and only while retries remain
    unless $retries != 3 and $resp<status> == 599 {
      die "{$resp<status>} : { $resp<content> ?? $resp<content>.decode !! ''}"
    }
    $retries++;
    say ">>> (599 recieved) http retry: #0{$retries}";
    sleep(60);
  }
  say "build web report OK, report_id: {$report}";
  return $report;
}
# Walks the jobs DAG rooted at $j and appends every job's info() to the
# class-scoped @jobs, in order: left children -> the job itself -> right
# children. Children are read from the job stash under <child-jobs>.
method !get-jobs-list ($j){
  # recurse into every child recorded on the given side of the current job
  my &walk-side = -> $side {
    if $j.get-stash()<child-jobs>{$side} {
      for $j.get-stash()<child-jobs>{$side}<> -> $child {
        my $job-id  = $child<job-id>;
        my $project = $child<project>;
        self!get-jobs-list(self.new-job(:$job-id, :$project));
      }
    }
  };

  walk-side('left');

  say ">>> get-jobs-list: push job={$j.info().perl}";
  @jobs.push: $j.info();

  walk-side('right');
}
# Entry stage: queues a "prepare" job on the SparrowCIQueue project, passing
# the pipeline parameters through as tags, and waits for it (timeout 1400).
method stage-main {
  my $queue-job = self.new-job: :project<SparrowCIQueue>;
  my $wait-timeout = 1400;

  my %prepare-tags =
    stage => "prepare",
    project => $.project,
    scm => $.scm,
    docker_bootstrap => $.docker_bootstrap,
    sparrowdo_bootstrap => $.sparrowdo_bootstrap,
    tasks_config => $.tasks_config,
    image => $.image,
    owner => $.owner,
    scm_branch => tags()<SCM_BRANCH> || 'HEAD';

  $queue-job.queue: %(
    description => "{$.scm} queue",
    tags => %prepare-tags,
  );

  self.wait-job($queue-job, { timeout => $wait-timeout.Int });
}
# Stage "prepare": checks out the repository, uploads the source archive and
# sparrow.yaml to the storage job, validates the tasks configuration, then
# runs the default task once per docker image and publishes a web report.
#
# Besides the class attributes it reads the scm_branch, build_status,
# build_url and warn_cnt tags directly through tags().
#
# Dies when sparrow.yaml is missing or cannot be loaded, when <tasks> is not
# an Array, or when there is not exactly one task flagged <default>. Except
# for a missing $.tasks_config file, a FAIL report is posted before dying.
method stage-prepare {
say "tags: {tags().perl}";
# check the repository out into ./source
directory "source";
git-scm $.scm, %(
to => "source",
branch => tags()<scm_branch>,
);
# pack ./source and upload the archive so that run jobs can download it
task-run "archive source directory", "pack-unpack", %(
target => "source",
file => "source.tar.gz"
);
self!get-storage-api().put-file("source.tar.gz","source.tar.gz");
# commit metadata; <sha> and <comment> are used below and in reports
my $git-data = task-run "git data", "git-commit-data", %(
dir => "{$*CWD}/source",
);
if $.tasks_config {
# an explicit tasks file replaces the repository's sparrow.yaml;
# note that no FAIL report is posted when this file is missing
say ">>> copy {$.tasks_config} to remote storage";
die "{$.tasks_config} file not found" unless $.tasks_config.IO ~~ :e;
self!get-storage-api().put-file($.tasks_config,"sparrow.yaml");
} else {
say ">>> copy source/sparrow.yaml to remote storage";
unless "source/sparrow.yaml".IO ~~ :e {
my $stash = %(
status => "FAIL",
state => -2,
log => "sparrow.yaml not found",
git-data => $git-data,
);
self!build-report: :$stash;
die "sparrow.yaml file not found";
}
self!get-storage-api().put-file("source/sparrow.yaml","sparrow.yaml");
}
my $tasks-config;
try {
# check that the uploaded sparrow.yaml loads as valid YAML
$tasks-config = self!tasks-config;
CATCH {
# only X::AdHoc is handled here: report the failure, then re-raise
when X::AdHoc {
my $err-message = .message;
my $stash = %(
status => "FAIL",
state => -2,
log => $err-message,
git-data => $git-data,
sparrow-yaml => self!get-storage-api.get-file("sparrow.yaml",:text),
);
self!build-report: :$stash;
die $err-message;
}
}
}
# structural validation of the configuration
unless $tasks-config<tasks>.isa('Array') {
my $stash = %(
status => "FAIL",
state => -2,
log => "tasks should be an array",
git-data => $git-data,
sparrow-yaml => self!get-storage-api.get-file("sparrow.yaml",:text),
);
self!build-report: :$stash;
die "tasks should be an array";
}
# exactly one task must be flagged as default
my $data = $tasks-config<tasks>.grep({.<default>});
unless $data {
my $stash = %(
status => "FAIL",
state => -2,
log => "default task is not found",
git-data => $git-data,
sparrow-yaml => self!get-storage-api.get-file("sparrow.yaml",:text),
);
self!build-report: :$stash;
die "default task is not found";
}
if $data.elems > 1 {
my $stash = %(
status => "FAIL",
state => -2,
log => "default task - too many found",
git-data => $git-data,
sparrow-yaml => self!get-storage-api.get-file("sparrow.yaml",:text),
);
self!build-report: :$stash;
die "default task - too many found";
}
# image selection: the image attribute wins, then <image> from the
# configuration, then the hard-coded default image
my @images = $.image ?? [ $.image ] !!
( $tasks-config<image> ?? $tasks-config<image><> !! ['melezhik/sparrow:alpine_arm'] );
my $jobs-status = "OK";
my $warn-cnt = 0; # number of warnings found in jobs
# run the default task once per image
for @images -> $image {
my $task = $data[0];
my $project = $task<name>;
my $j = self.new-job: :$project;
if $.docker_bootstrap eq "on" {
say ">>> prepare docker container";
# stop the worker container before starting it again on $image
task-run "docker stop", "docker-cli", %(
action => "stop",
name => "sparrow-worker"
);
my $docker-run-params = %();
$docker-run-params<action> = "run";
$docker-run-params<name> = "sparrow-worker";
$docker-run-params<image> = $image;
# secrets are passed only when both an owner and a secrets list exist
if $.owner && $tasks-config<secrets> {
$docker-run-params<secrets> = $tasks-config<secrets>.join(" ");
$docker-run-params<vault_path> = "/kv/sparrow/users/{$.owner}/secrets";
}
# common pipeline variables:
my $docker-opts = "-e SCM_URL={$.scm} -e SP6_DUMP_TASK_CODE=1";
$docker-opts ~= " -e SCM_SHA={$git-data<sha>}";
$docker-opts ~= " -e SCM_BRANCH={tags()<scm_branch>}";
# first line of the commit message, with single quotes removed so that
# it can be wrapped in single quotes below
my $git-comment = $git-data<comment>.split("\n").first.subst("'","",:g);
$docker-opts ~= " -e SCM_COMMIT_MESSAGE='{$git-comment}'";
# following variables are only available for reporter pipelines:
$docker-opts ~= " -e BUILD_STATUS={tags()<build_status>}" if tags()<build_status>;
$docker-opts ~= " -e BUILD_URL={tags()<build_url>}" if tags()<build_url>;
$docker-opts ~= " -e BUILD_WARN_CNT={tags()<warn_cnt>||0}";
if $.is_reporter {
$docker-opts ~= " -v {%*ENV<HOME>}/.sparrowci/irc/bot/messages/:/tmp/irc/bot/messages/";
}
# the docker socket is mounted only for this one owner
if $.owner eq "melezhik" {
$docker-opts ~= " -v /var/run/docker.sock:/var/run/docker.sock";
$docker-opts ~= ' --group-add $(stat -c %g /var/run/docker.sock)';
}
$docker-run-params<options> = $docker-opts;
task-run "docker run", "docker-cli", $docker-run-params;
}
say ">>> trigger task: {$task.perl}";
my $description = "run [{$task<name>}]";
my $timeout = 1100;
# queue the "run" stage for the default task against the worker container
$j.queue: %(
description => $description,
tags => %(
stage => "run",
task => $task<name>,
storage_job_id => $.storage_job_id,
),
sparrowdo => %(
docker => "sparrow-worker",
no_sudo => True,
repo => "https://sparrowhub.io/repo",
bootstrap => ($.sparrowdo_bootstrap eq "on") ?? True !! False
)
);
my $st = self.wait-job($j,{ timeout => $timeout.Int });
say ">>> STOP WAITING ALL JOBS: {$st.perl}";
$jobs-status = "FAIL" unless $st<OK>;
# traverse jobs DAG in order
# and save result in @jobs
@jobs = [];
self!get-jobs-list($j);
# NOTE(review): this table maps -1 to FAILED and -2 to NA, while the
# report stash below uses "-1" for TIMEOUT and "-2" for FAIL - confirm
# the two encodings are meant to differ
my $st-to-human = %(
"-2" => "NA",
"-1" => "FAILED",
"0" => "RUNNING",
"1" => "OK",
);
my @logs;
# fetch the raw log and the status of every job and print them; log
# lines starting with ">>>" are skipped
for @jobs -> $b {
my $r = HTTP::Tiny.get: "http://127.0.0.1:4000/report/raw/{$b<project>}/{$b<job-id>}";
my $log = $r<content> ?? $r<content>.decode !! '';
$r = HTTP::Tiny.get: $b<status-url>;
my $status = $r<content> ?? $r<content>.decode !! '-2';
say "\n[$b<project>] - [{$st-to-human{$status}}]";
say "================================================================";
for $log.lines.grep({ $_ !~~ /^^ '>>>'/ }) -> $l {
say $l;
@logs.push: $l;
$warn-cnt++ if $l ~~ /":: warn:"/;
}
}
my $stash = %(
status => ( $st<OK> ?? "OK" !! ( $st<TIMEOUT> ?? "TIMEOUT" !! ($st<FAIL> ?? "FAIL" !! "NA") ) ),
state => ( $st<OK> ?? "1" !! ( $st<TIMEOUT> ?? "-1" !! ($st<FAIL> ?? "-2" !! "-10") ) ),
log => @logs.join("\n"),
git-data => $git-data,
image => $image,
sparrow-yaml => self!get-storage-api.get-file("sparrow.yaml",:text),
);
# the container is stopped here regardless of $.docker_bootstrap
task-run "docker stop", "docker-cli", %(
action => "stop",
name => "sparrow-worker",
);
# we don't create reports for
# reporters jobs
unless $.is_reporter {
my $report = self!build-report: :$stash;
# every *.yaml file in ~/.sparrowci/reporters/ is queued as a reporter
# pipeline; the `ne "yes"` test is already implied by the enclosing unless
if "{%*ENV<HOME>}/.sparrowci/reporters/".IO ~~ :d and $.is_reporter ne "yes" {
# runs reporters jobs
for dir("{%*ENV<HOME>}/.sparrowci/reporters/", test => /'.yaml'$$/) -> $r {
my $j = self.new-job: :project<SparrowCIQueue>;
$j.queue: %(
description => "{$.scm} queue (reporter - {$r.basename})",
tags => %(
stage => "prepare",
is_reporter => "yes",
project => $.project,
scm => $.scm,
docker_bootstrap => $.docker_bootstrap,
sparrowdo_bootstrap => $.sparrowdo_bootstrap,
tasks_config => $r.path,
image => $.image,
owner => $.owner,
build_status => $jobs-status,
build_url => "{%*ENV<SPARROWCI_HOST> || 'https://ci.sparrowhub.io'}/report/{$report<build-id>}",
warn_cnt => $warn-cnt,
),
);
}
}
}
}
# after all images: queue the followup pipeline only if every run was OK;
# the job is queued without being waited for
if $tasks-config<followup_job> && $jobs-status eq "OK" {
# runs followup jobs
my $j = self.new-job: :project<SparrowCIQueue>;
$j.queue: %(
description => "{$.scm} queue (followup - {$tasks-config<followup_job>})",
tags => %(
stage => "prepare",
project => $.project,
scm => $.scm,
docker_bootstrap => $.docker_bootstrap,
sparrowdo_bootstrap => $.sparrowdo_bootstrap,
tasks_config => "source/{$tasks-config<followup_job>}",
image => $.image,
owner => $.owner
),
);
}
}
# Stage "run": executes the task named by $.task together with its
# `depends`, `if`, `hub` and `followup` sections from sparrow.yaml.
#
# Order of work: dependency jobs -> optional conditional task -> optional
# hub generator -> the task itself (once per hub item) -> followup jobs.
# Results and child-job references are written to the current job's stash.
#
# Dies when the task is not found or is defined more than once; calls
# exit(1) when not all dependency or followup jobs finish OK.
method stage-run {
my $j = Sparky::JobApi.new: :mine;
my $stash = $j.get-stash();
# :docker<True> passes the string "True", which is truthy
my $data = self!tasks-config(:docker<True>)<tasks>.grep({.<name> eq $.task});
die "task {$.task} is not found" unless $data;
die "task {$.task} - too many found" if $data.elems > 1;
my $task = $data[0];
my $timeout = 1200;
say ">>> handle task: ", $task.perl;
# when no source_dir tag was passed, fetch and unpack the source archive
# into the current directory and use that directory as source_dir
unless $.source_dir {
say "source directory does not yet exist, download source archive from storage";
my $blob = self!get-storage-api(:docker).get-file("source.tar.gz",:bin);
"source.tar.gz".IO.spurt($blob,:bin);
task-run "unpack source archive", "pack-unpack", %(
action => "unpack",
# dir => "source",
file => "source.tar.gz"
);
$.source_dir = "{$*CWD}";
}
# child jobs - holds references to
# all depends/followup tasks/jobs
# and gets linked to the current job
# (<left> = depends jobs, <right> = followup jobs)
my %child-jobs = %();
# accumulated state - represents
# all output data,
# collected from hub tasks
my @acc-state = (); # hub tasks accumulated state
my $i = 0; # hub tasks counter
# task out data will hold
# depends tasks output data
my $tasks-out-data = %();
my @tasks; # hub tasks
# execute depends tasks _before_
# any tasks
if $task<depends> {
say ">>> enter depends block: ", $task<depends>.perl;
my @jobs = self!run-task-dependency: :tasks($task<depends>);
say ">>> waiting for dependency tasks have finsihed ...";
my $st = self.wait-jobs(@jobs,{ timeout => $timeout.Int });
for @jobs -> $dj {
%child-jobs<left>.push: $dj.info;
# collect the state each dependency stored, keyed by its task name
my $d = $dj.get-stash();
if $d<task>:exists {
$tasks-out-data{$d<task>}<state> = $d<state>;
}
}
# save job data
$j.put-stash(%( child-jobs => %child-jobs ));
# handle depends jobs errors
say ">>> depends jobs status: ", $st.perl;
unless $st<OK> == @jobs.elems {
say "some depends jobs failed or timeouted: {$st.perl}";
exit(1);
}
}
if $task<if> { # compute conditional task
say ">>> compute conditional task ...";
my $task-if = $task<if>; $task-if<name> = "{$task<name>}-if";
my $params = $stash<config> || {};
$params<tasks> = $tasks-out-data if $tasks-out-data;
my $state = self!task-run: :task($task-if), :$params;
# a "skip" status ends the stage before the main task and followups
if $state<status> and $state<status> eq "skip" {
say ">>> conditional task returns SKIP, don't execute main task";
return;
}
}
if $task<hub> {
say ">>> run hub generator code";
my $params = $stash<config> || {};
$params<tasks> = $tasks-out-data if $tasks-out-data;
my $ht = $task<hub>;
$ht<name> = "{$task<name>}-hub";
my $state = self!task-run: :task($ht), :$params;
# the generator's <list> holds one entry per run of the main task
@tasks = $state<list> ?? $state<list><> !! [];
} else {
# in case there is no hub
# hub is effectively just one task
push @tasks, $task;
}
# the task executed below is always $task; with a hub, $t only supplies
# the per-run <config> and the optional per-run <if>
for @tasks -> $t {
$i++;
# if conditional task exists within hub iterator
# compute conditional task for every task in hub tasks list
if $t<if> && $task<hub> {
say ">>> compute conditional task ...";
my $task-if = $t<if>; $task-if<name> = "{$task<name>}-hub-if-{$i}";
my $params = $t<config> || {};
$params<tasks> = $tasks-out-data if $tasks-out-data;
my $state = self!task-run: :task($task-if), :$params;
if $state<status> and $state<status> eq "skip" {
say ">>> conditional task returns SKIP, don't execute hub task";
next;
}
}
my $params = $task<hub> ?? ($t<config> || {}) !! ($stash<config> || {});
# pass depends tasks output data
# to parent task as config()<tasks>
$params<tasks> = $tasks-out-data if $tasks-out-data;
my $in-artifacts = $task<artifacts><in>;
my $out-artifacts = $task<artifacts><out>;
my $state = self!task-run: :$task, :$params, :$in-artifacts, :$out-artifacts;
# link job and task data; the stash is rewritten after every run
@acc-state.push: $state;
$j.put-stash(%(
state => $task<hub> ?? @acc-state !! $state,
task => $task<name>,
child-jobs => %child-jobs
));
} # next task in @tasks
# execute followup tasks
# _after_ hub tasks are finished
if $task<followup> {
say ">>> enter followup block: ", $task<followup>.perl;
my $tasks = $task<followup>;
# followups receive the whole accumulated list for a hub, otherwise the
# single state (or an empty hash when nothing ran)
my $parent-data = $task<hub> ?? @acc-state !! (@acc-state.elems ?? @acc-state[0] !! %());
my @jobs = self!run-task-dependency: :$tasks, :tasks-data($tasks-out-data), :$parent-data;
say ">>> waiting for followup tasks have finsihed ...";
my $st = self.wait-jobs(@jobs,{ timeout => $timeout });
for @jobs -> $fj {
%child-jobs<right>.push: $fj.info();
}
# store the stash again so that it includes the followup job references
$j.put-stash(%(
state => $task<hub> ?? @acc-state !! (@acc-state.elems ?? @acc-state[0] !! %()),
task => $task<name>,
child-jobs => %child-jobs
));
say ">>> followup jobs status: ", $st.perl;
# handle followup jobs errors
unless $st<OK> == @jobs.elems {
say "some followup jobs failed or timeouted: {$st.perl}";
exit(1);
}
}
}
# Queues one "run" job per entry of $tasks and returns the list of queued
# job handles (the caller waits for them).
#
# :$tasks       - list of depends/followup entries; each has <name> and may
#                 have <queue>, <priority> and <config>.
# :$tasks-data  - output data of dependency tasks, passed on as
#                 config<tasks> when non-empty.
# :$parent-data - state of the parent task, passed on as
#                 config<parent><state> when set.
#
# Dies when an entry names a task that is not defined in sparrow.yaml.
method !run-task-dependency (:$tasks,:$tasks-data = {},:$parent-data) {
  my @jobs;
  # entries with a <queue> are sorted by (queue, priority); the sorted list
  # is then processed in reverse order
  for $tasks<>.sort({ .<queue> ?? (.<queue>,.<priority>) !! True }).reverse -> $t {
    say ">>> run-task-dependency: handle task: {$t.perl}";
    # jobs sharing a <queue> are submitted to the same project
    my $project = $t<queue> || $t<name>;
    my $job = self.new-job: :$project;
    my $data = self!tasks-config(:docker<True>)<tasks>.grep({.<name> eq $t<name>});
    die "task {$t<name>} is not found" unless $data;

    # The log statement must stay outside the hash constructor: placed
    # inside %( ... ) its return value becomes part of the hash contents
    # and the default config is lost.
    say ">>> set default depend/followup task/plugin parameters ...";
    my $stash-data = %(
      config => $data[0]<config> || {},
    );
    if $t<config> {
      say ">>> override default depend/followup task/plugin parameters ...";
      $stash-data<config> = $t<config>
    }
    $stash-data<config><parent><state> = $parent-data if $parent-data;
    $stash-data<config><tasks> = $tasks-data if $tasks-data;
    $job.put-stash: $stash-data;

    my $description = "run [d] [{$t<name>}]";
    say ">>> trigger task [$project] | {$t.perl} | stash: {$stash-data.perl}";
    $job.queue: %(
      description => $description,
      tags => %(
        stage => "run",
        task => $t<name>,
        source_dir => $.source_dir,
        storage_job_id => $.storage_job_id,
      ),
    );
    # pause between submissions
    sleep(3);
    @jobs.push: $job;
  }
  return @jobs;
}
# Runs a single task inside $.source_dir and returns the state produced by
# the underlying task-run call.
#
# :$task          - task definition; run as a plugin when <plugin> is set,
#                   otherwise materialised on disk via build-task first.
# :$params        - parameters handed to the task.
# :$in-artifacts  - file names fetched from storage into .artifacts/
#                   before the task runs.
# :$out-artifacts - entries with <path> and <name>, uploaded to storage
#                   after the task has run.
#
# The working directory is switched back at the end of the method.
method !task-run (:$task, :$params = {},:$in-artifacts = [],:$out-artifacts = []) {
  say ">>> chdir to source_dir: {$.source_dir}";
  my $saved-cwd = $*CWD;
  chdir $.source_dir;

  # download input artifacts
  if $in-artifacts {
    my $storage = self!get-storage-api: :docker;
    mkdir ".artifacts";
    for $in-artifacts<> -> $artifact {
      say ">>> copy artifact [{$artifact}] from storage to .artifacts/";
      ".artifacts/{$artifact}".IO.spurt($storage.get-file($artifact), :bin);
    }
  }

  my $result;
  if $task<plugin> {
    say ">>> run task [{$task<name>}] | plugin: {$task<plugin>} | params: {$params.perl}";
    $result = task-run($task<name>, $task<plugin>, $params);
  } else {
    my $dir = self!build-task: :$task;
    say ">>> run task [{$task<name>}] | params: {$params.perl} | dir: {$*CWD}/{$dir}";
    $result = task-run($dir, $params);
  }

  # upload output artifacts
  if $out-artifacts {
    my $storage = self!get-storage-api: :docker;
    for $out-artifacts<> -> $artifact {
      say ">>> copy artifact [{$artifact<name>}] to storage";
      $storage.put-file("{$artifact<path>}", $artifact<name>);
    }
  }

  # restore context
  chdir $saved-cwd;
  return $result;
}
# Writes a task definition to disk as a task directory and returns the
# directory path.
#
# :$task     - task definition: <name>, <language> and optionally <code>,
#              <check>, <config>, <init> and <subtasks>.
# :$base-dir - target directory; defaults to "tasks/<name>".
#
# Files written, when the matching key is set: task.<ext>, task.check,
# config.raku and hook.<ext>. Python sources are prefixed with the
# sparrow6lib import. Subtasks are built recursively under
# <task-dir>/tasks/<subtask-name>.
#
# Dies when <language> is missing or not one of the supported languages.
method !build-task (:$task,:$base-dir?) {
  say ">>> build task [{$task<name>}]";
  my $lang = $task<language> || die "task language is not set";
  # single braces: plain interpolation, same form as the subtask path below
  my $task-dir = $base-dir || "tasks/{$task<name>}";
  mkdir $task-dir;

  # build subtasks recursively
  if $task<subtasks> {
    for $task<subtasks><> -> $st {
      self!build-task: task => $st, base-dir => "$task-dir/tasks/{$st<name>}";
    }
  }

  # supported languages and the file extension used for each
  my %lang-to-ext = %(
    raku => "raku",
    bash => "bash",
    perl => "pl",
    powershell => "ps1",
    python => "py",
    ruby => "rb",
    go => "go"
  );
  # the language name is matched case-insensitively
  my $lang-key = lc($lang);
  die "unknown language $lang" unless %lang-to-ext{$lang-key}:exists;
  my $ext = %lang-to-ext{$lang-key};

  "{$task-dir}/task.{$ext}".IO.spurt(
    ($ext eq "py") ?? "from sparrow6lib import *\n\n{$task<code>}" !! $task<code>
  ) if $task<code>;
  "{$task-dir}/task.check".IO.spurt($task<check>) if $task<check>;
  "{$task-dir}/config.raku".IO.spurt($task<config>.perl) if $task<config>;
  if $task<init> {
    "{$task-dir}/hook.{$ext}".IO.spurt(
      ($ext eq "py") ?? "from sparrow6lib import *\n\n{$task<init>}" !! $task<init>
    );
  }
  return $task-dir;
}
}
# Script entry point: build the pipeline from the job tags and start it.
# NOTE(review): `run` is not defined in this file - presumably supplied by
# Sparky::JobApi::Role and dispatching to the stage-* methods; confirm.
Pipeline.new.run;