keep learning blog(キープラーニングブログ)

自分が興味を持ったことを備忘録として残すブログです。

24.NMFとMFCCで音源分離してみよう

――明日死ぬかのように生きよ。 永遠に生きるかのように学べ。――

マハトマ・ガンディー

 

 

NMFとMFCCで音源分離してみよう

最近、身の回りがバタバタとしていて更新が途絶えていました。日本でも猛威を奮っている、例のあれです。コロナちゃんです。彼らがついに私の住む地域(ド田舎)にまでやってきました。

どうやらデンマーク人は冬になるとイタリアなどの海外でスキーをするのが定番らしく、かつデンマーク人の楽天的な性格のせいもあって、旅行帰りに持ち帰ったウイルスは短期間で爆発的に流行し、瞬く間に日本の感染者(確認)数よりも多くなってしまいました。

実は私は、東日本大震災のときにも大学院の卒業旅行でイタリアにいました。こういう大事件が起きるとき、なぜか欧州に滞在していることが多いようです。今回も不思議な巡り合わせだなあと感じます。

さて、ニュースを見てもコロナ一色、くらーい話題ばかりで楽しくないので、今回は楽しい(?)Pythonで音声処理の記事を書きたいと思います。おさらいしますと、第21回の記事では、人間が二つの周波数の差を等間隔だと感じる心理的なピッチの尺度「メル尺度」についてご紹介しました。

keep-learning.hatenablog.jp

他方、第22回の記事では、正の値しか取らないデータを教師なし機械学習で基底行列Hと活性化行列Uの行列積に分解する手法「非負値行列因子分解(Non-negative matrix factorization)」をご紹介しました。

keep-learning.hatenablog.jp

メル尺度とNMF、ちょうどこの二つを紹介したこのタイミングで、偶然にも両者に関連する論文を発見しました。今回はその論文の概要を説明しつつ、実際にPythonアルゴリズムを実装してみたいと思います。

f:id:yuki0718:20200320194313j:plain

 

音源分離(Sound separation)

日本人が得意な音声処理分野の一つに、音源分離(Sound separation)というものがあります。音声の中から邪魔な背景雑音を分離したり、音楽の中から歌声だけを取り除いたり(カラオケ化)するようなタスクです。

想像ではありますが、音源分離はNTTやSONYが得意な技術であり、こういった大企業に優秀な研究者を集めているため、日本は存在感を維持できているのではないでしょうか*1

さて、音源分離技術を評価する研究会として、SiSEC(The second community-based Signal Separation Evaluation Campaign)という第三者機関が主催しているイベントが有名です。

sisec.inria.fr

SiSECでは、プロのミュージシャンが作曲した音楽をドラムやギター等のパートに分離する性能を競うイベントが定期的に行われており、テストに使用可能な音楽データが入手できます。今回はSiSEC2011のデータ*2を利用して、音源分離タスクに挑戦してみます。

 

NMFと基底クラスタリング

前々回の記事(22.機械学習の基本part.5)では、異なる二音階のピアノ音を打鍵した音声データをサンプルとし、NMFを使って基底行列Hと活性化行列Uの行列積に分解しました。

このくらい単純な音声であれば、Hに含まれる各列ベクトル(基底ベクトル)の持つ意味は非常に明確でした。すなわち、一つ目の基底はある音階のピアノ音、二つ目の基底は別の音階のピアノ音、三つ目の基底は鍵盤を叩いたときの打音、といった解釈です。

しかしながら、音楽というのは複数の楽器が複数の音階を奏でている複雑なデータなので、単純に音楽データをNMFで分解すると、基底行列Hに含まれる各列ベクトル一つ一つがどの楽器による演奏なのかを特定するのは非常に困難です*3

したがって、音源分離をNMFで行う場合、このタスクは「各々の基底ベクトルがどの楽器に対応するものなのかラベル付けする」という一種の分類問題に帰着します。

手元に楽器の音色のお手本となる教師データがないのであれば、これは以前ご紹介したクラスタリング(Clustering)と呼ばれる教師なし機械学習(Unsupervised machine learning)の問題設定に他なりません。

ただし、クラスタリングするには音声データは情報量が多すぎて冗長(Redundant)過ぎるので、まずは特徴量を抽出しなければいけません。音声の特徴量抽出には数多くの手法が提案されており、Variational Autoencoder(VAE)と呼ばれるディープラーニングベースの手法も知られています。

今回は、2009年にM. SpiertzとV. Gnannが発表した論文「Source-Filter Based Clustering For Monaural Blind Source Separation」に従い、メル尺度の特徴量を使って音源分離を行ってみます。

 

ソース・フィルタ理論

人間の声の物理モデルとして、ソース・フィルタ理論という考え方があります。この理論では、音声の周波数特性が喉の発声(音源=Source)と声道における音の調整(フィルタ=Filter)との積によって構成されるという仮定(Assumption)を置きます。

多くの場合、人間の声音を特徴づけているのは声道、すなわちフィルタ部分の方なので、ある音声データのソースを維持してフィルタだけ他人のものに変えれば、見た目は子供の名探偵が使っているボイスチェンジャーのような装置を作ることができます*4

ソース・フィルタ理論は、人間の声道に似たフルートやクラリネットのような管楽器にも成り立つと想像されます。さらに、これを楽器一般に成り立つと(ちょっと強引に)仮定してNMFの基底ベクトルをクラスタリングしてみよう、というのがSpiertzらのアイデアです。

この仮定が成り立つとすれば、音楽をNMFで分離して得られる基底行列 H_{k,m} もまた、ソース E_{k,m} とフィルタ F_{k,m}との積で表されることになります。

このとき、特徴量をメル周波数に圧縮するためのメル・フィルタバンク行列 R_{mel,k} と基底ベクトルとの行列積を計算し、そのlogを取ったものは、

 \log(R_{mel,k}H^{2}_{k,m}) \approx \log(R_{mel,k}E^{2}_{k,m})+\log(R_{mel,k}H^{2}_{k,m})

対数関数の性質から上式の右辺のようにソース項とフィルタ項に分かれます。ソースは楽器にあまり依存しないので、この値は右辺第二項に引きずられて、メル尺度の次元数を持つ特徴量空間に楽器ごとの偏りをもって分布することが期待されます。

Spiertzらは、この楽器ごとの偏りをクラスタリングする手法として、以下の二つを提案しています。

一つ目は、次元数を更に減らすため上式の値をMFCCに変換して、一般的な古典クラスタリングモデルに通す方法です。具体的には、離散コサイン変換(DCT)係数をとって低次のものだけ抜き出し、k-means法(特徴量空間で距離が近いもの同士を仲間として分類する手法)でラベル付けします。

二つ目はなかなかユニークで、上式の値を一回目とは別次元のNMFで二つの行列因子W, Vに分解して、得られた活性化行列Vのうち値が最も大きい行を各列ベクトル(基底ベクトル)の楽器ラベルにするという手法です。具体的には、

 \log(R_{mel,k}H^{2}_{k,m}) \Rightarrow W_{mel,i}V_{i,m}

 \displaystyle \newcommand{\argmax}{\mathop{\rm arg~max}\limits} \begin{equation} Label(m)=\argmax_{i=1,2,\cdots,I} V_{i,m}\end{equation}  \hspace{10pt} ( I は分離したい楽器の総数)

というプロセスを経て、m番目の基底ベクトルが所属する楽器のラベル番号 Label(m)  1,2,…,I のいずれか)を特定します。実装してみると分かるのですが、この手法はNMFを収束させる際にWを行方向で正規化しておくと非常に良い精度でクラスタリングできます*5

論文中には他にも、MFCCをmの軸方向に正規化したり、logを取る前に1を足したり、メル尺度を定数倍してその+1に起因する非線形性を打ち消したりと、様々な細かい工夫が記載されています。論文書くって大変ですね。

しかし、この論文の基本的なコンセプトは以上の説明に集約されていると思います。メル尺度とNMFは以前の記事でプログラミング済みなので、後はちょっとアレンジすれば簡単に実装できます。

 

Pythonで実装

さっそくソースコードを貼り付けます。かなり長いので、読み飛ばして音源分離テストの結果だけでも眺めてみてください。大部分は以前の記事でご紹介したメル尺度とNMFの計算を行う関数です。

import IPython.display as ipd ###これはjupyter notebook専用のライブラリ###
import sys
import time
import soundfile as sf
import matplotlib.pyplot as plt
import numpy as np
from scipy import signal as sg
from scipy import fftpack as fp
from sklearn.cluster import KMeans

#Function for audio pre-processing
def pre_processing(data, Fs, down_sam):
    
    #Transform stereo into monoral
    if data.ndim == 2:
        wavdata = 0.5*data[:, 0] + 0.5*data[:, 1]
    else:
        wavdata = data
    
    #Down sampling and normalization of the wave
    if down_sam is not None:
        wavdata = sg.resample_poly(wavdata, down_sam, Fs)
        Fs = down_sam
    
    return wavdata, Fs

#Function for getting STFT
def get_STFT(wav, Fs, frame_length, frame_shift):
    
    #Calculate the index of window size and overlap
    FL = round(frame_length * Fs)
    FS = round(frame_shift * Fs)
    OL = FL - FS
    
    #Execute STFT
    freqs, times, dft = sg.stft(wav, fs=Fs, window='hamm', nperseg=FL, noverlap=OL)
    arg = np.angle(dft) #Preserve the phase
    Adft = np.abs(dft) #Preserve the absolute amplitude
    Y = Adft
    
    #Display the size of input
    print("Spectrogram size (freq, time) = " + str(Y.shape))
    
    return Y, arg, Fs, freqs, times

#Function for getting inverse STFT
def get_invSTFT(Y, arg, Fs, frame_length, frame_shift):
    
    #Restrive the phase from original wave
    Y = Y * np.exp(1j*arg)
    
    #Get the inverse STFT
    FL = round(frame_length * Fs)
    FS = round(frame_shift * Fs)
    OL = FL - FS
    _, rec_wav = sg.istft(Y, fs=Fs, window='hamm', nperseg=FL, noverlap=OL)
    
    return rec_wav, Fs

#Function for removing components closing to zero
def get_nonzero(tensor):
    
    tensor = np.where(np.abs(tensor) < 1e-10, 1e-10+tensor, tensor)
    return tensor

#Function for getting basements and weights matrix by NMF
def get_NMF(Y, num_iter, num_base, loss_func, norm_H):
    
    #Initialize basements and weights based on the Y size(k, n)
    K, N = Y.shape[0], Y.shape[1]
    if num_base >= K or num_base >= N:
        print("The number of basements should be lower than input size.")
        sys.exit()
    
    #Remove Y entries closing to zero
    Y = get_nonzero(Y)
    
    #Initialize as random number
    H = np.random.rand(K, num_base) #basements (distionaries)
    U = np.random.rand(num_base, N) #weights (coupling coefficients)
    
    #Initialize loss
    loss = np.zeros(num_iter)
    
    #For a progress bar
    unit = int(np.floor(num_iter/10))
    bar = "#" + " " * int(np.floor(num_iter/unit))
    start = time.time()
    
    #In the case of squared Euclidean distance
    if loss_func == "EU":
        
        #Repeat num_iter times
        for i in range(num_iter):
            
            #Display a progress bar
            print("\rProgress:[{0}] {1}/{2} Processing..".format(bar, i, num_iter), end="")
            if i % unit == 0:
                bar = "#" * int(np.ceil(i/unit)) + " " * int(np.floor((num_iter-i)/unit))
                print("\rProgress:[{0}] {1}/{2} Processing..".format(bar, i, num_iter), end="")
            
            #Update the basements
            X = H @ U
            H = H * (Y @ U.T) / get_nonzero(X @ U.T)
            #Normalize the basements
            if norm_H == True:
                H = H / H.sum(axis=0, keepdims=True)
            
            #Update the weights
            X = H @ U
            U = U * (H.T @ Y) / get_nonzero(H.T @ X)
            
            #Normalize to ensure equal energy
            if norm_H == False:
                A = np.sqrt(np.sum(U**2, axis=1)/np.sum(H**2, axis=0))
                H = H * A[np.newaxis, :]
                U = U / A[:, np.newaxis]
            
            #Compute the loss function
            X = H @ U
            loss[i] = np.sum((Y - X)**2)
    
    #In the case of Kullback–Leibler divergence
    elif loss_func == "KL":
        
        #Repeat num_iter times
        for i in range(num_iter):
            
            #Display a progress bar
            print("\rProgress:[{0}] {1}/{2} Processing..".format(bar, i, num_iter), end="")
            if i % unit == 0:
                bar = "#" * int(np.ceil(i/unit)) + " " * int(np.floor((num_iter-i)/unit))
                print("\rProgress:[{0}] {1}/{2} Processing..".format(bar, i, num_iter), end="")
            
            #Update the basements
            X = get_nonzero(H @ U)
            denom_H = U.T.sum(axis=0, keepdims=True)
            H = H * ((Y / X) @ U.T) / get_nonzero(denom_H)
            #Normalize the basements
            if norm_H == True:
                H = H / H.sum(axis=0, keepdims=True)
            
            #Update the weights
            X = get_nonzero(H @ U)
            denom_U = H.T.sum(axis=1, keepdims=True)
            U = U * (H.T @ (Y / X)) / get_nonzero(denom_U)
            
            #Normalize to ensure equal energy
            if norm_H == False:
                A = np.sqrt(np.sum(U**2, axis=1)/np.sum(H**2, axis=0))
                H = H * A[np.newaxis, :]
                U = U / A[:, np.newaxis]
            
            #Compute the loss function
            X = get_nonzero(H @ U)
            loss[i] = np.sum(Y*np.log(Y) - Y*np.log(X) - Y + X)
    
    #In the case of Itakura–Saito divergence
    elif loss_func == "IS":
            
        #Repeat num_iter times
        for i in range(num_iter):
            
            #Display a progress bar
            print("\rProgress:[{0}] {1}/{2} Processing..".format(bar, i, num_iter), end="")
            if i % unit == 0:
                bar = "#" * int(np.ceil(i/unit)) + " " * int(np.floor((num_iter-i)/unit))
                print("\rProgress:[{0}] {1}/{2} Processing..".format(bar, i, num_iter), end="")
            
            #Update the basements
            X = get_nonzero(H @ U)
            denom_H = np.sqrt(X**-1 @ U.T)
            H = H * np.sqrt((Y / X**2) @ U.T) / get_nonzero(denom_H)
            #Normalize the basements (it is recommended when IS divergence)
            H = H / H.sum(axis=0, keepdims=True)
            
            #Update the weights
            X = get_nonzero(H @ U)
            denom_U = np.sqrt(H.T @ X**-1)
            U = U * (np.sqrt(H.T @ (Y / X**2))) / get_nonzero(denom_U)
            
            #Compute the loss function
            X = get_nonzero(X)
            loss[i] = np.sum(Y / X - np.log(Y) + np.log(X) - 1)
    
    else:
        print("The deviation shold be either 'EU', 'KL', or 'IS'.")
        sys.exit()
    
    #Finish the progress bar
    bar = "#" * int(np.ceil(num_iter/unit))
    print("\rProgress:[{0}] {1}/{2} {3:.2f}sec Completed!".format(bar, i+1, num_iter, time.time()-start), end="")
    print()
    
    return H, U, loss

#Function for plotting Spectrogram and loss curve
def display_graph(Y, X, times, freqs, loss_func, num_iter):
    
    #Plot the loss curve
    plt.rcParams["font.size"] = 16
    plt.figure(figsize=(18, 6))
    plt.subplot(1, 2, 1)
    plt.title('An original spectrogram')
    plt.xlabel('Time [sec]')
    plt.ylabel('Frequency [Hz]')
    plt.pcolormesh(times, freqs, 10*np.log10(np.abs(Y)), cmap='jet')
    plt.colorbar(orientation='horizontal').set_label('Power')
    
    plt.subplot(1, 2, 2)
    plt.title('The spectrogram approximated by NMF')
    plt.xlabel('Time [sec]')
    plt.ylabel('Frequency [Hz]')
    plt.pcolormesh(times, freqs, 10*np.log10(np.abs(X)), cmap='jet')
    plt.colorbar(orientation='horizontal').set_label('Power')
    
    #Plot the loss curve
    plt.figure(figsize=(10, 5))
    plt.plot(np.arange(1, num_iter+1), loss[:], marker='.')
    plt.title(loss_func + '_loss curve')
    plt.xlabel('Iteration')
    plt.ylabel('Loss value')
    plt.show()
    
    return

#■Function for generating Mel-scale filters■
def melFilterBank(Fs, fftsize, Mel_channel, Mel_norm, Amax):
    
    #Mel-frequency is proportional to "log(f/Mel_scale + 1)" [Default]700 or 1000
    Mel_scale = 700
    
    #Define Mel-scale parameter m0 based on "1000Mel = 1000Hz"
    m0 = 1000.0 / np.log(1000.0 / Mel_scale + 1.0)
    
    #Resolution of frequency
    df = Fs / fftsize
    
    #Define Nyquist frequency (the end point) as Hz, mel, and index scale
    Nyq = Fs / 2
    mel_Nyq = m0 * np.log(Nyq / Mel_scale + 1.0)
    n_Nyq = int(np.floor(fftsize / 2))+1
    
    #Calculate the Mel-scale interval between triangle-shaped structures
    #Divided by channel+1 because the termination is not the center of triangle but its right edge
    dmel = mel_Nyq / (Mel_channel + 1)
    
    #List up the center position of each triangle
    mel_center = np.arange(1, Mel_channel + 1) * dmel
    
    #Convert the center position into Hz-scale
    f_center = Mel_scale * (np.exp(mel_center / m0) - 1.0)
    
    #Define the center, start, and end position of triangle as index-scale
    n_center = np.round(f_center / df)
    n_start = np.hstack(([0], n_center[0 : Mel_channel - 1]))
    n_stop = np.hstack((n_center[1 : Mel_channel], [n_Nyq]))
    
    #Initial condition is defined as 0 padding matrix
    output = np.zeros((n_Nyq, Mel_channel))
    
    #Mel-scale filters are periodic triangle-shaped structures
    #Repeat every channel
    for c in np.arange(0, Mel_channel):
        
        #Slope of a triangle(growing slope)
        upslope = 1.0 / (n_center[c] - n_start[c])
        
        #Add a linear function passing through (nstart, 0) to output matrix 
        for x in np.arange(n_start[c], n_center[c]):
            #Add to output matrix
            x = int(x)
            output[x, c] = (x - n_start[c]) * upslope
        
        #Slope of a triangle(declining slope)
        dwslope = 1.0 / (n_stop[c] - n_center[c])
        
        #Add a linear function passing through (ncenter, 1) to output matrix 
        for x in np.arange(n_center[c], n_stop[c]):
            #Add to output matrix
            x = int(x)
            output[x, c] = 1.0 - ((x - n_center[c]) * dwslope)
        
        #Normalize area underneath each Mel-filter into 1
        #[Ref] T.Ganchev, N.Fakotakis, and G.Kokkinakis, Proc. of SPECOM 1, 191-194 (2005)
        #[URL] https://pdfs.semanticscholar.org/f4b9/8dbd75c87a86a8bf0d7e09e3ebbb63d14954.pdf
        if Mel_norm == True:
            output[:, c] = output[:, c] * 2 / (n_stop[c] - n_start[c])
    
    #Return Mel-scale filters as list (row=frequency, column=Mel channel)
    return output

#■Function for calculating MFCC■
def get_Melfeature(A, Fs, frame_length, frame_shift, Mel_channel, Mel_norm, MFCC_num, Amax, clu_mode):
    
    #Calculate the index of window size and overlap
    FL = round(frame_length * Fs)
    FS = round(frame_shift * Fs)
    OL = FL - FS
    
    #Call my function for generating Mel-scale filters(row: fftsize/2, column: Channel)
    filterbank = melFilterBank(Fs, FL, Mel_channel, Mel_norm, Amax)
    
    #Multiply the filters into the STFT amplitude
    melA = A.T @ filterbank
    
    #Normalization and get logarithm
    melA = melA * Amax / np.amax(melA)
    melA = np.log10(melA + 1) #Non-negative value
    
    #In the case of k-means clustering method
    if clu_mode == "kmeans":
        #Get the DCT coefficients (DCT: Discrete Cosine Transformation)
        output = fp.realtransforms.dct(melA, type=2, norm="ortho", axis=1)
        
        #Trim the MFCC features from C(0) to C(MFCC_num-1)
        output = np.array(output[:, 0:MFCC_num])
    
    #In the case of second NMF clustering method
    elif clu_mode == "2ndNMF":
        output = melA
    
    #Return MFCC or mel-spectrogram as (frames, order) numpy array
    return output

#■Main■
if __name__ == "__main__":
    
    #MFCC clustering is according to "Source-filter based clustering for monaural blind source separation"
    #[Ref] M. Spiertz and V. Gnann, (2009), Proc. International Conference on Digital Audio Effect
    #[URL] http://dafx.de/paper-archive/2009/papers/paper_13.pdf
    
    #Setup
    down_sam = None        #Downsampling rate (Hz) [Default]None
    frame_length = 0.064   #STFT window width (second) [Default]0.064
    frame_shift = 0.032    #STFT window shift (second) [Default]0.032
    num_iter = 200         #The number of iteration in NMF [Default]200
    num_base = 25          #The number of basements in NMF [Default]20~30
    loss_func = "KL"       #Select either EU, KL, or IS divergence [Default]KL
    Mel_channel = 20       #The number of frequency channel for Mel-scale filters [Default]20
    Mel_norm = True        #Normalize the area underneath each Mel-filter into 1 [Default]True
    MFCC_num = 9           #The number of MFCCs including C(0) [Default]9
    Amax = 1e4             #Normalization for log-Mel conversion [Default]1e4 (10000)
    clu_mode = "kmeans"    #Clustering mode introduced by M. Spiertz's paper [Default]kmeans or 2ndNMF
    clu_loss = "EU"        #Using 2ndNMF, select either EU, KL, or IS divergence [Default]EU
    clu_iter = 100         #Using 2ndNMF, specify the number of iteration in 2nd NMF [Default]100
    num_rep = 10           #The number of repetitions [Default]10
    
    #Define random seed
    np.random.seed(seed=32)
    
    #Repeat for each myu
    for music in ["Roads", "Que_pena_tanto_faz", "Ultimate_NZ_tour"]:
        
        #File path
        source1 = "./music/test/" + music + "/0dB_mixed.wav"
        source2 = "./music/test/" + music + "/0dB_data1.wav"
        source3 = "./music/test/" + music + "/0dB_data2.wav"
        print("\n\nMusic: {}".format(music))
        
        ### NMF step (to get basements matrix H) ###
        #Read mixed audio and true sources
        data, Fs = sf.read(source1)
        truth1, Fs = sf.read(source2)
        truth2, Fs = sf.read(source3)
        
        #Call my function for audio pre-processing
        data, Fs = pre_processing(data, Fs, down_sam)
        truth1, Fs = pre_processing(truth1, Fs, down_sam)
        truth2, Fs = pre_processing(truth2, Fs, down_sam)
        
        #Call my function for getting STFT (amplitude or power)
        print("Mixed sound")
        ipd.display(ipd.Audio(data=data, rate=Fs))
        Y, arg, Fs, freqs, times = get_STFT(data, Fs, frame_length, frame_shift)
        
        #Call my function for updating NMF basements and weights
        H, U, loss = get_NMF(Y, num_iter, num_base, loss_func, False)
        
        #Call my function for getting inverse STFT
        X = H @ U
        rec_wav, Fs = get_invSTFT(X, arg, Fs, frame_length, frame_shift)
        rec_wav = rec_wav[: int(data.shape[0])] #inverse stft includes residual part due to zero padding
        print("Reconstructed sound by NMF")
        ipd.display(ipd.Audio(data=rec_wav, rate=Fs))
        
        #Call my function for displaying graph
        display_graph(Y, X, times, freqs, loss_func, num_iter)
        
        ### Clustering step (to get label for each sound source) ###
        #In the case of k-means clustering
        if clu_mode == "kmeans":
            
            #Call my function for getting MFCCs
            MFCC = get_Melfeature(H**2, Fs, frame_length, frame_shift, Mel_channel, Mel_norm, MFCC_num, Amax, clu_mode)
            
            #Normalize MFCC along with basements-axis
            MFCC = MFCC - np.average(MFCC, axis=0)[np.newaxis, :]
            MFCC = MFCC / np.std(MFCC, axis=0)[np.newaxis, :]
            
            #Get clustering by kmeans++
            clf = KMeans(n_clusters=2, init='k-means++', n_jobs=4)
            label1 = np.array(clf.fit(MFCC).labels_)
        
        #In the case of second NMF clustering
        elif clu_mode == "2ndNMF":
            
            #Call my function for getting mel-spectrogram
            melA = get_Melfeature(H**2, Fs, frame_length, frame_shift, Mel_channel, Mel_norm, MFCC_num, Amax, clu_mode)
            
            #Call my function for getting second NMF
            W, V, loss = get_NMF(melA.T, clu_iter, 2, clu_loss, 0, 0, True)
            
            #Plot the loss curve for 2nd NMF
            plt.figure(figsize=(10, 5))
            plt.plot(np.arange(1, clu_iter+1), loss[:], marker='.')
            plt.show()
            
            #Get clustering by second NMF
            label1 = np.argmax(V, axis=0)
            
        else:
            print("The 'clu_mode' should be either 'kmeans' or 'NMF'.")
            sys.exit()
        
        print("Clustering vector a(i):{}".format(label1))
        label2 = np.ones(num_base) - label1
        label1 = label1[np.newaxis, :]
        label2 = label2[np.newaxis, :]
            
        #Decide which label corresponds to source1
        X = (H * label1) @ U
        rec_wav, Fs = get_invSTFT(X, arg, Fs, frame_length, frame_shift)
        rec_wav = rec_wav[: int(truth1.shape[0])] #inverse stft includes residual part due to zero padding
        sdr1,_,_,_,_ = get_metrics(truth1, rec_wav)
        sdr2,_,_,_,_ = get_metrics(truth2, rec_wav)
        if sdr1 > sdr2:
            H1 = H * label1
            H2 = H * label2
        else:
            H1 = H * label2
            H2 = H * label1
        
        #Get separation by using Wiener filter
        X1 = Y * (H1 @ U) / (H @ U)
        X2 = Y * (H2 @ U) / (H @ U)
        
        #Call my function for getting inverse STFT
        sep_wav1, Fs = get_invSTFT(X1, arg, Fs, frame_length, frame_shift)
        sep_wav1 = sep_wav1[: int(truth1.shape[0])] #inverse stft includes residual part due to zero padding
        sep_wav2, Fs = get_invSTFT(X2, arg, Fs, frame_length, frame_shift)
        sep_wav2 = sep_wav2[: int(truth2.shape[0])] #inverse stft includes residual part due to zero padding
        
        ### Evaluation step###
        #Display audio bar for each source
        print("True source1")
        ipd.display(ipd.Audio(data=truth1, rate=Fs))
        print("Estimated source1")
        ipd.display(ipd.Audio(data=sep_wav1, rate=Fs))
        print("True source2")
        ipd.display(ipd.Audio(data=truth2, rate=Fs))
        print("Estimated source2")
        ipd.display(ipd.Audio(data=sep_wav2, rate=Fs))

今回のコードはコマンドプロンプトからPythonを呼び出すより、jupyter notebook上で動作させた方が良いです。というのも、jupyter notebookには「IPython.display」という便利なライブラリがあり、これを使うと音源の再生バーを画面上に出せるからです。

 

音源分離のテスト

今回は、SiSEC2011のDevelopment datasetで提供されている曲「Que_pena_tanto_faz」と「Ultimate_NZ_tour」をこのアルゴリズムで分離してみます。

まずは音圧が同一(0dB)となるように「Que_pena_tanto_faz」のギターパートとボーカルパートを混合したものがこちらです。

これをSpiertzらの提案する一つ目の手法(MFCC+kmeans)で分離したところ、こうなりました。

ボーカルパートを聞くとよく分かるのですが、若干ギターの音が混じってしまっています。続いて、二つ目の手法(2ndNMF)で分離してみます。

明らかにギター音の混合が減り、一つ目の手法よりも分離性能が上がっていることが分かります。このように、特徴量の抽出方法やクラスタリングモデルは分離性能に多大な影響を与えます。

続いて、二曲目の「Ultimate_NZ_tour」でギターパートとシンセサイザーパートを混合したものがこちらです。

MFCC+kmeansで分離すると、

かなりファンシーなシンセサイザー音が残ってしまっています。2ndNMFで分離すると、

こちらも分離性能が上がっている気がします。ちなみに、こういった分離性能の客観的指標としてSDR(Source to distortion ratio)と呼ばれるMetricsがあり、研究ではこういう指標を通じて定量的に性能を測る必要があります。

PythonにはMusevalという便利な評価指標ライブラリがあるので、SDRを自分で実装する必要はなく「bss_eval_images」という関数*6を呼び出すだけで簡単に測れます。

 

今回のところは以上です。教師なし機械学習なので、1つの音楽データからすぐに分離テストができるのはとても面白いです。この手軽さが音声処理の醍醐味ですね。

稚文をお読みいただきありがとうございました。

*1:私の師事する教授曰く、欧州各国は電電公社のような組織を早々に完全民営化して一部潰れてしまったので、NTTを維持している日本などの国々に対して、音声処理の技術で後れを取ったそうです。

*2:http://sisec2011.wiki.irisa.fr/tiki-index165d.html?page=Professionally+produced+music+recordings

*3:もちろん、周波数領域(Frequency domain)から時間領域(Time domain)に戻して人間が聞けば分かるものもあるでしょうが、10~100個もの基底ベクトルをいちいち人間が聴いて分類しないといけないというのはあまりにも効率が悪いです。

*4:現実的には、ソース・フィルタ理論のモデルであそこまで自然な音声を作り出すのは難しいそうです。

*5:値の大小をラベルの判定に使うアルゴリズムなので、正規化をすることは非常に重要です。これをうっかり忘れるとスケールがめちゃくちゃになって全然分離できなかったりします。

*6:bss_eval_sources」という兄弟分のような関数もあるのですが、これは推奨されていません。詳しくはJ. Le Rouxらの「SDR-half-baked or well done? (2018)」という寄稿記事を参考にしてください。Version管理って大事なんですね。