An open API service providing repository metadata for many open source software ecosystems.

Package Usage: go: github.com/destel/rill

Package rill is a collection of easy-to-use functions for concurrency, streaming, batching and pipeline construction. It abstracts away the complexities of concurrency, removes boilerplate, and provides a structured way to handle errors. Rill is modular and can be easily integrated into existing projects: it requires no setup and allows using only the necessary functions. At the same time, rill's functions can be composed into complex, concurrent, and reusable pipelines when needed.

In this package, a stream refers to a channel of Try containers. A Try container is a simple struct that holds a value and an error. An "empty stream" is a channel of Try containers that has been closed and was never written to.

Most functions in this package are concurrent, and the level of concurrency can be controlled by the argument n. Some functions share common behaviors and characteristics, which are described below.

Functions such as Map, Filter, and Batch take a stream as input and return a new stream as output. They do not block and return the output stream immediately. All the processing is done in the background by the goroutine pools they spawn. These functions forward all errors from the input stream to the output stream. Any errors returned by the user-provided functions are also sent to the output stream. When such a function reaches the end of the input stream, it closes the output stream, stops processing, and cleans up resources. Such functions are designed to be composed together to build complex processing pipelines, as shown in the sketch below.

Functions such as ForEach, Reduce and MapReduce are used at the last stage of the pipeline to consume the stream and return the final result or error. Usually, these functions block until the end of the input stream is reached or an error is encountered. In case of an early termination (before reaching the end of the input stream), such functions initiate background draining of the remaining items. This is done to prevent goroutine leaks by ensuring that all goroutines feeding the stream are allowed to complete. The input stream should not be used anymore after calling such functions. It's also possible to consume the pipeline results manually, for example using a for-range loop. In this case, add a deferred call to DrainNB before the loop to ensure that goroutines are not leaked.

Functions such as Map, Filter and FlatMap write items to the output stream as soon as they become available. Due to the concurrent nature of these functions, the order of items in the output stream may not match the order of items in the input stream. These functions prioritize performance and concurrency over maintaining the original order. Functions such as OrderedMap or OrderedFilter preserve the order of items from the input stream. These functions are still concurrent, but use special synchronization techniques to ensure that items are written to the output stream in the same order as they were read from the input stream. This additional synchronization has some overhead, but it is negligible for I/O-bound workloads. Some other functions, such as ToSlice, Batch or First, are not concurrent and are ordered by nature.

Error handling can be non-trivial in concurrent applications. Rill simplifies this by providing a structured error handling approach. As described above, all errors are automatically propagated down the pipeline to the final stage, where they can be caught. This allows the pipeline to terminate after the first error is encountered and return it to the caller.
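The sketch below shows how these pieces compose: non-blocking stages (FromSlice, Map, Filter) build the stream, and a blocking ForEach consumes it and surfaces the first error. The input data and processing logic are made up for illustration; the signatures follow the package documentation, so treat this as a minimal sketch rather than a canonical example.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/destel/rill"
)

func main() {
	// Wrap a slice into a stream of Try containers.
	ids := rill.FromSlice([]string{"User-1", "user-2", "USER-3"}, nil)

	// Non-blocking stage: transform items using up to 3 goroutines.
	// Errors from the input stream and from this function are forwarded downstream.
	normalized := rill.Map(ids, 3, func(id string) (string, error) {
		return strings.ToLower(id), nil
	})

	// Another non-blocking stage: keep only some of the items.
	filtered := rill.Filter(normalized, 3, func(id string) (bool, error) {
		return id != "user-2", nil
	})

	// Blocking final stage: consume the stream.
	// It returns the first error encountered anywhere in the pipeline
	// and drains the remaining items in the background.
	err := rill.ForEach(filtered, 3, func(id string) error {
		fmt.Println("processing", id)
		return nil
	})
	if err != nil {
		fmt.Println("pipeline failed:", err)
	}
}
```

Only the final ForEach blocks; every earlier stage returns its output channel immediately, so the pipeline is driven entirely by the consumer at the end.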
In cases where more complex error handling logic is required, the Catch function can be used. It allows errors to be caught and handled at any point in the pipeline, providing the flexibility to handle not only the first error, but any of them.

The package documentation includes several runnable examples. One demonstrates a Rill pipeline that fetches users from an API, updates their status to active, and saves them back; both operations are done concurrently. Another showcases the use of Rill for building a multi-stage data processing pipeline, with a focus on batch processing: it streams user ids from a remote file, fetches users from an API in batches, updates their status to active, and saves them back, with all operations done concurrently. A third demonstrates how to use context cancellation to terminate a Rill pipeline in case of an early exit: the printOddSquares function initiates a pipeline that prints squares of odd numbers, while the infiniteNumberStream function is the initial stage of the pipeline and generates numbers indefinitely until the context is canceled; the example also shows how the pipeline behaves when an error occurs in one of its stages. Other examples demonstrate how to use the fan-in and fan-out patterns to send messages through multiple servers concurrently, and a concurrent MapReduce performed on a set of remote files that downloads them and calculates how many times each word appears in all the files. Finally, an example demonstrates how OrderedMap can be used to enforce ordering of processing results: the pipeline fetches temperature measurements for a city and calculates daily temperature changes, where measurements are fetched concurrently but ordered processing is used to calculate the changes.
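To illustrate the Catch function described above, here is a minimal sketch assuming its documented shape: a mid-pipeline stage that receives each error flowing through the stream and can suppress it by returning nil. The User type, fetchUser helper and ErrNotFound error are hypothetical stand-ins for a real API.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/destel/rill"
)

// Hypothetical domain types and helpers used only for this sketch.
var ErrNotFound = errors.New("user not found")

type User struct {
	ID     int
	Active bool
}

func fetchUser(id int) (*User, error) {
	if id == 3 {
		return nil, ErrNotFound
	}
	return &User{ID: id}, nil
}

func main() {
	ids := rill.FromSlice([]int{1, 2, 3, 4, 5}, nil)

	// Fetch users concurrently; any error is attached to the stream item.
	users := rill.Map(ids, 3, func(id int) (*User, error) {
		return fetchUser(id)
	})

	// Catch handles errors mid-pipeline: "not found" errors are suppressed
	// (the handler returns nil), while all other errors keep flowing
	// downstream to the final stage.
	users = rill.Catch(users, 1, func(err error) error {
		if errors.Is(err, ErrNotFound) {
			fmt.Println("skipping missing user:", err)
			return nil
		}
		return err
	})

	// Final blocking stage: activate and print the remaining users.
	err := rill.ForEach(users, 3, func(u *User) error {
		u.Active = true
		fmt.Println("activated user", u.ID)
		return nil
	})
	if err != nil {
		fmt.Println("pipeline failed:", err)
	}
}
```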
10 versions
Latest release: about 1 year ago

View more package details: https://packages.ecosystem.code.gouv.fr/registries/proxy.golang.org/packages/github.com/destel/rill

Dependent Repos 0