Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
J
jio
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Tomáš Peterka
jio
Commits
298da338
Commit
298da338
authored
Oct 11, 2013
by
Tristan Cavelier
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
job features event management changed to be more maintainable
Job queues are now saved in workspace at different moment
parent
ad9c7aad
Changes
10
Show whitespace changes
Inline
Side-by-side
Showing
10 changed files
with
254 additions
and
175 deletions
+254
-175
src/jio/core/JIO.js
src/jio/core/JIO.js
+1
-1
src/jio/features/jobChecker.js
src/jio/features/jobChecker.js
+17
-9
src/jio/features/jobExecuter.js
src/jio/features/jobExecuter.js
+49
-26
src/jio/features/jobMaker.js
src/jio/features/jobMaker.js
+41
-34
src/jio/features/jobQueue.js
src/jio/features/jobQueue.js
+59
-45
src/jio/features/jobRecovery.js
src/jio/features/jobRecovery.js
+8
-5
src/jio/features/jobReference.js
src/jio/features/jobReference.js
+3
-3
src/jio/features/jobRetry.js
src/jio/features/jobRetry.js
+21
-15
src/jio/features/jobTimeout.js
src/jio/features/jobTimeout.js
+33
-26
test/jio/tests.js
test/jio/tests.js
+22
-11
No files found.
src/jio/core/JIO.js
View file @
298da338
...
...
@@ -21,10 +21,10 @@ function JIO(storage_spec, options) {
enableJobMaker
(
this
,
shared
,
options
);
enableJobReference
(
this
,
shared
,
options
);
enableJobRetry
(
this
,
shared
,
options
);
enableJobTimeout
(
this
,
shared
,
options
);
enableJobChecker
(
this
,
shared
,
options
);
enableJobQueue
(
this
,
shared
,
options
);
enableJobRecovery
(
this
,
shared
,
options
);
enableJobTimeout
(
this
,
shared
,
options
);
enableJobExecuter
(
this
,
shared
,
options
);
shared
.
emit
(
'
load
'
);
...
...
src/jio/features/jobChecker.js
View file @
298da338
...
...
@@ -13,7 +13,9 @@ function enableJobChecker(jio, shared, options) {
// creates
// - shared.job_rules Array
// uses 'job' event
// uses 'job:new' event
// emits 'job:modified', 'job:start', 'job:resolved',
// 'job:end', 'job:reject' events
var
i
;
...
...
@@ -22,33 +24,39 @@ function enableJobChecker(jio, shared, options) {
shared
.
job_rule_actions
=
{
wait
:
function
(
original_job
,
new_job
)
{
original_job
.
promise
.
always
(
function
()
{
shared
.
emit
(
'
job
'
,
new_job
);
new_job
.
state
=
'
ready
'
;
new_job
.
modified
=
new
Date
();
shared
.
emit
(
'
job:modified
'
,
new_job
);
shared
.
emit
(
'
job:start
'
,
new_job
);
});
new_job
.
state
=
'
waiting
'
;
new_job
.
modified
=
new
Date
();
shared
.
emit
(
'
job:modified
'
,
new_job
);
},
update
:
function
(
original_job
,
new_job
)
{
if
(
!
new_job
.
solver
)
{
// promise associated to the job
new_job
.
state
=
'
done
'
;
shared
.
emit
(
'
jobDone
'
,
new_job
);
shared
.
emit
(
'
job:resolved
'
,
new_job
,
[]);
// XXX why resolve?
shared
.
emit
(
'
job:end
'
,
new_job
);
}
else
{
if
(
!
original_job
.
solver
)
{
original_job
.
solver
=
new_job
.
solver
;
}
else
{
original_job
.
promise
.
then
(
new_job
.
command
.
resolve
,
new_job
.
command
.
reject
new_job
.
command
.
reject
,
new_job
.
command
.
notify
);
}
}
new_job
.
state
=
'
running
'
;
new_job
.
modified
=
new
Date
();
shared
.
emit
(
'
job:modified
'
,
new_job
);
},
deny
:
function
(
original_job
,
new_job
)
{
new_job
.
state
=
'
fail
'
;
new_job
.
modified
=
new
Date
();
restCommandRejecter
(
new_job
,
[
new_job
.
state
=
"
running
"
;
shared
.
emit
(
'
job:reject
'
,
new_job
,
[
'
precondition_failed
'
,
'
command denied
'
,
'
Command rejected by the job checker.
'
...
...
@@ -170,7 +178,7 @@ function enableJobChecker(jio, shared, options) {
}
}
else
{
// browsing jobs
for
(
j
=
0
;
j
<
shared
.
jobs
.
length
;
j
+
=
1
)
{
for
(
j
=
shared
.
jobs
.
length
-
1
;
j
>=
0
;
j
-
=
1
)
{
if
(
shared
.
jobs
[
j
]
!==
job
)
{
if
(
jobsRespectConditions
(
...
...
@@ -235,7 +243,7 @@ function enableJobChecker(jio, shared, options) {
}
}
shared
.
on
(
'
job
'
,
checkJob
);
shared
.
on
(
'
job
:new
'
,
checkJob
);
}
...
...
src/jio/features/jobExecuter.js
View file @
298da338
...
...
@@ -4,19 +4,28 @@
function
enableJobExecuter
(
jio
,
shared
)
{
// , options) {
// uses 'job
', 'jobDone', 'jobFail' and 'jobNotify
' events
//
emits 'jobRun' and 'jobEnd' events
// uses 'job
:new
' events
//
uses actions 'job:resolve', 'job:reject' and 'job:notify'
// listeners
// emits 'job:modified', 'job:started', 'job:resolved',
// 'job:rejected', 'job:notified' and 'job:end' events
// emits action 'job:start'
shared
.
on
(
'
job
'
,
function
(
param
)
{
function
startJobIfReady
(
job
)
{
if
(
job
.
state
===
'
ready
'
)
{
shared
.
emit
(
'
job:start
'
,
job
);
}
}
function
executeJobIfReady
(
param
)
{
var
storage
;
if
(
param
.
state
===
'
ready
'
)
{
param
.
tried
+=
1
;
param
.
started
=
new
Date
();
param
.
state
=
'
running
'
;
param
.
modified
=
new
Date
();
shared
.
emit
(
'
jobRun
'
,
param
);
shared
.
emit
(
'
job:modified
'
,
param
);
shared
.
emit
(
'
job:started
'
,
param
);
try
{
storage
=
createStorage
(
deepClone
(
param
.
storage_spec
));
}
catch
(
e
)
{
...
...
@@ -44,33 +53,47 @@ function enableJobExecuter(jio, shared) { // , options) {
);
});
}
}
);
}
shared
.
on
(
'
jobDone
'
,
function
(
param
,
args
)
{
if
(
param
.
state
===
'
running
'
)
{
param
.
state
=
'
done
'
;
param
.
modified
=
new
Date
();
shared
.
emit
(
'
jobEnd
'
,
param
);
if
(
param
.
solver
)
{
restCommandResolver
(
param
,
args
);
function
endAndResolveIfRunning
(
job
,
args
)
{
if
(
job
.
state
===
'
running
'
)
{
job
.
state
=
'
done
'
;
job
.
modified
=
new
Date
();
shared
.
emit
(
'
job:modified
'
,
job
);
if
(
job
.
solver
)
{
restCommandResolver
(
job
,
args
);
}
shared
.
emit
(
'
job:resolved
'
,
job
,
args
);
shared
.
emit
(
'
job:end
'
,
job
);
}
}
});
shared
.
on
(
'
jobFail
'
,
function
(
param
,
args
)
{
if
(
param
.
state
===
'
running
'
)
{
param
.
state
=
'
fail
'
;
param
.
modified
=
new
Date
();
shared
.
emit
(
'
jobEnd
'
,
param
);
if
(
param
.
solver
)
{
restCommandRejecter
(
param
,
args
);
function
endAndRejectIfRunning
(
job
,
args
)
{
if
(
job
.
state
===
'
running
'
)
{
job
.
state
=
'
fail
'
;
job
.
modified
=
new
Date
();
shared
.
emit
(
'
job:modified
'
,
job
);
if
(
job
.
solver
)
{
restCommandRejecter
(
job
,
args
);
}
shared
.
emit
(
'
job:rejected
'
,
job
,
args
);
shared
.
emit
(
'
job:end
'
,
job
);
}
}
});
shared
.
on
(
'
jobNotify
'
,
function
(
param
,
args
)
{
if
(
param
.
state
===
'
running
'
&&
param
.
solver
)
{
param
.
solver
.
notify
(
args
[
0
]);
function
notifyJobIfRunning
(
job
,
args
)
{
if
(
job
.
state
===
'
running
'
&&
job
.
solver
)
{
job
.
solver
.
notify
(
args
[
0
]);
shared
.
emit
(
'
job:notified
'
,
job
,
args
);
}
});
}
// listeners
shared
.
on
(
'
job:new
'
,
startJobIfReady
);
shared
.
on
(
'
job:start
'
,
executeJobIfReady
);
shared
.
on
(
'
job:resolve
'
,
endAndResolveIfRunning
);
shared
.
on
(
'
job:reject
'
,
endAndRejectIfRunning
);
shared
.
on
(
'
job:notify
'
,
notifyJobIfRunning
);
}
src/jio/features/jobMaker.js
View file @
298da338
...
...
@@ -20,10 +20,16 @@ function enableJobMaker(jio, shared, options) {
// - param.options object
// - param.command object
// uses method events
// add emits 'job' events
// list of job events:
// - Job existence -> new, end
// - Job execution -> started, stopped
// - Job resolution -> resolved, rejected, notified, cancelled
// - Job modification -> modified
// the job can emit 'jobDone', 'jobFail' and 'jobNotify'
// emits actions 'job:resolve', 'job:reject' and 'job:notify'
// uses `rest method` events
// emits 'job:new' event
shared
.
job_keys
=
arrayExtend
(
shared
.
job_keys
||
[],
[
"
created
"
,
...
...
@@ -36,48 +42,49 @@ function enableJobMaker(jio, shared, options) {
"
options
"
]);
function
addCommandToJob
(
param
)
{
param
.
command
=
{};
param
.
command
.
resolve
=
function
()
{
shared
.
emit
(
'
job
Done
'
,
param
,
arguments
);
function
addCommandToJob
(
job
)
{
job
.
command
=
{};
job
.
command
.
resolve
=
function
()
{
shared
.
emit
(
'
job
:resolve
'
,
job
,
arguments
);
};
param
.
command
.
success
=
param
.
command
.
resolve
;
param
.
command
.
reject
=
function
()
{
shared
.
emit
(
'
job
Fail
'
,
param
,
arguments
);
job
.
command
.
success
=
job
.
command
.
resolve
;
job
.
command
.
reject
=
function
()
{
shared
.
emit
(
'
job
:reject
'
,
job
,
arguments
);
};
param
.
command
.
error
=
param
.
command
.
reject
;
param
.
command
.
notify
=
function
()
{
shared
.
emit
(
'
job
Notify
'
,
param
,
arguments
);
job
.
command
.
error
=
job
.
command
.
reject
;
job
.
command
.
notify
=
function
()
{
shared
.
emit
(
'
job
:notify
'
,
job
,
arguments
);
};
param
.
command
.
storage
=
function
()
{
job
.
command
.
storage
=
function
()
{
return
shared
.
createRestApi
.
apply
(
null
,
arguments
);
};
}
// listeners
shared
.
rest_method_names
.
forEach
(
function
(
method
)
{
shared
.
on
(
method
,
function
(
param
)
{
function
createJobFromRest
(
param
)
{
if
(
param
.
solver
)
{
// params are good
shared
.
emit
(
'
job
'
,
param
);
// rest parameters are good
shared
.
emit
(
'
job:new
'
,
param
);
}
}
});
});
shared
.
on
(
'
job
'
,
function
(
param
)
{
// new or recovered job
param
.
state
=
'
ready
'
;
if
(
typeof
param
.
tried
!==
'
number
'
||
!
isFinite
(
param
.
tried
))
{
param
.
tried
=
0
;
function
initJob
(
job
)
{
job
.
state
=
'
ready
'
;
if
(
typeof
job
.
tried
!==
'
number
'
||
!
isFinite
(
job
.
tried
))
{
job
.
tried
=
0
;
}
if
(
!
param
.
created
)
{
param
.
created
=
new
Date
();
if
(
!
job
.
created
)
{
job
.
created
=
new
Date
();
}
if
(
!
param
.
command
)
{
addCommandToJob
(
param
);
addCommandToJob
(
job
);
job
.
modified
=
new
Date
(
);
}
param
.
modified
=
new
Date
();
// listeners
shared
.
rest_method_names
.
forEach
(
function
(
method
)
{
shared
.
on
(
method
,
createJobFromRest
);
});
shared
.
on
(
'
job:new
'
,
initJob
);
}
src/jio/features/jobQueue.js
View file @
298da338
/*jslint indent: 2, maxlen: 80, sloppy: true, nomen: true, unparam: true */
/*global arrayExtend, localStorage, Workspace, uniqueJSONStringify, JobQueue,
constants, indexOf */
constants, indexOf
, setTimeout, clearTimeout
*/
function
enableJobQueue
(
jio
,
shared
,
options
)
{
...
...
@@ -16,74 +16,88 @@ function enableJobQueue(jio, shared, options) {
// - shared.workspace Workspace
// - shared.job_queue JobQueue
// uses 'job
', 'jobRun', 'jobStop', 'jobEnd' events
//
emits 'jobE
nd' events
// uses 'job
:new', 'job:started', 'job:stopped', 'job:modified',
//
'job:notified', 'job:e
nd' events
if
(
options
.
job_management
!==
false
)
{
shared
.
job_keys
=
arrayExtend
(
shared
.
job_keys
||
[],
[
"
id
"
]);
if
(
typeof
options
.
workspace
!==
'
object
'
)
{
shared
.
workspace
=
localStorage
;
}
else
{
shared
.
workspace
=
new
Workspace
(
options
.
workspace
);
}
if
(
!
shared
.
storage_spec_str
)
{
shared
.
storage_spec_str
=
uniqueJSONStringify
(
shared
.
storage_spec
);
}
shared
.
job_queue
=
new
JobQueue
(
shared
.
workspace
,
'
jio/jobs/
'
+
shared
.
storage_spec_str
,
shared
.
job_keys
);
// emits 'job:end' event
shared
.
on
(
'
job
'
,
function
(
param
)
{
if
(
indexOf
(
param
.
state
,
[
'
fail
'
,
'
done
'
])
===
-
1
)
{
if
(
!
param
.
stored
)
{
function
postJobIfReady
(
param
)
{
if
(
!
param
.
stored
&&
param
.
state
===
'
ready
'
)
{
clearTimeout
(
param
.
queue_ident
);
delete
param
.
queue_ident
;
shared
.
job_queue
.
load
();
shared
.
job_queue
.
post
(
param
);
shared
.
job_queue
.
save
();
param
.
stored
=
true
;
}
}
});
[
'
jobRun
'
,
'
jobStop
'
].
forEach
(
function
(
event
)
{
shared
.
on
(
event
,
function
(
param
)
{
function
deferredPutJob
(
param
)
{
if
(
param
.
queue_ident
===
undefined
)
{
param
.
queue_ident
=
setTimeout
(
function
()
{
delete
param
.
queue_ident
;
if
(
param
.
stored
)
{
shared
.
job_queue
.
load
();
if
(
param
.
state
===
'
done
'
||
param
.
state
===
'
fail
'
)
{
if
(
shared
.
job_queue
.
remove
(
param
.
id
))
{
shared
.
job_queue
.
save
();
delete
param
.
storad
;
}
}
else
{
shared
.
job_queue
.
put
(
param
);
shared
.
job_queue
.
save
();
}
}
});
});
}
}
shared
.
on
(
'
jobEnd
'
,
function
(
param
)
{
function
removeJob
(
param
)
{
clearTimeout
(
param
.
queue_ident
);
delete
param
.
queue_ident
;
if
(
param
.
stored
)
{
shared
.
job_queue
.
load
();
if
(
shared
.
job_queue
.
remove
(
param
.
id
))
{
shared
.
job_queue
.
remove
(
param
.
id
);
shared
.
job_queue
.
save
();
delete
param
.
stored
;
delete
param
.
id
;
}
}
});
}
shared
.
on
(
'
job
'
,
function
(
param
)
{
function
initJob
(
param
)
{
if
(
!
param
.
command
.
end
)
{
param
.
command
.
end
=
function
()
{
shared
.
emit
(
'
job
E
nd
'
,
param
);
shared
.
emit
(
'
job
:e
nd
'
,
param
);
};
}
});
}
shared
.
on
(
'
job:new
'
,
initJob
);
if
(
options
.
job_management
!==
false
)
{
shared
.
job_keys
=
arrayExtend
(
shared
.
job_keys
||
[],
[
"
id
"
]);
if
(
typeof
options
.
workspace
!==
'
object
'
)
{
shared
.
workspace
=
localStorage
;
}
else
{
shared
.
workspace
=
new
Workspace
(
options
.
workspace
);
}
if
(
!
shared
.
storage_spec_str
)
{
shared
.
storage_spec_str
=
uniqueJSONStringify
(
shared
.
storage_spec
);
}
shared
.
job_queue
=
new
JobQueue
(
shared
.
workspace
,
'
jio/jobs/
'
+
shared
.
storage_spec_str
,
shared
.
job_keys
);
// Listeners
shared
.
on
(
'
job:new
'
,
postJobIfReady
);
shared
.
on
(
'
job:started
'
,
deferredPutJob
);
shared
.
on
(
'
job:stopped
'
,
deferredPutJob
);
shared
.
on
(
'
job:modified
'
,
deferredPutJob
);
shared
.
on
(
'
job:notified
'
,
deferredPutJob
);
shared
.
on
(
'
job:end
'
,
removeJob
);
}
}
src/jio/features/jobRecovery.js
View file @
298da338
...
...
@@ -9,20 +9,23 @@ function enableJobRecovery(jio, shared, options) {
// uses
// - shared.job_queue JobQueue
// emits 'job:new' event
function
numberOrDefault
(
number
,
default_value
)
{
return
(
typeof
number
===
'
number
'
&&
isFinite
(
number
)
?
number
:
default_value
);
}
function
recoverJob
(
param
)
{
shared
.
job_queue
.
load
();
shared
.
job_queue
.
remove
(
param
.
id
);
delete
param
.
id
;
if
(
methodType
(
param
.
method
)
===
'
writer
'
||
param
.
state
===
'
ready
'
||
if
(
methodType
(
param
.
method
)
===
'
writer
'
&&
(
param
.
state
===
'
ready
'
||
param
.
state
===
'
running
'
||
param
.
state
===
'
waiting
'
)
{
param
.
state
===
'
waiting
'
)
)
{
shared
.
job_queue
.
save
();
shared
.
emit
(
'
job
'
,
param
);
shared
.
emit
(
'
job
:new
'
,
param
);
}
}
...
...
src/jio/features/jobReference.js
View file @
298da338
...
...
@@ -6,17 +6,17 @@ function enableJobReference(jio, shared, options) {
// creates
// - shared.jobs Object Array
// uses 'job
', 'jobE
nd' events
// uses 'job
:new' and 'job:e
nd' events
shared
.
jobs
=
[];
var
job_references
=
new
ReferenceArray
(
shared
.
jobs
);
shared
.
on
(
'
job
'
,
function
(
param
)
{
shared
.
on
(
'
job
:new
'
,
function
(
param
)
{
job_references
.
put
(
param
);
});
shared
.
on
(
'
job
E
nd
'
,
function
(
param
)
{
shared
.
on
(
'
job
:e
nd
'
,
function
(
param
)
{
job_references
.
remove
(
param
);
});
}
src/jio/features/jobRetry.js
View file @
298da338
...
...
@@ -27,9 +27,9 @@ function enableJobRetry(jio, shared, options) {
// - param.options object
// - param.command object
// uses 'job
' and 'jobR
etry' events
// emits
'job', 'jobFail' and 'jobStateChange' events
//
job can emit 'jobRetry'
// uses 'job
:new' and 'job:r
etry' events
// emits
action 'job:start' event
//
emits 'job:retry', 'job:reject', 'job:modified' and 'job:stopped' events
shared
.
job_keys
=
arrayExtend
(
shared
.
job_keys
||
[],
[
"
max_retry
"
]);
...
...
@@ -73,9 +73,7 @@ function enableJobRetry(jio, shared, options) {
2
);
// listeners
shared
.
on
(
'
job
'
,
function
(
param
)
{
function
initJob
(
param
)
{
if
(
typeof
param
.
max_retry
!==
'
number
'
||
param
.
max_retry
<
0
)
{
param
.
max_retry
=
positiveNumberOrDefault
(
param
.
options
.
max_retry
,
...
...
@@ -84,32 +82,40 @@ function enableJobRetry(jio, shared, options) {
}
param
.
command
.
reject
=
function
(
status
)
{
if
(
constants
.
http_action
[
status
||
0
]
===
"
retry
"
)
{
shared
.
emit
(
'
job
R
etry
'
,
param
,
arguments
);
shared
.
emit
(
'
job
:r
etry
'
,
param
,
arguments
);
}
else
{
shared
.
emit
(
'
job
Fail
'
,
param
,
arguments
);
shared
.
emit
(
'
job
:reject
'
,
param
,
arguments
);
}
};
param
.
command
.
retry
=
function
()
{
shared
.
emit
(
'
job
R
etry
'
,
param
,
arguments
);
shared
.
emit
(
'
job
:r
etry
'
,
param
,
arguments
);
};
}
);
}
shared
.
on
(
'
jobRetry
'
,
function
(
param
,
args
)
{
function
retryIfRunning
(
param
,
args
)
{
if
(
param
.
state
===
'
running
'
)
{
if
(
param
.
max_retry
===
undefined
||
param
.
max_retry
===
null
||
param
.
max_retry
>=
param
.
tried
)
{
param
.
state
=
'
waiting
'
;
param
.
modified
=
new
Date
();
shared
.
emit
(
'
jobStop
'
,
param
);
shared
.
emit
(
'
job:modified
'
,
param
);
shared
.
emit
(
'
job:stopped
'
,
param
);
setTimeout
(
function
()
{
param
.
state
=
'
ready
'
;
param
.
modified
=
new
Date
();
shared
.
emit
(
'
job
'
,
param
);
shared
.
emit
(
'
job:modified
'
,
param
);
shared
.
emit
(
'
job:start
'
,
param
);
},
min
(
10000
,
param
.
tried
*
2000
));
}
else
{
shared
.
emit
(
'
jobFail
'
,
param
,
args
);
shared
.
emit
(
'
job:reject
'
,
param
,
args
);
}
}
}
});
// listeners
shared
.
on
(
'
job:new
'
,
initJob
);
shared
.
on
(
'
job:retry
'
,
retryIfRunning
);
}
src/jio/features/jobTimeout.js
View file @
298da338
...
...
@@ -13,7 +13,9 @@ function enableJobTimeout(jio, shared, options) {
// - param.timeout_ident Timeout
// - param.state string 'running'
// uses 'job', 'jobDone', 'jobFail', 'jobRetry' and 'jobNotify' events
// uses 'job:new', 'job:stopped', 'job:started',
// 'job:notified' and 'job:end' events
// emits 'job:modified' event
shared
.
job_keys
=
arrayExtend
(
shared
.
job_keys
||
[],
[
"
timeout
"
]);
...
...
@@ -38,34 +40,39 @@ function enableJobTimeout(jio, shared, options) {
};
}
// listeners
shared
.
on
(
'
job
'
,
function
(
param
)
{
if
(
typeof
param
.
timeout
!==
'
number
'
||
param
.
timeout
<
0
)
{
param
.
timeout
=
positiveNumberOrDefault
(
param
.
options
.
timeout
,
function
initJob
(
job
)
{
if
(
typeof
job
.
timeout
!==
'
number
'
||
job
.
timeout
<
0
)
{
job
.
timeout
=
positiveNumberOrDefault
(
job
.
options
.
timeout
,
default_timeout
);
}
param
.
modified
=
new
Date
();
});
job
.
modified
=
new
Date
();
shared
.
emit
(
'
job:modified
'
,
job
);
}
[
"
jobDone
"
,
"
jobFail
"
,
"
jobRetry
"
].
forEach
(
function
(
event
)
{
shared
.
on
(
event
,
function
(
param
)
{
clearTimeout
(
param
.
timeout_ident
);
delete
param
.
timeout_ident
;
});
});
function
clearJobTimeout
(
job
)
{
clearTimeout
(
job
.
timeout_ident
);
delete
job
.
timeout_ident
;
}
[
"
jobRun
"
,
"
jobNotify
"
,
"
jobEnd
"
].
forEach
(
function
(
event
)
{
shared
.
on
(
event
,
function
(
param
)
{
clearTimeout
(
param
.
timeout_ident
);
if
(
param
.
state
===
'
running
'
&&
param
.
timeout
>
0
)
{
param
.
timeout_ident
=
setTimeout
(
timeoutReject
(
param
),
param
.
timeout
);
param
.
modified
=
new
Date
();
function
restartJobTimeoutIfRunning
(
job
)
{
clearTimeout
(
job
.
timeout_ident
);
if
(
job
.
state
===
'
running
'
&&
job
.
timeout
>
0
)
{
job
.
timeout_ident
=
setTimeout
(
timeoutReject
(
job
),
job
.
timeout
);
job
.
modified
=
new
Date
();
}
else
{
delete
param
.
timeout_ident
;
delete
job
.
timeout_ident
;
}
}
});
});
// listeners
shared
.
on
(
'
job:new
'
,
initJob
);
shared
.
on
(
"
job:stopped
"
,
clearJobTimeout
);
shared
.
on
(
"
job:end
"
,
clearJobTimeout
);
shared
.
on
(
"
job:started
"
,
restartJobTimeoutIfRunning
);
shared
.
on
(
"
job:notified
"
,
restartJobTimeoutIfRunning
);
}
test/jio/tests.js
View file @
298da338
...
...
@@ -668,8 +668,8 @@
"
storage_spec
"
:
{
"
type
"
:
"
fake
"
,
"
id
"
:
"
1 Job Manage
"
},
"
method
"
:
"
get
"
,
//"created": new Date(),
"
tried
"
:
1
,
"
state
"
:
"
r
unning
"
,
"
tried
"
:
0
,
// deferred writing 1
"
state
"
:
"
r
eady
"
,
// deferred writing "running"
//"modified": new Date(),
"
max_retry
"
:
2
,
"
timeout
"
:
1200
,
...
...
@@ -683,7 +683,7 @@
setTimeout
(
function
()
{
commands
[
"
1 Job Manage/get
"
].
success
({
"
data
"
:
{
"
b
"
:
"
c
"
}});
},
50
);
// wait 5
0 ms
},
100
);
// wait 10
0 ms
setTimeout
(
function
()
{
deepEqual
(
workspace
,
{},
'
Job ended, empty workspace
'
);
...
...
@@ -703,7 +703,7 @@
o
.
job1
.
kwargs
.
_id
=
'
b
'
;
// o.job1.created = new Date();
// o.job1.modified = new Date();
},
100
);
// wait 5
0 ms
},
200
);
// wait 10
0 ms
setTimeout
(
function
()
{
commands
[
"
1 Job Manage/get
"
].
storage
({
"
type
"
:
"
fake
"
,
...
...
@@ -720,19 +720,23 @@
},
"
Second job respond
"
);
});
o
.
job1
.
tried
=
1
;
o
.
job1
.
state
=
'
running
'
;
o
.
job2
=
{
"
kwargs
"
:
{
"
_id
"
:
"
c
"
},
"
options
"
:
{},
"
storage_spec
"
:
{
"
type
"
:
"
fake
"
,
"
id
"
:
"
2 Job Manage
"
},
"
method
"
:
"
get
"
,
//"created": new Date(),
"
tried
"
:
1
,
"
state
"
:
"
r
unning
"
,
"
tried
"
:
0
,
// deferred writing 1
"
state
"
:
"
r
eady
"
,
// deferred writing "running"
//"modified": new Date(),
"
max_retry
"
:
2
,
"
timeout
"
:
10000
,
"
id
"
:
2
};
tmp
=
workspace
[
"
jio/jobs/{
\"
id
\"
:
\"
1 Job Manage
\"
,
\"
type
\"
:
\"
fake
\"
}
"
];
tmp
=
JSON
.
parse
(
tmp
);
delete
tmp
[
0
].
created
;
...
...
@@ -743,20 +747,24 @@
o
.
job1
,
o
.
job2
],
'
Job calls another job, workspace have two jobs
'
);
},
150
);
// wait 5
0 ms
},
300
);
// wait 10
0 ms
setTimeout
(
function
()
{
commands
[
'
1 Job Manage/get
'
].
end
();
tmp
=
workspace
[
"
jio/jobs/{
\"
id
\"
:
\"
1 Job Manage
\"
,
\"
type
\"
:
\"
fake
\"
}
"
];
tmp
=
JSON
.
parse
(
tmp
);
delete
tmp
[
0
].
created
;
delete
tmp
[
0
].
modified
;
o
.
job2
.
tried
=
1
;
o
.
job2
.
state
=
'
running
'
;
deepEqual
(
tmp
,
[
o
.
job2
],
'
First Job ended, second still there
'
);
commands
[
'
1 Job Manage/get
'
].
success
({
"
data
"
:
{
"
c
"
:
"
d
"
}});
commands
[
'
2 Job Manage/get
'
].
success
({
"
data
"
:
{
"
d
"
:
"
e
"
}});
deepEqual
(
workspace
,
{},
'
No more job in the queue
'
);
},
200
);
// wait 5
0 ms
},
400
);
// wait 10
0 ms
});
test
(
'
job state running, job recovery
'
,
2
,
function
()
{
...
...
@@ -831,8 +839,11 @@
setTimeout
(
function
()
{
// copy workspace when job is waiting
commands
[
'
Job Recovw/post
'
].
retry
();
workspace
=
jIO
.
util
.
deepClone
(
workspace
);
},
50
);
setTimeout
(
function
()
{
workspace
=
jIO
.
util
.
deepClone
(
workspace
);
},
100
);
setTimeout
(
function
()
{
commands
[
'
Job Recovw/post
'
].
success
({
"
id
"
:
"
a
"
});
},
2100
);
...
...
@@ -850,7 +861,7 @@
if
(
commands
[
'
Job Recovw/post
'
])
{
ok
(
false
,
"
Command called, job recovered to earlier
"
);
}
},
19
999
);
},
19
889
);
// need to wait around 19900 ms
setTimeout
(
function
()
{
if
(
!
commands
[
'
Job Recovw/post
'
])
{
...
...
@@ -865,7 +876,7 @@
start
();
deepEqual
(
workspace
,
{},
'
No more job in the queue
'
);
},
20100
);
},
51
);
},
150
);
//////////////////////////////
// XXX Waiting for jobs job recovery
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment