Typically, when one wants to use sparklyr on a custom function (i.e. a non-translated function), one places it within spark_apply(). However, I've only encountered examples where a single local data frame is sent to a remote data source with either copy_to() or spark_read_csv() and spark_apply() is then used on it. An example, for illustrative purposes only:
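library(sparklyr)
library(dplyr)    # provides %>%, filter(), group_by(), sample_n(), collect()
library(ggplot2)  # provides ggplot(), aes(), geom_histogram(), ggtitle()
sc <- spark_connect(master = "local")
n_sim <- 100
# Replicate the virginica rows n_sim times, then resample within each replicate
iris_samps <- iris %>% dplyr::filter(Species == "virginica") %>%
  sapply(rep.int, times = n_sim) %>% cbind(replicate = rep(1:n_sim, each = 50)) %>%
  data.frame() %>%
  dplyr::group_by(replicate) %>%
  dplyr::sample_n(50, replace = TRUE)
# copy_to() replaces the dots in column names with underscores (Petal_Length)
iris_samps_tbl <- copy_to(sc, iris_samps)
iris_samps_tbl %>%
  spark_apply(function(x) {mean(x$Petal_Length)},
              group_by = "replicate") %>%
  dplyr::collect() %>%  # bring the 100 means back into R before plotting
  ggplot(aes(x = result)) + geom_histogram(bins = 20) + ggtitle("Histogram of 100 Bootstrapped Means using sparklyr")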
It therefore seems possible to use this on any range of non-translated functions coming from CRAN or Bioconductor packages, as long as the data resides in a Spark object.
I've come up with a specific problem for .jpeg images, as I read that SparkR can load compressed images (.jpeg, .png, etc.) into a raw image representation via ImageIO in the Java library, so it seems possible that sparklyr could do this as well.
The RsimMosaic::composeMosaicFromImageRandom(inputImage, outputImage, pathToTilesLibrary) function takes an input image and a path to the tiles used to create a photo mosaic, and outputs an image (example here).
If this function only took one image and I knew how to turn it into a Spark object, I imagine the command would look like: composeMosaicFromImageRandom(inputImage, outputImage, spark_obj). However, this function takes a path to 30,000 images.
How would one create 30,000 Spark objects from the path to these tiles (.jpegs) and then use this function?
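One pattern I could imagine, as a rough sketch only: instead of one Spark object per image, put the 30,000 file paths into a single Spark DataFrame (one row per tile) and let spark_apply() read each image on the workers. The helper names summarise_tiles() and tile_stats_on_spark() are hypothetical, the per-tile mean colour is just a stand-in for whatever per-tile work is needed, and the sketch assumes the jpeg package is installed and that every worker can read the tiles at the same paths. It does not run composeMosaicFromImageRandom() itself on Spark.

```r
# Sketch only: distribute the tile *paths*, not the images themselves.
# Assumes sparklyr, dplyr and jpeg are installed and that every worker
# can read the tile files at the same paths.

# Hypothetical helper: spark_apply() hands each partition over as a plain
# R data frame; here it has one character column, `path`.
summarise_tiles <- function(df) {
  if (nrow(df) == 0) {
    return(data.frame(path = character(0), r = numeric(0),
                      g = numeric(0), b = numeric(0),
                      stringsAsFactors = FALSE))
  }
  paths <- as.character(df$path)
  # Mean red, green and blue per tile; assumes colour JPEGs, i.e. arrays
  # of dimension height x width x 3.
  stats <- vapply(paths, function(p) {
    img <- jpeg::readJPEG(p)
    c(mean(img[, , 1]), mean(img[, , 2]), mean(img[, , 3]))
  }, numeric(3), USE.NAMES = FALSE)
  data.frame(path = paths, r = stats[1, ], g = stats[2, ], b = stats[3, ],
             stringsAsFactors = FALSE)
}

# Hypothetical wrapper: `sc` is an open sparklyr connection and `tile_dir`
# is the directory holding the tiles.
tile_stats_on_spark <- function(sc, tile_dir) {
  tiles <- data.frame(
    path = list.files(tile_dir, pattern = "\\.jpe?g$",
                      full.names = TRUE, ignore.case = TRUE),
    stringsAsFactors = FALSE
  )
  # One row per tile path; sdf_copy_to() is used so no library() call is needed
  tiles_tbl <- sparklyr::sdf_copy_to(sc, tiles, name = "tiles", overwrite = TRUE)
  dplyr::collect(sparklyr::spark_apply(tiles_tbl, summarise_tiles))
}
```

With a connection such as sc <- spark_connect(master = "local"), this would be called as tile_stats_on_spark(sc, "path/to/tiles"), where the directory is a placeholder. Whether this helps at all depends on how much of the mosaic function's time is spent on per-tile work as opposed to the sequential pixel loop.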
In case the underlying code actually needs to be modified, I've used jimhester/lookup to retrieve the source code:
function (originalImageFileName, outputImageFileName, imagesToUseInMosaic,
    useGradients = FALSE, removeTiles = TRUE, fracLibSizeThreshold = 0.7,
    repFracSize = 0.25, verbose = TRUE)
{
    if (verbose) {
        cat(paste("\n ------------------------------------------------ \n"))
        cat(paste(" R Simple Mosaic composer - random version \n"))
        cat(paste(" ------------------------------------------------ \n\n"))
    }
    if (verbose) {
        cat(paste(" Creating the library... \n"))
    }
    libForMosaicFull <- createLibraryIndexDataFrame(imagesToUseInMosaic,
        saveLibraryIndex = F, useGradients = useGradients)
    libForMosaic <- libForMosaicFull
    filenameArray <- list.files(imagesToUseInMosaic, full.names = TRUE)
    originalImage <- jpeg::readJPEG(filenameArray[1])
    xTileSize <- dim(originalImage[, , 1])[1]
    yTileSize <- dim(originalImage[, , 1])[2]
    if (verbose) {
        cat(paste(" -- Tiles in the Library : ", length(libForMosaic[,
            1]), "\n"))
        cat(paste(" -- Tile dimensions : ", xTileSize, " x ",
            yTileSize, "\n"))
    }
    if (verbose) {
        cat(paste("\n"))
        cat(paste(" Reading the original image... \n"))
    }
    originalImage <- jpeg::readJPEG(originalImageFileName)
    xOrigImgSize <- dim(originalImage[, , 1])[1]
    yOrigImgSize <- dim(originalImage[, , 1])[2]
    if (verbose) {
        cat(paste(" -- Original image dimensions : ", xOrigImgSize,
            " x ", yOrigImgSize, "\n"))
        cat(paste(" -- Output image dimensions : ", ((xOrigImgSize -
            2) * xTileSize), " x ", ((yOrigImgSize - 2) * yTileSize),
            "\n"))
    }
    if (verbose) {
        cat(paste("\n"))
        cat(paste(" Computing the mosaic... \n"))
    }
    outputImage <- array(dim = c(((xOrigImgSize - 2) * xTileSize),
        ((yOrigImgSize - 2) * yTileSize), 3))
    removedList <- c()
    l <- 1
    pCoord <- matrix(nrow = ((xOrigImgSize - 2) * (yOrigImgSize -
        2)), ncol = 2)
    for (i in 2:(xOrigImgSize - 1)) {
        for (j in 2:(yOrigImgSize - 1)) {
            pCoord[l, 1] <- i
            pCoord[l, 2] <- j
            l <- l + 1
        }
    }
    npixels <- length(pCoord[, 1])
    for (i in 1:npixels) {
        idx <- round(runif(1, 1, length(pCoord[, 1])))
        pixelRGBandNeigArray <- computeStatisticalQuantitiesPixel(pCoord[idx,
            1], pCoord[idx, 2], originalImage, useGradients)
        tileFilename <- getCloseMatch(pixelRGBandNeigArray,
            libForMosaic)
        startI <- (pCoord[idx, 1] - 2) * xTileSize + 1
        startJ <- (pCoord[idx, 2] - 2) * yTileSize + 1
        outputImage[startI:(startI + xTileSize - 1), startJ:(startJ +
            yTileSize - 1), ] <- jpeg::readJPEG(tileFilename)
        if (removeTiles) {
            libForMosaic <- removeTile(tileFilename, libForMosaic)
            removedList <- c(removedList, tileFilename)
            if (length(libForMosaic[, 1]) < (fracLibSizeThreshold *
                length(libForMosaicFull[, 1]))) {
                idxs <- runif(round(0.25 * length(libForMosaicFull[,
                    1])), 1, length(removedList))
                for (ii in 1:length(idxs)) {
                    libForMosaic <- addBackTile(removedList[idxs[ii]],
                        libForMosaic, libForMosaicFull)
                }
                removedList <- removedList[-idxs]
            }
        }
        if (length(pCoord[, 1]) > 2) {
            pCoord <- pCoord[-idx, ]
        }
    }
    if (verbose) {
        cat(paste("\n"))
        cat(paste(" Done!\n\n"))
    }
    jpeg::writeJPEG(outputImage, outputImageFileName)
}
Please note: my first attempt to speed up this code was to 1) use profvis to find the bottlenecks (i.e. the for-loops) and 2) use the foreach package on those for-loops. This resulted in slower code, which suggested I was parallelising at too low a level. As I understand it, sparklyr is more about distributing the computation than about parallelizing it, so perhaps this could work.
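As a side note on the for-loops, some of them may not need parallelising at all. A minimal sketch, assuming only base R: the nested loop that fills pCoord in the source above can be written with expand.grid(). The function names build_pcoord_loop() and build_pcoord_vec() are hypothetical, and I have not profiled whether this particular loop is a meaningful share of the run time.

```r
# Loop version, copied from the pCoord construction in the function above.
build_pcoord_loop <- function(xOrigImgSize, yOrigImgSize) {
  l <- 1
  pCoord <- matrix(nrow = (xOrigImgSize - 2) * (yOrigImgSize - 2), ncol = 2)
  for (i in 2:(xOrigImgSize - 1)) {
    for (j in 2:(yOrigImgSize - 1)) {
      pCoord[l, 1] <- i
      pCoord[l, 2] <- j
      l <- l + 1
    }
  }
  pCoord
}

# Vectorised version. expand.grid() varies its first argument fastest, so
# listing j first reproduces the row order of the loop (j is the inner loop).
# Like the loop, this assumes both image dimensions are at least 3.
build_pcoord_vec <- function(xOrigImgSize, yOrigImgSize) {
  grid <- expand.grid(j = 2:(yOrigImgSize - 1), i = 2:(xOrigImgSize - 1))
  cbind(grid$i, grid$j)
}
```

The main loop over pixels is harder to treat this way, because removing and adding back tiles makes each iteration depend on the previous ones.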
sparklyr on non-translated functions, specifically for composeMosaicFromImageRandom(). – Fearnought
spark_apply() on a single .jpeg? The help says that you need to provide "An object (usually a spark_tbl) coercable to a Spark DataFrame." – Iodic