My solution for the async parallel processing problem
User: mcholste
Date: 2/21/2010 12:44 pm
Views: 511
Rating: 0
At the last meeting, JT and I were discussing a problem we both have: web code that needs to execute other subs asynchronously and then return their results from within the same sub. For example:
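    sub query {
        my $args = shift;
        my @results;
        # assumes $args->{jobs} is a hash keyed by sub name: { name => { params => ... } }
        foreach my $async_job (keys %{ $args->{jobs} }) {
            my $code = \&{$async_job}; # look up the sub by name (allowed under strict refs)
            push @results, $code->($args->{jobs}->{$async_job}->{params}); # do this async so we can process in parallel
        }
        # wait until all jobs are in
        return \@results;
    }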
JT suggested using a condition variable and AnyEvent to handle the "wait until all jobs are in" part, but I was looking for something more native to POE. Also, he said that he ran into problems when the list of jobs needing to be run was non-deterministic.
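For reference, here is a rough sketch of what the condition-variable approach could look like with AnyEvent. It is not JT's code. It assumes AnyEvent is installed, that each job is a hash with a hypothetical `code` sub and `params` value, and it uses a zero-delay timer as a stand-in for real asynchronous work.

```perl
use strict;
use warnings;
use AnyEvent;

sub query {
    my $args = shift;
    my %results;
    my %watchers;                 # keep timer watchers alive until they fire
    my $cv = AnyEvent->condvar;

    $cv->begin;                   # guard so an empty job list still completes
    foreach my $name (keys %{ $args->{jobs} }) {
        my $job = $args->{jobs}->{$name};   # hypothetical: { code => sub, params => ... }
        $cv->begin;               # one more outstanding job
        # A zero-delay timer stands in for real async work (assumption).
        $watchers{$name} = AnyEvent->timer(after => 0, cb => sub {
            $results{$name} = $job->{code}->($job->{params});
            delete $watchers{$name};
            $cv->end;             # this job is in
        });
    }
    $cv->end;                     # release the guard
    $cv->recv;                    # wait until all jobs are in
    return \%results;
}
```

The begin/end counting only works cleanly when every job is registered before the count can reach zero, which is probably where a job list that changes on the fly gets awkward.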
My solution is not elegant, but it is fairly straightforward, and I think it will work in both situations. In my app, I have one main agent process that handles all of the network socket I/O. When a request comes into that process, it uses POE::Component::PreforkDispatch to asynchronously dispatch the incoming job to a waiting forked process that does all of the work, freeing the agent up to receive more network traffic. The big challenge is that a result can only go back over the network via this single agent process, because you can't pass sockets around between PIDs. The key I found was that POE starts up a new anonymous session for each socket that gets opened. If you keep track of which session ID is tied to which query, you can create an interface that lets the async responses loop their answers back through the agent, which then uses the recorded query-to-session-ID mapping to send the answer to the original query back over the network. So here's how it works:
1. Query from web client comes into agent.
2. Agent records the session ID attached to the web client's socket in a hash on the main session heap.
3. Agent dispatches query to a waiting fork.
4. Fork does the work, completely detached from the agent and with as many subsequent forks and async querying as it wants.
5. If there are many jobs to run, each job sends its result to the main agent process with a hash in its params indicating how many jobs are left to do (this list can change dynamically).
6. The agent records the state of the jobs as these partial results come in, using a different hash than the one that records which session ID is tied to which socket.
7. When the agent notices that all jobs are complete, it sends the results to an async finalization job in yet another fork.
8. The finalization job sends the final results back up through the agent.
9. The agent gets the final results and checks the session ID hash to see which session ID contains the socket that can be used to send the result back to the original web client.
10. The agent sends the result back to the web client over that socket.
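The bookkeeping in steps 2, 5, 6 and 9 can be sketched in plain Perl, with no POE involved. This is only an illustration under assumptions: the names `%session_for_query`, `%job_state`, `register_query`, `record_partial`, `finish_query` and the `remaining` field are all hypothetical, and in the real app the two hashes would live on the main session's heap rather than in file-scoped variables.

```perl
use strict;
use warnings;

# Hypothetical agent-side state, kept in two separate hashes.
my %session_for_query;   # query ID => session ID holding the client's socket
my %job_state;           # query ID => { results => [...], remaining => N }

# Steps 1-2: remember which session owns the socket for this query.
sub register_query {
    my ($query_id, $session_id) = @_;
    $session_for_query{$query_id} = $session_id;
    $job_state{$query_id} = { results => [], remaining => undef };
    return;
}

# Steps 5-6: a job reports a partial result plus how many jobs are left.
# Returns true once nothing is left to do (step 7 would start from here).
sub record_partial {
    my ($query_id, $msg) = @_;
    my $state = $job_state{$query_id} or die "unknown query $query_id";
    push @{ $state->{results} }, $msg->{result};
    $state->{remaining} = $msg->{remaining};   # may grow or shrink over time
    return $state->{remaining} == 0;
}

# Step 9: find the session to answer through, and forget the query.
sub finish_query {
    my ($query_id) = @_;
    my $state      = delete $job_state{$query_id};
    my $session_id = delete $session_for_query{$query_id};
    return ($session_id, $state->{results});
}
```

Note that this sketch trusts the count in the most recent message. If partial results can arrive out of order, a counter maintained by the agent itself would be safer.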
Whew! See how easy that was! I know this seems really complicated, but it's actually fairly straightforward: you use the agent to do all state tracking for the async stuff, and you always route async results and partial results back through the agent as a central clearinghouse. This allows you to use as many PIDs as you need to complete the task asynchronously. I'm using POE::Event::Message to do the message passing because it supports predefining multiple hops for messages, both within POE and over the network between POE kernels. However, I did have to patch it slightly to allow for payloads larger than one packet and to allow for custom payload serialization. I have a patch for anyone interested.
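The patch itself isn't reproduced here, but the general idea behind handling payloads larger than one packet with a pluggable serializer can be sketched with length-prefixed framing. This is a generic illustration and not the POE::Event::Message API: `frame`, `unframe`, and the default choice of Storable's `nfreeze`/`thaw` are assumptions made for the example.

```perl
use strict;
use warnings;
use Storable qw(nfreeze thaw);

# Serialize a payload (a reference) and prefix it with its length as a
# 32-bit network-order integer, so the reader knows how many bytes to expect.
sub frame {
    my ($payload, $serialize) = @_;
    $serialize ||= \&nfreeze;            # a custom serializer may be passed in
    my $bytes = $serialize->($payload);
    return pack('N', length $bytes) . $bytes;
}

# Pull every complete message out of a receive buffer (passed by reference).
# Incomplete trailing data stays in the buffer until more bytes arrive.
sub unframe {
    my ($buf_ref, $deserialize) = @_;
    $deserialize ||= \&thaw;
    my @messages;
    while (length($$buf_ref) >= 4) {
        my $len = unpack 'N', $$buf_ref;
        last if length($$buf_ref) < 4 + $len;   # rest has not arrived yet
        my $bytes = substr $$buf_ref, 4, $len;
        substr($$buf_ref, 0, 4 + $len) = '';    # consume header and body
        push @messages, $deserialize->($bytes);
    }
    return @messages;
}
```

The receiver just appends whatever bytes arrive to its buffer and calls `unframe(\$buffer)` after each read, so it doesn't matter how the network splits a message up.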
--Martin