I instantiate one grpc server per subprocess by multiprocess pool. When I use multiple clients to access the server, I found the following two problems:
- all clients access same server subprocess
- client raise MaybeEncodingError
By the way, my development environment is :
[OS]
ProductName: Mac OS X
ProductVersion: 10.14.6
BuildVersion: 18G5033
[packages]
grpcio = '1.30.0'
grpcio-tools = '1.30.0'
multiprocess = "0.70.10"
grpcio-status = "1.30.0"
googleapis-common-protos = "1.52.0"
[requires]
python_version = "3.8.3"
Here is the server output:
[PID 83287] Binding to 'localhost:52909'
[PID 83288] Starting new server.
[PID 83289] Starting new server.
[PID 83290] Starting new server.
[PID 83291] Starting new server.
[PID 83292] Starting new server.
[PID 83293] Starting new server.
[PID 83294] Starting new server.
[PID 83295] Starting new server.
[PID 83295] Determining primality of 2
[PID 83295] Determining primality of 9
[PID 83295] Determining primality of 23
[PID 83295] Determining primality of 16
[PID 83295] Determining primality of 10
[PID 83295] Determining primality of 3
[PID 83295] Determining primality of 24
[PID 83295] Determining primality of 17
[PID 83295] Determining primality of 11
[PID 83295] Determining primality of 25
[PID 83295] Determining primality of 4
[PID 83295] Determining primality of 18
[PID 83295] Determining primality of 5
[PID 83295] Determining primality of 12
[PID 83295] Determining primality of 19
[PID 83295] Determining primality of 26
[PID 83295] Determining primality of 6
[PID 83295] Determining primality of 13
[PID 83295] Determining primality of 27
[PID 83295] Determining primality of 20
[PID 83295] Determining primality of 7
[PID 83295] Determining primality of 14
[PID 83295] Determining primality of 8
[PID 83295] Determining primality of 28
[PID 83295] Determining primality of 15
As seen above,all clients access same server subprocess[PID 83295]
. Why?
Here is the client error info:
Traceback (most recent call last):
File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 96, in <module>
main()
File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 86, in main
primes = _calculate_primes(args.server_address)
File "/Users/zhaolong/PycharmProjects/pipEnvGrpc/grpc_multi/client.py", line 71, in _calculate_primes
primality = worker_pool.map(_run_worker_query, check_range)
File "/Users/zhaolong/.local/share/virtualenvs/pipEnvGrpc-7cHuZ_0E/lib/python3.8/site-packages/multiprocess/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/Users/zhaolong/.local/share/virtualenvs/pipEnvGrpc-7cHuZ_0E/lib/python3.8/site-packages/multiprocess/pool.py", line 771, in get
raise self._value
multiprocess.pool.MaybeEncodingError: Error sending result: '[isPrime: true
, , isPrime: true
, , isPrime: true
, , ]'. Reason: 'PicklingError("Can't pickle <class 'prime_pb2.Primality'>: it's not the same object as prime_pb2.Primality")'
As seen above,it raises multiprocess.pool.MaybeEncodingError
and PicklingError
. Why?
Below is the full code:
// the prime.proto from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing
syntax = "proto3";
package prime;
// A candidate integer for primality testing.
message PrimeCandidate {
// The candidate.
int64 candidate = 1;
}
// The primality of the requested integer candidate.
message Primality {
// Is the candidate prime?
bool isPrime = 1;
}
// Service to check primality.
service PrimeChecker {
// Determines the primality of an integer.
rpc check (PrimeCandidate) returns (Primality) {}
}
# the server.py from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing,
# and I have modify some palces.
"""An example of multiprocess concurrency with gRPC."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import multiprocessing
from concurrent import futures
import contextlib
import datetime
import logging
import math
import time
import socket
import sys
from multiprocess import pool
import grpc
from grpc_multi import prime_pb2
from grpc_multi import prime_pb2_grpc
_LOGGER = logging.getLogger(__name__)
_ONE_DAY = datetime.timedelta(days=1)
_PROCESS_COUNT = multiprocessing.cpu_count()
_THREAD_CONCURRENCY = _PROCESS_COUNT
def is_prime(n):
for i in range(2, int(math.ceil(math.sqrt(n)))):
if n % i == 0:
return False
else:
return True
class PrimeChecker(prime_pb2_grpc.PrimeCheckerServicer):
def check(self, request, context):
_LOGGER.info('Determining primality of %s', request.candidate)
return prime_pb2.Primality(isPrime=is_prime(request.candidate))
def _wait_forever(server):
try:
while True:
time.sleep(_ONE_DAY.total_seconds())
except KeyboardInterrupt:
server.stop(None)
def _run_server(bind_address):
"""Start a server in a subprocess."""
_LOGGER.info('Starting new server.')
options = (('grpc.so_reuseport', 1),)
server = grpc.server(futures.ThreadPoolExecutor(
max_workers=_THREAD_CONCURRENCY,),
options=options)
prime_pb2_grpc.add_PrimeCheckerServicer_to_server(PrimeChecker(), server)
server.add_insecure_port(bind_address)
server.start()
# _wait_forever(server)
server.wait_for_termination()
@contextlib.contextmanager
def _reserve_port():
"""Find and reserve a port for all subprocesses to use."""
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 0:
raise RuntimeError("Failed to set SO_REUSEPORT.")
sock.bind(('', 0))
try:
yield sock.getsockname()[1]
finally:
sock.close()
def main():
with _reserve_port() as port:
bind_address = 'localhost:{}'.format(port)
_LOGGER.info("Binding to '%s'", bind_address)
sys.stdout.flush()
addrs = [bind_address for _ in range(_PROCESS_COUNT)]
server_pool = pool.Pool(processes=_PROCESS_COUNT)
server_pool.map(_run_server, addrs)
server_pool.close()
# server_pool.join()
if __name__ == '__main__':
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('[PID %(process)d] %(message)s')
handler.setFormatter(formatter)
_LOGGER.addHandler(handler)
_LOGGER.setLevel(logging.INFO)
main()
# the client.py from https://github.com/grpc/grpc/tree/v1.30.0/examples/python/multiprocessing,
# and I have modify some palces.
"""An example of multiprocessing concurrency with gRPC."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import atexit
import logging
from multiprocess import pool
import operator
import sys
import grpc
from grpc_multi import prime_pb2
from grpc_multi import prime_pb2_grpc
_PROCESS_COUNT = 4
_MAXIMUM_CANDIDATE = 100
# Each worker process initializes a single channel after forking.
# It's regrettable, but to ensure that each subprocess only has to instantiate
# a single channel to be reused across all RPCs, we use globals.
_worker_channel_singleton = None
_worker_stub_singleton = None
_LOGGER = logging.getLogger(__name__)
def _shutdown_worker():
_LOGGER.info('Shutting worker process down.')
if _worker_channel_singleton is not None:
_worker_channel_singleton.close()
def _initialize_worker(server_address):
global _worker_channel_singleton # pylint: disable=global-statement
global _worker_stub_singleton # pylint: disable=global-statement
_LOGGER.info('Initializing worker process.')
_worker_channel_singleton = grpc.insecure_channel(server_address)
_worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(
_worker_channel_singleton)
atexit.register(_shutdown_worker)
def _run_worker_query(primality_candidate):
_LOGGER.info('Checking primality of %s.', primality_candidate)
return _worker_stub_singleton.check(
prime_pb2.PrimeCandidate(candidate=primality_candidate))
def _calculate_primes(server_address):
worker_pool = pool.Pool(processes=_PROCESS_COUNT,
initializer=_initialize_worker,
initargs=(server_address,))
check_range = range(2, _MAXIMUM_CANDIDATE)
primality = worker_pool.map(_run_worker_query, check_range)
worker_pool.close()
worker_pool.join()
primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
return tuple(primes)
def main():
msg = 'Determine the primality of the first {} integers.'.format(
_MAXIMUM_CANDIDATE)
parser = argparse.ArgumentParser(description=msg)
parser.add_argument('--server_address',
default='localhost:52909',
help='The address of the server (e.g. localhost:50051)')
args = parser.parse_args()
primes = _calculate_primes(args.server_address)
print(primes)
if __name__ == '__main__':
handler = logging.StreamHandler(sys.stdout)
formatter = logging.Formatter('[PID %(process)d] %(message)s')
handler.setFormatter(formatter)
_LOGGER.addHandler(handler)
_LOGGER.setLevel(logging.INFO)
main()
I have tried multiple channels for client according to Lidi Zheng's advice, but it seems still not work, Is my method wrong?
def _run_worker_query(param):
server_address, primality_candidate = param
worker_channel_singleton = grpc.insecure_channel(server_address)
worker_stub_singleton = prime_pb2_grpc.PrimeCheckerStub(worker_channel_singleton)
_LOGGER.info('Checking primality of %s.', primality_candidate)
res = worker_stub_singleton.check(
prime_pb2.PrimeCandidate(candidate=primality_candidate))
return res.isPrime
def _calculate_primes(server_address):
# worker_pool = pool.Pool(processes=_PROCESS_COUNT,
# initializer=_initialize_worker,
# initargs=(server_address,))
worker_pool = pool.Pool(processes=_PROCESS_COUNT)
check_range = range(2, _MAXIMUM_CANDIDATE)
params = [(server_address, r) for r in check_range]
primality = worker_pool.map(_run_worker_query, params)
worker_pool.close()
worker_pool.join()
# primes = zip(check_range, map(operator.attrgetter('isPrime'), primality))
primes = zip(check_range, primality)
return tuple(primes)
server output is :
[PID 43279] Determining primality of 3
[PID 43279] Determining primality of 2
[PID 43279] Determining primality of 4
[PID 43279] Determining primality of 5
[PID 43279] Determining primality of 6
[PID 43279] Determining primality of 7
[PID 43279] Determining primality of 8
[PID 43279] Determining primality of 9