
I've searched everywhere, but I couldn't find how to add two sparse vectors using Python. I want to add two sparse vectors like these:

(1048576, {110522: 0.6931, 521365: 1.0986, 697409: 1.0986, 725041: 0.6931, 749730: 0.6931, 962395: 0.6931})

(1048576, {4471: 1.0986, 725041: 0.6931, 850325: 1.0986, 962395: 0.6931})

4 Answers


Something like this should work:

from pyspark.mllib.linalg import Vectors, SparseVector
import numpy as np

def add(v1, v2):
    """Add two sparse vectors
    >>> v1 = Vectors.sparse(3, {0: 1.0, 2: 1.0})
    >>> v2 = Vectors.sparse(3, {1: 1.0})
    >>> add(v1, v2)
    SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0})
    """
    assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
    assert v1.size == v2.size 
    # Compute union of indices
    indices = set(v1.indices).union(set(v2.indices))
    # Not particularly efficient but we are limited by SPARK-10973
    # Create index: value dicts
    v1d = dict(zip(v1.indices, v1.values))
    v2d = dict(zip(v2.indices, v2.values))
    zero = np.float64(0)
    # Create dictionary index: (v1[index] + v2[index])
    values = {i: v1d.get(i, zero) + v2d.get(i, zero)
              for i in indices
              if v1d.get(i, zero) + v2d.get(i, zero) != zero}

    return Vectors.sparse(v1.size, values)
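For example, with the two vectors from the question (output wrapped for readability):

v1 = Vectors.sparse(1048576, {110522: 0.6931, 521365: 1.0986, 697409: 1.0986,
                              725041: 0.6931, 749730: 0.6931, 962395: 0.6931})
v2 = Vectors.sparse(1048576, {4471: 1.0986, 725041: 0.6931,
                              850325: 1.0986, 962395: 0.6931})
add(v1, v2)
## SparseVector(1048576, {4471: 1.0986, 110522: 0.6931, 521365: 1.0986,
##                        697409: 1.0986, 725041: 1.3862, 749730: 0.6931,
##                        850325: 1.0986, 962395: 1.3862})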

If you prefer a single pass and don't care about introduced zeros, you can modify the above code like this:

from collections import defaultdict

def add(v1, v2):
    assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
    assert v1.size == v2.size
    values = defaultdict(float) # Dictionary with default value 0.0
    # Add values from v1
    for i in range(v1.indices.size):
        values[v1.indices[i]] += v1.values[i]
    # Add values from v2
    for i in range(v2.indices.size):
        values[v2.indices[i]] += v2.values[i]
    return Vectors.sparse(v1.size, dict(values))
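Note the difference: entries that cancel out are kept as explicit zeros here, for example:

v1 = Vectors.sparse(3, {0: 1.0, 2: 1.0})
v2 = Vectors.sparse(3, {0: -1.0})
add(v1, v2)
## SparseVector(3, {0: 0.0, 2: 1.0})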

If you want, you can monkey-patch SparseVector (the output below assumes the first version of add):

SparseVector.__add__ = add
v1 = Vectors.sparse(5, {0: 1.0, 2: 3.0})
v2 = Vectors.sparse(5, {0: -3.0, 2: -3.0, 4: 10})
v1 + v2
## SparseVector(5, {0: -2.0, 4: 10.0})

Alternatively, you should be able to use scipy.sparse:

from scipy.sparse import csc_matrix
from pyspark.mllib.regression import LabeledPoint

m1 = csc_matrix((
   v1.values,
   (v1.indices, [0] * v1.numNonzeros())),
   shape=(v1.size, 1))

m2 = csc_matrix((
   v2.values,
   (v2.indices, [0] * v2.numNonzeros())),
   shape=(v2.size, 1))

LabeledPoint(0, m1 + m2)
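If you need a SparseVector back rather than passing the result to LabeledPoint, the summed column can be converted directly; a short sketch, assuming the single-column layout above:

m3 = m1 + m2       # scipy merges the index sets and sums overlapping values
m3.sort_indices()  # SparseVector expects indices in ascending order
Vectors.sparse(v1.size, m3.indices, m3.data)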

3 Comments

Thanks for the answer. It worked. Can you please explain a little how the addition is computed in the first method?
It simply creates two {index: value} dictionaries and adds the respective values to create the output dict. I've updated the answer with a solution which should be easier to read.
I'm using the scipy solution. It is nice that you get many more vector algebra operations for free. I wonder how performant it is compared to the other approaches?

I had the same problem, but I wasn't able to get the other solutions to complete in less than several hours on a moderately sized dataset (~20M records, vector size = 10k).

So instead I took another related approach which finished in just a few minutes:

import numpy as np
from pyspark.mllib.linalg import Vectors

def to_sparse(v):
    # Convert a dense numpy array into a SparseVector, keeping only nonzeros
    values = {i: e for i, e in enumerate(v) if e != 0}
    return Vectors.sparse(v.size, values)

to_sparse(rdd.aggregate(
    np.zeros(vector_size),
    lambda acc, v: acc + v.toArray(),  # fold each SparseVector into the dense accumulator
    lambda acc1, acc2: acc1 + acc2     # merge per-partition accumulators
))

The basic idea was to not build the sparse vector at every step of the reduce, but just once at the end, and to let numpy do all the vector addition work. Even using an aggregateByKey, which needed to shuffle the dense vectors, it still only took a few minutes (a sketch of that keyed variant follows).
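For reference, here is a sketch of that aggregateByKey variant; pair_rdd, its (key, SparseVector) records, and vector_size are illustrative assumptions:

summed = (pair_rdd
    .aggregateByKey(
        np.zeros(vector_size),             # per-key dense accumulator
        lambda acc, v: acc + v.toArray(),  # fold each vector into the accumulator
        lambda acc1, acc2: acc1 + acc2)    # merge per-partition accumulators
    .mapValues(to_sparse))                 # sparsify once per key at the end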



All the functions above add two sparse vectors of the same size. I was trying to combine sparse vectors of different lengths, and found something similar to my requirement in Java here: How to combine or merge two sparse vectors in Spark using Java? So I wrote that function in Python as follows:

import numpy as np
from pyspark.mllib.linalg import SparseVector

def combineSparseVectors(svs):
    size = 0
    nonzeros = 0
    for sv in svs:
        size += sv.size
        nonzeros += len(sv.indices)
    if nonzeros != 0:
        indices = np.empty(nonzeros, dtype=np.int32)
        values = np.empty(nonzeros)
        pointer_D = 0   # next write position in indices
        totalPt_D = 0   # offset to add to the current vector's indices
        pointer_V = 0   # next write position in values
        for sv in svs:
            for i in sv.indices:
                indices[pointer_D] = i + totalPt_D
                pointer_D += 1
            totalPt_D += sv.size
            for d in sv.values:
                values[pointer_V] = d
                pointer_V += 1
        return SparseVector(size, indices, values)
    else:
        return None
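For example, concatenating two vectors of different sizes (a quick illustrative check):

sv1 = SparseVector(3, {0: 1.0, 2: 2.0})
sv2 = SparseVector(2, {1: 5.0})
combineSparseVectors([sv1, sv2])
## SparseVector(5, {0: 1.0, 2: 2.0, 4: 5.0})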



The other answers contravene the programming concepts of Spark. Much more simply, just convert the pyspark.ml.linalg.SparseVector (urOldVec in the code below) to a scipy.sparse.csc_matrix object (i.e. a column vector), then add using the "+" operator.

import scipy.sparse as sps
urNewVec = sps.csc_matrix(urOldVec)  # materialize the SparseVector as a scipy sparse matrix
urNewVec + urNewVec                  # element-wise addition via the "+" operator

As mentioned in the docs for pyspark.ml.linalg, scipy.sparse vectors can be passed into pyspark instead.
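For instance, a single-column csc_matrix can be handed to MLlib constructors that expect a vector; a small sketch along the lines of the first answer (the values here are made up):

from scipy.sparse import csc_matrix
from pyspark.mllib.regression import LabeledPoint

col = csc_matrix(([1.0, 3.0], ([0, 2], [0, 0])), shape=(5, 1))  # sparse column vector
LabeledPoint(1.0, col)  # pyspark converts this to a SparseVector internally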

