Creating Spark Objects from JPEG and using spark_apply() on a non-translated function
Asked Answered
F

0

18

Typically when one wants to use sparklyr on a custom function (i.e. **non-translated functions) they place them within spark_apply(). However, I've only encountered examples where a single local data frame is either copy_to() or spark_read_csv() to a remote data source and then used spark_apply() on it. An example, for illustrative purposes only:

library(sparklyr)
sc <- spark_connect(master = "local")

n_sim = 100
iris_samps <- iris %>% dplyr::filter(Species == "virginica") %>%
  sapply(rep.int, times=n_sim) %>% cbind(replicate = rep(1:n_sim, each = 50)) %>% 
  data.frame() %>%
  dplyr::group_by(replicate) %>%
  dplyr::sample_n(50, replace = TRUE)

iris_samps_tbl <- copy_to(sc, iris_samps)

iris_samps_tbl %>% 
  spark_apply(function(x) {mean(x$Petal_Length)}, 
    group_by = "replicate") %>%
  ggplot(aes(x = result)) + geom_histogram(bins = 20) + ggtitle("Histogram of 100 Bootstrapped Means using sparklyr")

It seems like it would be therefore possible to use this on any range of non-translated functions coming from CRAN or Bioconductor packages as long as the data resides in a Spark Object.

I've came up with a specific problem for .jpeg images as I read that SparkR can load compressed image (.jpeg, .png, etc.) into raw image representation via ImageIO in Java library - it seems possible that sparklyr could do this as well.

RsimMosaic::composeMosaicFromImageRandom(inputImage, outputImage, pathToTilesLibrary) function takes an input image and a path to tiles used to create a photo mosaic and outputs an image (example here).

If this function only took one image and I knew how to turn it into spark object I might imagine the command would look like: composeMosaicFromImageRandom(inputImage, outputImage, spark_obj). However, this function is taking a path to 30,000 images.

How would one create 30,000 Spark Objects from the path to these tiles (.jpegs) and then use this function?

If the underlying code would actually need to be modified I've used jimhester/lookup to provide the source code:

function (originalImageFileName, outputImageFileName, imagesToUseInMosaic, 
        useGradients = FALSE, removeTiles = TRUE, fracLibSizeThreshold = 0.7, 
        repFracSize = 0.25, verbose = TRUE) 
{
        if (verbose) {
                cat(paste("\n ------------------------------------------------ \n"))
                cat(paste("    R Simple Mosaic composer - random version   \n"))
                cat(paste(" ------------------------------------------------ \n\n"))
        }
        if (verbose) {
                cat(paste("    Creating the library... \n"))
        }
        libForMosaicFull <- createLibraryIndexDataFrame(imagesToUseInMosaic, 
                saveLibraryIndex = F, useGradients = useGradients)
        libForMosaic <- libForMosaicFull
        filenameArray <- list.files(imagesToUseInMosaic, full.names = TRUE)
        originalImage <- jpeg::readJPEG(filenameArray[1])
        xTileSize <- dim(originalImage[, , 1])[1]
        yTileSize <- dim(originalImage[, , 1])[2]
        if (verbose) {
                cat(paste("    -- Tiles in the Library : ", length(libForMosaic[, 
                        1]), "\n"))
                cat(paste("    -- Tile dimensions : ", xTileSize, " x ", 
                        yTileSize, "\n"))
        }
        if (verbose) {
                cat(paste("\n"))
                cat(paste("    Reading the original image... \n"))
        }
        originalImage <- jpeg::readJPEG(originalImageFileName)
        xOrigImgSize <- dim(originalImage[, , 1])[1]
        yOrigImgSize <- dim(originalImage[, , 1])[2]
        if (verbose) {
                cat(paste("    -- Original image dimensions : ", xOrigImgSize, 
                        " x ", yOrigImgSize, "\n"))
                cat(paste("    -- Output image dimensions : ", ((xOrigImgSize - 
                        2) * xTileSize), " x ", ((yOrigImgSize - 2) * yTileSize), 
                        "\n"))
        }
        if (verbose) {
                cat(paste("\n"))
                cat(paste("    Computing the mosaic... \n"))
        }
        outputImage <- array(dim = c(((xOrigImgSize - 2) * xTileSize), 
                ((yOrigImgSize - 2) * yTileSize), 3))
        removedList <- c()
        l <- 1
        pCoord <- matrix(nrow = ((xOrigImgSize - 2) * (yOrigImgSize - 
                2)), ncol = 2)
        for (i in 2:(xOrigImgSize - 1)) {
                for (j in 2:(yOrigImgSize - 1)) {
                        pCoord[l, 1] <- i
                        pCoord[l, 2] <- j
                        l <- l + 1
                }
        }
        npixels <- length(pCoord[, 1])
        for (i in 1:npixels) {
                idx <- round(runif(1, 1, length(pCoord[, 1])))
                pixelRGBandNeigArray <- computeStatisticalQuantitiesPixel(pCoord[idx, 
                        1], pCoord[idx, 2], originalImage, useGradients)
                tileFilename <- getCloseMatch(pixelRGBandNeigArray, 
                        libForMosaic)
                startI <- (pCoord[idx, 1] - 2) * xTileSize + 1
                startJ <- (pCoord[idx, 2] - 2) * yTileSize + 1
                outputImage[startI:(startI + xTileSize - 1), startJ:(startJ + 
                        yTileSize - 1), ] <- jpeg::readJPEG(tileFilename)
                if (removeTiles) {
                        libForMosaic <- removeTile(tileFilename, libForMosaic)
                        removedList <- c(removedList, tileFilename)
                        if (length(libForMosaic[, 1]) < (fracLibSizeThreshold * 
                                length(libForMosaicFull[, 1]))) {
                                idxs <- runif(round(0.25 * length(libForMosaicFull[, 
                                        1])), 1, length(removedList))
                                for (ii in 1:length(idxs)) {
                                        libForMosaic <- addBackTile(removedList[idxs[ii]], 
                                                libForMosaic, libForMosaicFull)
                                }
                                removedList <- removedList[-idxs]
                        }
                }
                if (length(pCoord[, 1]) > 2) {
                        pCoord <- pCoord[-idx, ]
                }
        }
        if (verbose) {
                cat(paste("\n"))
                cat(paste("    Done!\n\n"))
        }
        jpeg::writeJPEG(outputImage, outputImageFileName)
}

Please Note: My first attempt to speed up this code was 1) using profvis to find the bottlenecks (i.e. the for-loops) 2) use the foreach package on for-loops. This resulted in slower code which suggested I was parallelising at too low of a level. As I understand sparklyr is more about distributing the computing than about parallelizing it so perhaps this could work.

Fearnought answered 18/3, 2020 at 16:33 Comment(3)
A long post; what exactly are you trying to achieve?Barkentine
I would like to use sparklyr on non-translated functions, specifically for composeMosaicFromImageRandom().Fearnought
Have you successfully used spark_apply() on a single .jpeg? The help says that you need to provide >An object (usually a spark_tbl) coercable to a Spark DataFrame.Iodic

© 2022 - 2024 — McMap. All rights reserved.