معرفی شرکت ها


atbu-mp-pipeline-pkg-0.0.8


Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر
Card image cap
تبلیغات ما

مشتریان به طور فزاینده ای آنلاین هستند. تبلیغات می تواند به آنها کمک کند تا کسب و کار شما را پیدا کنند.

مشاهده بیشتر

توضیحات

ATBU atbu.mp_pipeline package, a multiprocessing work item pipeline.
ویژگی مقدار
سیستم عامل -
نام فایل atbu-mp-pipeline-pkg-0.0.8
نام atbu-mp-pipeline-pkg
نسخه کتابخانه 0.0.8
نگهدارنده []
ایمیل نگهدارنده []
نویسنده Ashley R. Thomas
ایمیل نویسنده ashley.r.thomas.701@gmail.com
آدرس صفحه اصلی https://github.com/AshleyT3/atbu-mp-pipeline
آدرس اینترنتی https://pypi.org/project/atbu-mp-pipeline-pkg/
مجوز -
# Python Multiprocessing Pipeline ## Overview The atbu.mp_pipeline package uses Python multiprocessing capabilities to support multi-stage pipeline capabilities, including support for dual-stage parallel execution of producer and consumer stages, automatically providing each of those stages with a pipe connection, allowing them to share what is being produced/consumed. The atbu.mp_pipeline package is currently used by the following project for supporting a backup compression pipeline stage: - [ATBU Backup & Persistent File Information](https://github.com/AshleyT3/atbu) utility package (atbu-pkg). Documentation: https://atbu-mp-pipeline.readthedocs.io/en/latest/ ## Setup To install atbu-mp-pipeline-pkg: ``` pip install atbu-mp-pipeline-pkg ``` See below for a few examples. See source code for other packages mentioned above and for additional details and usage information. ## Examples ### Basic/Simple Pipeline Example ``` import os from multiprocessing import freeze_support from atbu.mp_pipeline.mp_pipeline import ( MultiprocessingPipeline, SubprocessPipelineStage, PipelineWorkItem, ) def queue_worker_func(parm_top_secret, parent_pid): assert parent_pid != os.getpid() return ( parent_pid, os.getpid(), parm_top_secret, ) def int_stage0(wi: PipelineWorkItem): wi.user_obj[0] = 100 return wi def int_stage1(wi: PipelineWorkItem): wi.user_obj[0] = wi.user_obj[0] * 2 wi.user_obj[1] = "stage1" return wi def int_stage2(wi: PipelineWorkItem): wi.user_obj[0] = wi.user_obj[0] * 2 wi.user_obj[2] = f"stage2: got this from parent: {wi.user_obj['parent']}" return wi def always_yes(wi: PipelineWorkItem): return True def main(): sp = MultiprocessingPipeline( name="test_mp_pipeline_basic", stages=[ SubprocessPipelineStage( fn_determiner=always_yes, fn_worker=int_stage0, ), SubprocessPipelineStage( fn_determiner=always_yes, fn_worker=int_stage1, ), SubprocessPipelineStage( fn_determiner=always_yes, fn_worker=int_stage2, ), ], ) d = {} d["parent"] = "This is from parent" wi = 
PipelineWorkItem(d.copy()) f = sp.submit(wi) r_wi: PipelineWorkItem = f.result() if not wi.is_failed: assert wi == r_wi # On success, should be equal. assert wi.user_obj["parent"] == d["parent"] # Should not change. # Should have values as set by stages... assert wi.user_obj[0] == 400 assert wi.user_obj[1] == "stage1" assert wi.user_obj[2] == "stage2: got this from parent: This is from parent" print(f"Work item completed successfully:") print(f" wi.user_obj[0]={wi.user_obj[0]}") print(f" wi.user_obj[1]={wi.user_obj[1]}") print(f" wi.user_obj[2]={wi.user_obj[2]}") else: print(f"Something did not go as planned:") for ex in r_wi.exceptions: print(f" ex={ex}") sp.shutdown() assert sp.was_graceful_shutdown if __name__ == '__main__': freeze_support() main() ``` **Output:** ``` Work item completed successfully: wi.user_obj[0]=400 wi.user_obj[1]=stage1 wi.user_obj[2]=stage2: got this from parent: This is from parent ``` ### Large Pipeline Example ``` import os from multiprocessing import freeze_support from atbu.mp_pipeline.mp_pipeline import ( MultiprocessingPipeline, SubprocessPipelineStage, PipelineWorkItem, ) class LargePipelineWorkItem(PipelineWorkItem): def __init__(self) -> None: super().__init__(user_obj=self) self.is_ok = True self.num = 0 self.pid = os.getpid() class LargePipelineStage(SubprocessPipelineStage): def __init__(self) -> None: super().__init__() def is_for_stage(self, pwi: LargePipelineWorkItem) -> bool: # Example of stage rejecting its chance to run the work item. # Say "no" to every other request. 
pwi.is_ok = not pwi.is_ok return not pwi.is_ok def perform_stage_work( self, pwi: LargePipelineWorkItem, **kwargs, ): pwi.num += 1 return pwi def main(): stages = 100 mpp = MultiprocessingPipeline( max_simultaneous_work_items=min(os.cpu_count(), 15), name="test_mp_pipeline_large", ) for _ in range(stages): mpp.add_stage(stage=LargePipelineStage()) print(f"Pipeline has {mpp.num_stages} stages.") wi = LargePipelineWorkItem() f = mpp.submit(wi) r_wi: PipelineWorkItem = f.result() if not r_wi.is_failed: assert wi == r_wi assert r_wi.is_ok assert r_wi.num == 50 print(f"Work item completed successfully:") print(f" r_wi.is_ok={r_wi.is_ok}") print(f" r_wi.pid={r_wi.pid}") else: if r_wi.exceptions is not None: print(f"Something did not go as planned:") for ex in r_wi.exceptions: print(f" ex={ex}") mpp.shutdown() assert mpp.was_graceful_shutdown if __name__ == '__main__': freeze_support() main() ``` **Output:** ``` Pipeline has 100 stages. Work item completed successfully: r_wi.is_ok=True r_wi.pid=8204 r_wi.num=50 ``` ### Mixed Pipeline Example ``` import os from multiprocessing import freeze_support from atbu.mp_pipeline.mp_pipeline import ( MultiprocessingPipeline, PipelineStage, SubprocessPipelineStage, ThreadPipelineStage, PipelineWorkItem, ) class LargePipelineWorkItem(PipelineWorkItem): def __init__(self) -> None: super().__init__(user_obj=self) self.is_ok = True self.num = 0 self.pid = os.getpid() class MixedPipelineSubprocessStage(PipelineStage): def __init__(self) -> None: super().__init__() @property def is_subprocess(self): return True def is_for_stage(self, pwi: LargePipelineWorkItem) -> bool: return True # Yes, this stage wants all work items. def perform_stage_work( self, pwi: LargePipelineWorkItem, **kwargs, ): # Run the work item. assert pwi.pid != os.getpid() # Subprocess should have different pid. pwi.num += 1 return pwi def perform_thread_stage_work( pwi: LargePipelineWorkItem, **kwargs, ): assert pwi.pid == os.getpid() # Thread should have same process. 
pwi.num += 1 return pwi def main(): mpp = MultiprocessingPipeline( name="mp_pipeline_large_mixed", max_simultaneous_work_items=min(os.cpu_count(), 15), ) for _ in range(10): # Add a stage using our derived-class pipeline stage. mpp.add_stage(stage=MixedPipelineSubprocessStage()) # Add a stage using the library's thread pipeline stage, # where we specify the callable to call for both asking # the pipeline if it wants to run a given work item, and # another to actually do the work. mpp.add_stage( stage=ThreadPipelineStage( fn_determiner=lambda pwi: True, # Run all work items. fn_worker=perform_thread_stage_work, # Call this to run them. ) ) print(f"Pipeline has {mpp.num_stages} stages.") wi = LargePipelineWorkItem() f = mpp.submit(wi) r_wi: PipelineWorkItem = f.result() if not r_wi.is_failed: assert wi == r_wi assert not r_wi.is_failed assert r_wi.is_ok assert r_wi.num == 20 assert r_wi.pid == os.getpid() print(f"Work item completed successfully:") print(f" r_wi.num={r_wi.num}") print(f" r_wi.pid={r_wi.pid}") else: if r_wi.exceptions is not None: print(f"Something did not go as planned:") for ex in r_wi.exceptions: print(f" ex={ex}") mpp.shutdown() assert mpp.was_graceful_shutdown if __name__ == '__main__': freeze_support() main() ``` **Output:** ``` Pipeline has 20 stages. Work item completed successfully: r_wi.num=20 r_wi.pid=18068 ``` ### Dual-stage Producer/Consumer Example ``` import os from multiprocessing import freeze_support import concurrent.futures from atbu.mp_pipeline.mp_pipeline import ( MultiprocessingPipeline, SubprocessPipelineStage, PipelineWorkItem, PipeConnectionIO, ) class ProducerConsumerWorkItem(PipelineWorkItem): def __init__(self, demo_pipe_io_wrapper) -> None: super().__init__(auto_copy_attr=False) self.demo_pipe_io_wrapper = demo_pipe_io_wrapper self.producer_data = None # What producer sent. 
self.producer_bytes_written = None self.consumer_data = None # What consumer received def stage_complete( self, stage_num: int, # pylint: disable=unused-argument wi: "PipelineWorkItem", # pylint: disable=unused-argument ex: Exception, ): super().stage_complete( stage_num=stage_num, wi=wi, ex=ex, ) if not wi.is_failed: if stage_num == 0: self.producer_data = wi.producer_data self.producer_bytes_written = wi.producer_bytes_written elif stage_num == 1: self.consumer_data = wi.consumer_data class ProducerPipelineSubprocessStage(SubprocessPipelineStage): def __init__(self) -> None: super().__init__() def is_for_stage(self, pwi: ProducerConsumerWorkItem) -> bool: return True # Yes, we want to see all work items in this stage. @property def is_pipe_with_next_stage(self): """Return True to indicate we want this stage and the next one to run in parallel, where this stage is the producer feeding a pipeline-supplied pipe, and the next stage is consuming from that same pipe (on the reader side). """ return True def perform_stage_work( self, pwi: ProducerConsumerWorkItem, **kwargs, ): try: if not isinstance(pwi, ProducerConsumerWorkItem): raise ValueError( f"Producer: Pipeline gave us unexpected work item." ) pwi.producer_data = os.urandom(10) if pwi.demo_pipe_io_wrapper: # PipeConnectionIO wraps the multiprocessing Pipe, providing # an io.RawIOBase interface (with limitations... i.e., seek is # not supported). with PipeConnectionIO(pwi.pipe_conn, is_write=True) as pipe_io: pwi.producer_bytes_written = pipe_io.write(pwi.producer_data) else: # Just use the pipe connection directly. 
pwi.pipe_conn.send_bytes(pwi.producer_data) pwi.producer_bytes_written = len(pwi.producer_data) return pwi except Exception: if not pwi.pipe_conn.closed: pwi.pipe_conn.close() raise class ConsumerPipelineSubprocessStage(SubprocessPipelineStage): def __init__(self) -> None: super().__init__() def is_for_stage(self, pwi: ProducerConsumerWorkItem) -> bool: return True # Yes, we want to see all work items in this stage. def perform_stage_work( self, pwi: ProducerConsumerWorkItem, **kwargs, ): try: if not isinstance(pwi, ProducerConsumerWorkItem): raise ValueError( f"Consumer: Pipeline gave us unexpected work item." ) if pwi.demo_pipe_io_wrapper: # PipeConnectionIO wraps the multiprocessing Pipe, providing # an io.RawIOBase interface (with limitations... i.e., seek is # not supported). with PipeConnectionIO(pwi.pipe_conn, is_write=False) as pipe_io: pwi.consumer_data = pipe_io.read() else: # Just use the pipe connection directly. pwi.consumer_data = pwi.pipe_conn.recv_bytes() return pwi except Exception: if not pwi.pipe_conn.closed: pwi.pipe_conn.close() raise def main(): mpp = MultiprocessingPipeline( name="test_mp_producer_oncsoler", stages=[ ProducerPipelineSubprocessStage(), ConsumerPipelineSubprocessStage() ] ) wil = [ ProducerConsumerWorkItem(demo_pipe_io_wrapper=True), ProducerConsumerWorkItem(demo_pipe_io_wrapper=False) ] fut = [mpp.submit(wi) for wi in wil] done, _ = concurrent.futures.wait( fs=fut, return_when=concurrent.futures.ALL_COMPLETED ) assert len(done) == 2 for i, f in enumerate(done): r_wi: ProducerConsumerWorkItem = f.result() if not r_wi.is_failed: assert r_wi.producer_bytes_written == 10 assert r_wi.producer_data == r_wi.consumer_data print(f"Success for #{i}:") print(f" {i}: num_bytes={r_wi.producer_bytes_written}") print(f" {i}: p_bytes={r_wi.producer_data.hex(' ')}") print(f" {i}: c_bytes={r_wi.consumer_data.hex(' ')}") else: if r_wi.exceptions is not None: print(f"Something did not go as planned for #{i}:") for ex in r_wi.exceptions: print(f" 
{i}: Bad thing happened: {ex}") mpp.shutdown() assert mpp.was_graceful_shutdown if __name__ == '__main__': freeze_support() main() ``` **Output:** ``` Success for #0: 0: num_bytes=10 0: p_bytes=76 d4 32 f0 4f 3f 31 30 19 00 0: c_bytes=76 d4 32 f0 4f 3f 31 30 19 00 Success for #1: 1: num_bytes=10 1: p_bytes=34 d6 5c d0 be 82 62 c3 5d 61 1: c_bytes=34 d6 5c d0 be 82 62 c3 5d 61 ```


نیازمندی

مقدار نام
>=0.0.3 atbu-common-pkg


زبان مورد نیاز

مقدار نام
>=3.9 Python


نحوه نصب


نصب پکیج whl atbu-mp-pipeline-pkg-0.0.8:

    pip install atbu-mp-pipeline-pkg-0.0.8.whl


نصب پکیج tar.gz atbu-mp-pipeline-pkg-0.0.8:

    pip install atbu-mp-pipeline-pkg-0.0.8.tar.gz