Automating Twitter Video Scraping with Scrapy. Extending MediaPipeline.

In the Scraping Twitter Video article we performed four consecutive steps, each following the same pattern: request a page; apply some XPath to the result; compose the link to the next page. If you did this in a spider, you would need four callbacks, each composing a new scrapy.Request object, and much of the code would be identical. Given that video extraction may be only a part of a larger process, I don't like having this code in a spider. I'm going to demonstrate how to do it in a pipeline, reusing as much code as possible.

Part 2. Using Media Pipeline for Multiple Concurrent Downloads (Automating Twitter video scraping)
A web page can contain several embedded Twitter videos.
I assume that your code has already extracted the Twitter video IDs for those videos and stored them in item['twitter_data'].
Then the classes below come into play and extract the videos themselves.
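For context, here is a minimal sketch of how such an item could be produced. The spider, its URLs, and the regular expression are assumptions for illustration; only the raw_url, twitter_data, and vlog fields are taken from the pipeline code below.

import re
import scrapy

class PageSpider(scrapy.Spider):
    """Hypothetical spider: collects Twitter video ids embedded in a page."""
    name = "pages"
    start_urls = ["https://example.com/some-page"]

    def parse(self, response):
        # Hypothetical extraction: tweet ids taken from embedded player urls
        # such as https://twitter.com/i/videos/tweet/123456789
        ids = re.findall(r"twitter\.com/i/videos/tweet/(\d+)", response.text)
        yield {
            'raw_url': response.url,   # used by the pipelines for logging
            'twitter_data': ids,       # the id strings TwitterPipeline0 starts from
            'vlog': lambda msg: None,  # stub for the logging callable the pipelines use
        }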

The class TwitterPipelineBase implements the steps needed for Twitter video extraction:
- request a page;
- apply some XPath to the result to extract some data;
- compose the link to the next page and request it;
- do all of this in parallel for any number of Twitter videos on the page. (Here "in parallel" just means that you fire multiple requests at the same point. The underlying Twisted framework actually queues downloads and performs them one after another.)
In a subclass I basically need to override only one method, extract_next_link(self, response), which returns a new URL from the previous response.

Let's start with usage, then talk about how the base class works. The classes below implement the steps described in the Scraping Twitter Video article.

class TwitterPipeline0(TwitterPipelineBase):
    def extract_next_link(self, response_text):  # (1)
        return "https://twitter.com/i/videos/tweet/" + response_text


(1) TwitterPipeline0 corresponds to the first step, so extract_next_link() receives just a Twitter video id, stored in item['twitter_data'].
The id is used to form and return the next URL; for example, id 123456 yields https://twitter.com/i/videos/tweet/123456.
For every non-first step in the chain, extract_next_link() receives the previous step's response.

class TwitterPipeline1(TwitterPipelineBase):
    def extract_next_link(self, response):
        config_s = response.xpath("//div[@class='player-container']/@data-config").extract_first()  # (2)
        if config_s:
            config_s = config_s.encode("ascii", "ignore")
            config = json.loads(config_s)
            return config['vmap_url']


(2) Extracting the next URL. See the Twitter video extraction article for details.
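As a hedged illustration of what this step consumes: the data-config attribute holds a JSON object. The fragment below is made up and heavily trimmed; only the vmap_url key is taken from the code above.

from parsel import Selector
import json

# Illustrative page fragment only; a real data-config payload has many more fields.
html = ('<div class="player-container" '
        'data-config=\'{"vmap_url": "https://video.twimg.com/example.vmap"}\'></div>')
config_s = Selector(text=html).xpath("//div[@class='player-container']/@data-config").extract_first()
config = json.loads(config_s)
print(config['vmap_url'])  # https://video.twimg.com/example.vmap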

class TwitterPipeline2(TwitterPipelineBase):
    def extract_next_link(self, response):
        mp4_lnk = response.xpath("//MediaFile/text()").extract_first()  # (3)
        if mp4_lnk:
            mp4_lnk = mp4_lnk.strip()
            log("Downloading %i twitter %s" % (response.meta['data_num'], response.meta['item']['raw_url']))
            return mp4_lnk


(3) Extracting the next URL. See the Twitter video extraction article for details.
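For illustration, the VMAP response carries the direct mp4 link inside a MediaFile element. The fragment below is made up and heavily trimmed; only the //MediaFile XPath is taken from the code above.

from parsel import Selector

# Trimmed, illustrative fragment; the real VMAP document is much larger.
xml = "<MediaFile> https://video.twimg.com/ext_tw_video/123/vid/example.mp4 </MediaFile>"
mp4_lnk = Selector(text=xml, type="xml").xpath("//MediaFile/text()").extract_first()
print(mp4_lnk.strip())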

    def media_downloaded(self, response, request, info):  # (4)
        try:
            # Use response.meta to get additional info
            data_num = response.meta['data_num']
            # Save the downloaded file from response.body.
...
        except Exception as e:
            format_exc(self, "media_downloaded", e)


(4) This is the last step in the chain: we finally get the video here and save the results. media_downloaded() is overridden only in this class, because we don't need to save the response for a next step; we save the data instead.
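The elided body might save the file roughly as follows; the directory and file-naming scheme are assumptions for illustration, not the article's actual code.

import os

    def media_downloaded(self, response, request, info):
        try:
            data_num = response.meta['data_num']
            # Assumption: write response.body to a local mp4 file; a real
            # project would derive a unique name from the item as well.
            if not os.path.isdir("videos"):
                os.makedirs("videos")
            with open(os.path.join("videos", "video_%i.mp4" % data_num), "wb") as f:
                f.write(response.body)
        except Exception as e:
            format_exc(self, "media_downloaded", e)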

The pipeline works as follows.
TwitterPipelineBase subclasses are supposed to form a chain that mirrors the request chain.

1. An instance receives incoming data from item['twitter_data']. The incoming data is just a string (a Twitter video id) for the first pipeline in the chain and a scrapy.Response object for any pipeline that is not the first.
2. The extract_next_link() method is called to get the next URL.
3. The next request is made and the response is saved back to item['twitter_data'], which is passed to the next instance of this pipeline (the next step of video extraction).
4. Go to step 1.

The above is done for each of the Twitter ids: item['twitter_data'] is actually a list, holding all the previous responses at each step.
It is important that MediaPipeline finishes all requests before passing an item on, so that we can reuse the same container item['twitter_data'] for each step. This is the key point of the algorithm. Only after media_downloaded() has been called for all pending requests in TwitterPipeline0 will the item be passed to the first call of TwitterPipeline1's get_media_requests().
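The chain itself is configured like any other Scrapy pipeline sequence in settings.py. The module path below is an assumption; only the relative priorities matter:

ITEM_PIPELINES = {
    'myproject.pipelines.TwitterPipeline0': 300,  # request the player page
    'myproject.pipelines.TwitterPipeline1': 310,  # extract vmap_url, request the VMAP
    'myproject.pipelines.TwitterPipeline2': 320,  # extract the mp4 link, download and save
}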



import json
import scrapy
# log(), format_exc(), DEBUG/INFO/ERROR and MediaPipelineNoFilter are project
# helpers (see the note after the callouts below).

class TwitterPipelineBase(MediaPipelineNoFilter):
    def extract_next_link(self, prev_response):
        """Return the next link (a single URL string) extracted from the
        previous response, or None to stop this chain."""
        raise NotImplementedError()

    def start_state_if_needed(self, item, data_num):
        pass

    def get_media_requests(self, item, info):
        item_url = item['raw_url']
        try:
            data_cnt = len(item['twitter_data'])
            if data_cnt:
                log("%s extracting for %s" % (self.__class__.__name__, item['raw_url']), DEBUG)
            for data_num in range(0, data_cnt):
                prev_response = item['twitter_data'][data_num]  # (1)
                item['twitter_data'][data_num] = None
                if not prev_response:
                    log("Stopped num %i for %s" % (data_num, item_url), INFO)
                else:
                    err_msg = ""
                    try:
                        next_link = self.extract_next_link(prev_response)  # (2)
                    except Exception as e:
                        err_msg = str(e)
                        next_link = None
                    if next_link:
                        self.start_state_if_needed(item, data_num)
                        log("%i requesting %s for %s" % (data_num, next_link, item_url), DEBUG)
                        yield scrapy.Request(  # (3)
                            url=next_link,
                            method="GET",
                            headers={
                                "Accept": "*/*",
                                "User-Agent": "Mozilla",
                            },
                            meta={'item': item, 'data_num': data_num},
                        )
                    else:
                        item['vlog'](("data_num %i\n" % data_num) + prev_response.body)
                        log("Extraction failed num %i: %s for %s" % (data_num, err_msg, item_url), DEBUG)

        except Exception as e:
            format_exc(self, "get_media_requests %s" % item_url, e)

    def media_downloaded(self, response, request, info):
        item = response.meta['item']
        try:
            data_num = response.meta['data_num']
            item['twitter_data'][data_num] = response  # (4)
        except Exception as e:
            format_exc(self, "media_downloaded %s" % item['raw_url'], e)

    def media_failed(self, failure, request, info):
        item = request.meta['item']
        try:
            log("%s failed: %s" % (self.__class__.__name__, str(failure)), ERROR)
        except Exception as e:
            format_exc(self, "media_failed %s" % item['raw_url'], e)


(1) Getting the response from the previous step. This and the steps below are done for each item of the item['twitter_data'] list.
(2) Extracting the next link by calling the method that is supposed to be overridden in a subclass.
(3) Firing a request with the next link.
(4) Saving the response for the next step.
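The snippets rely on a few helpers that are not shown in this article: MediaPipelineNoFilter (a MediaPipeline variant from the project, presumably with request de-duplication disabled), plus log() and format_exc(). Minimal stand-ins for the latter two might look like this; the real implementations may differ:

import logging
import traceback
from logging import DEBUG, INFO, ERROR  # the levels used by the pipeline code

logger = logging.getLogger("twitter_pipeline")

def log(message, level=DEBUG):
    # Stand-in for the project's logging helper.
    logger.log(level, message)

def format_exc(obj, where, e):
    # Stand-in: report an exception together with its origin and traceback.
    logger.error("%s.%s: %s\n%s" % (obj.__class__.__name__, where, e, traceback.format_exc()))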

Although this pipeline was created specifically for Twitter, the same approach can be used for any website where you need to extract several videos from a page and each of them needs a request/extraction chain.
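For example, a chain for a hypothetical other site would again override only extract_next_link() per step; all names and XPaths below are made up:

class SomeSitePipeline0(TwitterPipelineBase):
    # Hypothetical first step: turn a stored video id into a player-page URL.
    def extract_next_link(self, video_id):
        return "https://example.com/player/" + video_id

class SomeSitePipeline1(TwitterPipelineBase):
    # Hypothetical second step: pull the direct file URL out of the player page.
    def extract_next_link(self, response):
        return response.xpath("//video/@src").extract_first()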