NOTE
Further updates to this function will be published to the GitHub repo as well as to the PowerShell Gallery. The code in this answer will no longer be maintained.
The documentation for the function as well as usage examples can be found here. Note that the Module version no longer has a -ThreadOptions
parameter and implements -UseNewRunspace
and -TimeoutSeconds
parameters however its usage should be the same.
Contributions are more than welcome, if you wish to contribute, fork the repo and submit a pull request with the changes.
Since this is a topic that can be confusing and often brings questions to the site I have decided to create this function that can simplify this tedious task and help those stuck in Windows PowerShell. The aim is to have it as simple and as friendly as possible, it should also be a function that could be copy-pasted in our $PROFILE
to be reused whenever needed and not require the installation of a Module (as stated in the question).
This function has been greatly inspired by RamblingCookieMonster's Invoke-Parallel
and Boe Prox's PoshRSJob
and is merely a simplified take on those with a few improvements.
DEFINITION
using namespace System.Collections
using namespace System.Collections.Generic
using namespace System.Management.Automation
using namespace System.Management.Automation.Language
using namespace System.Management.Automation.Runspaces
using namespace System.Threading
using namespace System.Text
# The function must run in the scope of a Module.
# `New-Module` must be used for portability. Otherwise store the
# function in a `.psm1` and import it via `Import-Module`.
New-Module PSParallelPipeline -ScriptBlock {
class CommandCompleter : IArgumentCompleter {
[IEnumerable[CompletionResult]] CompleteArgument(
[string] $commandName,
[string] $parameterName,
[string] $wordToComplete,
[CommandAst] $commandAst,
[IDictionary] $fakeBoundParameters) {
return [CompletionCompleters]::CompleteCommand(
$wordToComplete,
[NullString]::Value,
[CommandTypes]::Function)
}
}
function Invoke-Parallel {
[CmdletBinding(PositionalBinding = $false)]
[Alias('parallel')]
param(
[Parameter(Mandatory, ValueFromPipeline)]
[object] $InputObject,
[Parameter(Mandatory, Position = 0)]
[scriptblock] $ScriptBlock,
[Parameter()]
[ValidateRange(1, 63)]
[int] $ThrottleLimit = 5,
[Parameter()]
[hashtable] $Variables,
[Parameter()]
[ValidateNotNullOrEmpty()]
[ArgumentCompleter([CommandCompleter])]
[string[]] $Functions,
[Parameter()]
[ValidateSet('ReuseThread', 'UseNewThread')]
[PSThreadOptions] $ThreadOptions = [PSThreadOptions]::ReuseThread
)
begin {
try {
$iss = [initialsessionstate]::CreateDefault2()
foreach ($key in $Variables.PSBase.Keys) {
if ($Variables[$key] -is [scriptblock]) {
$PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
[PSArgumentException]::new('Passed-in script block variables are not supported.'),
'VariableCannotBeScriptBlock',
[ErrorCategory]::InvalidType,
$Variables[$key]))
}
$iss.Variables.Add([SessionStateVariableEntry]::new($key, $Variables[$key], ''))
}
foreach ($function in $Functions) {
$def = (Get-Command $function).Definition
$iss.Commands.Add([SessionStateFunctionEntry]::new($function, $def))
}
$usingParams = @{}
foreach ($usingstatement in $ScriptBlock.Ast.FindAll({ $args[0] -is [UsingExpressionAst] }, $true)) {
$variableAst = [UsingExpressionAst]::ExtractUsingVariable($usingstatement)
$varPath = $variableAst.VariablePath.UserPath
$varText = $usingstatement.ToString()
if ($usingstatement.SubExpression -is [VariableExpressionAst]) {
$varText = $varText.ToLowerInvariant()
}
$key = [Convert]::ToBase64String([Encoding]::Unicode.GetBytes($varText))
if ($usingParams.ContainsKey($key)) {
continue
}
$value = $PSCmdlet.SessionState.PSVariable.GetValue($varPath)
if ($value -is [scriptblock]) {
$PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
[PSArgumentException]::new('Passed-in script block variables are not supported.'),
'VariableCannotBeScriptBlock',
[ErrorCategory]::InvalidType,
$value))
}
if ($usingstatement.SubExpression -isnot [VariableExpressionAst]) {
[Stack[Ast]] $subexpressionStack = $usingstatement.SubExpression.FindAll({
$args[0] -is [IndexExpressionAst] -or
$args[0] -is [MemberExpressionAst] },
$false)
while ($subexpressionStack.Count) {
$subexpression = $subexpressionStack.Pop()
if ($subexpression -is [IndexExpressionAst]) {
$idx = $subexpression.Index.SafeGetValue()
$value = $value[$idx]
continue
}
if ($subexpression -is [MemberExpressionAst]) {
$member = $subexpression.Member.SafeGetValue()
$value = $value.$member
}
}
}
$usingParams.Add($key, $value)
}
$pool = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit, $iss, $Host)
$tasks = [List[hashtable]]::new()
$pool.ThreadOptions = $ThreadOptions
$pool.Open()
}
catch {
$PSCmdlet.ThrowTerminatingError($_)
}
}
process {
try {
# Thanks to Patrick Meinecke for his help here.
# https://github.com/SeeminglyScience/
$ps = [powershell]::Create().
AddScript({ $args[0].InvokeWithContext($null, [psvariable]::new('_', $args[1])) }).
AddArgument($ScriptBlock.Ast.GetScriptBlock()).
AddArgument($InputObject)
# This is how `Start-Job` does it's magic.
# Thanks to Jordan Borean for his help here.
# https://github.com/jborean93
if ($usingParams.Count) {
$null = $ps.AddParameters(@{ '--%' = $usingParams })
}
$ps.RunspacePool = $pool
$tasks.Add(@{
Instance = $ps
AsyncResult = $ps.BeginInvoke()
})
}
catch {
$PSCmdlet.WriteError($_)
}
}
end {
try {
while ($tasks.Count) {
$id = [WaitHandle]::WaitAny($tasks.AsyncResult.AsyncWaitHandle, 200)
if ($id -eq [WaitHandle]::WaitTimeout) {
continue
}
$task = $tasks[$id]
$task.Instance.EndInvoke($task.AsyncResult)
foreach ($err in $task.Instance.Streams.Error) {
$PSCmdlet.WriteError($err)
}
$tasks.RemoveAt($id)
}
}
catch {
$PSCmdlet.WriteError($_)
}
finally {
foreach ($task in $tasks.Instance) {
if ($task -is [IDisposable]) {
$task.Dispose()
}
}
if ($pool -is [IDisposable]) {
$pool.Dispose()
}
}
}
}
} -Function Invoke-Parallel | Import-Module -Force
workflow
is awfully inefficient and not even recommended. – Cerberus