Does anyone have example code for a SQLite pipeline in Scrapy?

Posted 2019-01-22 22:43

I am looking for some example code of a SQLite pipeline in Scrapy. I know there is no built-in support for it, but I'm sure it has been done. Only actual code can help me, as I only know enough Python and Scrapy to complete my very limited task, and I need the code as a starting point.

5 answers
家丑人穷心不美
#2 · 2019-01-22 23:18

For anyone trying to solve a similar problem, I just came across a nice SQLite Item Exporter: https://github.com/RockyZ/Scrapy-sqlite-item-exporter.

After adding it to your project settings, you can use it with:

scrapy crawl <spider name> -o sqlite.db -t sqlite
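
"Adding it to your project settings" most likely means registering the exporter class under Scrapy's FEED_EXPORTERS setting, so that -t sqlite resolves to it. A minimal sketch, assuming the class is exposed as exporters.SqliteItemExporter (check the repository's README for the actual module path):

# settings.py
FEED_EXPORTERS = {
    'sqlite': 'exporters.SqliteItemExporter',   # module path is an assumption
}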

It could also be adapted for use as an Item Pipeline instead of an Item Exporter.

来,给爷笑一个
#3 · 2019-01-22 23:19

The following should be compatible with:

# Python 3.6.4
# Scrapy 1.5.1
# SQLite 3.21.0+
# APSW 3.9.2.post1

If you want to use APSW, just replace sqlite3 with it as noted in the comments.

If your items.py looks like this:

from scrapy.item import Item, Field
...
class NewAdsItem(Item):
    AdId        = Field()
    DateR       = Field()
    DateA       = Field()
    DateN       = Field()
    DateE       = Field()
    URL         = Field()   # AdURL

In settings.py:

ITEM_PIPELINES = { 
    'adbot.pipelines.DbPipeline':  100, 
}
SQLITE_FILE  = 'mad.db'
SQLITE_TABLE = 'ads'

In pipelines.py:

import os
import sqlite3      # part of the Python standard library (no pip install needed)
#import apsw        # pip install apsw

from scrapy import signals
from scrapy.conf import settings    # deprecated; newer Scrapy prefers crawler.settings (via from_crawler) or scrapy.utils.project.get_project_settings()
from adbot.items import NewAdsItem     # Get items from "items.py"

con = None
ikeys = None
class DbPipeline(object):
    dbfile  = settings.get('SQLITE_FILE')   # './test.db'
    dbtable = settings.get('SQLITE_TABLE')

    def __init__(self):
        self.setupDBCon()
        self.createTables()

    def setupDBCon(self):
        #self.con = apsw.Connection(self.dbfile)   # apsw
        self.con = sqlite3.connect(self.dbfile)    # sqlite3
        self.cur = self.con.cursor()

    def createTables(self):
        self.dropDbTable()
        self.createDbTable()

    def dropDbTable(self):
        print("Dropping old table: %s" % self.dbtable )
        self.cur.execute("DROP TABLE IF EXISTS %s" % self.dbtable )

    def closeDB(self):
        self.con.close()

    def __del__(self):
        self.closeDB()

    def createDbTable(self):
        print("Creating new table: %s" % self.dbtable )
        #------------------------------
        # Construct the item strings:
        #------------------------------
        # NOTE: This does not work, because items.py class re-orders the items!
        #self.ikeys = NewAdsItem.fields.keys()
        #print("Keys in creatDbTable: \t%s" % ",".join(self.ikeys) )
        #cols = " TEXT, ".join(self.ikeys)  + " TEXT"
        #print("cols:  \t%s:" % cols, flush=True)
        #------------------------------
        cols = "AdId TEXT, DateR TEXT, DateA TEXT, DateN TEXT, DateE TEXT, URL TEXT"

        # NOTE:  Use "INSERT OR IGNORE", if you also use: "AdId TEXT NOT NULL UNIQUE"
        sql = "CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY NOT NULL, %s)" % (self.dbtable, cols )
        #print (sql)
        self.cur.execute(sql)

    def process_item(self, item, spider):
        self.storeInDb(item)
        return item

    def storeInDb(self, item):
        # NOTE:  Use "INSERT OR IGNORE", if you also use: "AdId TEXT NOT NULL UNIQUE"
        # "INSERT INTO ads ( AdId, DateR, AdURL ) VALUES (?, ?, ?)"
        sql = "INSERT INTO {0} ({1}) VALUES ({2})".format(self.dbtable, ','.join(item.keys()), ','.join(['?'] * len(item.keys())) )

        # (item.get('AdId',''),item.get('DateR',''),item.get('AdURL',''), ...)
        itkeys = ','.join(item.keys())                      # item keys, joined into a comma-separated string (debug only)
        itvals = ','.join(str(v) for v in item.values())    # item values, joined into a comma-separated string (debug only)
        ivals  = tuple(item.values())                       # item values as a tuple, passed to the parameterized query

        #print (sql)
        #print("  itkeys: \t%s" % itkeys, flush=True)
        #print("  itvals: \t%s" % itvals, flush=True)
        self.cur.execute(sql, ivals)    # WARNING: does not handle list values ('[]'); make sure the spider stores plain strings

        self.con.commit()               # used in sqlite3 ONLY! (Remove for APSW)

You can then check your DB from the command line with:

echo "select * from ads;" | sqlite3 -csv -header mad.db

WARNING:

Because of a difference in how the item keys are ordered, keys obtained via item.keys() follow the order in which the fields appear in items.py, whereas keys obtained by importing your item class directly (self.ikeys = NewAdsItem.fields.keys()) come back alphabetically sorted. This is unfortunate, since it causes trouble when you try to create the DB table dynamically, before process_item() has ever been executed.
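
One way around this (a sketch, not part of the original answer) is to declare the column order once and reuse it for both CREATE TABLE and INSERT, so the schema never depends on how Scrapy orders the keys:

# column_order.py -- standalone sketch, not the original pipeline
import sqlite3

COLUMNS = ('AdId', 'DateR', 'DateA', 'DateN', 'DateE', 'URL')

def create_table(cur, table):
    cols = ", ".join("%s TEXT" % c for c in COLUMNS)
    cur.execute("CREATE TABLE IF NOT EXISTS %s (id INTEGER PRIMARY KEY NOT NULL, %s)" % (table, cols))

def insert_item(cur, table, item):
    # Both the column list and the values come from the same COLUMNS tuple
    sql = "INSERT INTO %s (%s) VALUES (%s)" % (
        table, ", ".join(COLUMNS), ", ".join("?" * len(COLUMNS)))
    cur.execute(sql, tuple(item.get(c, '') for c in COLUMNS))

con = sqlite3.connect('mad.db')
cur = con.cursor()
create_table(cur, 'ads')
insert_item(cur, 'ads', {'AdId': '123', 'URL': 'http://example.com'})
con.commit()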

乱世女痞
#4 · 2019-01-22 23:32

I did something like this:

#
# Author: Jay Vaughan
#
# Pipelines for processing items returned from a scrape.
# Don't forget to add the pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/topics/item-pipeline.html
#
from scrapy import log                  # deprecated in newer Scrapy; use spider.logger or the logging module instead
from pysqlite2 import dbapi2 as sqlite  # on modern Python you can simply use: import sqlite3 as sqlite

# This pipeline takes the Item and stuffs it into scrapedata.db
class scrapeDatasqLitePipeline(object):
    def __init__(self):
        # Possibly we should be doing this in open_spider() instead, but okay
        self.connection = sqlite.connect('./scrapedata.db')
        self.cursor = self.connection.cursor()
        self.cursor.execute('CREATE TABLE IF NOT EXISTS myscrapedata ' \
                    '(id INTEGER PRIMARY KEY, url VARCHAR(80), desc VARCHAR(80))')

    # Take the item and put it in the database - do not allow duplicates
    def process_item(self, item, spider):
        self.cursor.execute("select * from myscrapedata where url=?",
                            (item['url'][0],))
        result = self.cursor.fetchone()
        if result:
            log.msg("Item already in database: %s" % item, level=log.DEBUG)
        else:
            self.cursor.execute(
                "insert into myscrapedata (url, desc) values (?, ?)",
                (item['url'][0], item['desc'][0]))

            self.connection.commit()

            log.msg("Item stored: %s" % item, level=log.DEBUG)
        return item

    def handle_error(self, e):
        log.err(e)
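
As the header comment says, the pipeline still has to be registered in ITEM_PIPELINES. A minimal sketch (the module path myproject.pipelines is an assumption; use your own project name):

# settings.py
ITEM_PIPELINES = {
    'myproject.pipelines.scrapeDatasqLitePipeline': 300,   # module path is an assumption
}

(In very old Scrapy versions ITEM_PIPELINES was a plain list; the dict-with-priority form shown here is what current Scrapy expects.)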
祖国的老花朵
#5 · 2019-01-22 23:32

Here is a SQLite pipeline using SQLAlchemy. With SQLAlchemy you can easily switch to a different database later if needed.

In settings.py, add the database configuration:

# settings.py
# ...
DATABASE = {
    'drivername': 'sqlite',
    # 'host': 'localhost',
    # 'port': '5432',
    # 'username': 'YOUR_USERNAME',
    # 'password': 'YOUR_PASSWORD',
    'database': 'books.sqlite'
}
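
For reference, URL(**DATABASE) just assembles the SQLAlchemy connection string from that dict. A quick sanity check (a sketch; the direct URL(...) call matches the SQLAlchemy versions current when this answer was written, while SQLAlchemy 1.4+ uses URL.create(...) instead):

# url_check.py
from sqlalchemy.engine.url import URL

DATABASE = {'drivername': 'sqlite', 'database': 'books.sqlite'}
print(URL(**DATABASE))    # -> sqlite:///books.sqlite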

Then in pipelines.py, add the following:

# pipelines.py
import logging

from scrapy import signals
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy import create_engine
from sqlalchemy.engine.url import URL
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import NullPool

logger = logging.getLogger(__name__)

DeclarativeBase = declarative_base()

class Book(DeclarativeBase):
    __tablename__ = "books"

    id = Column(Integer, primary_key=True)
    title = Column('title', String)
    author = Column('author', String)
    publisher = Column('publisher', String)
    url = Column('url', String)
    scrape_date = Column('scrape_date', DateTime)

    def __repr__(self):
        return "<Book({})>".format(self.url)


class SqlitePipeline(object):
    def __init__(self, settings):
        self.database = settings.get('DATABASE')
        self.sessions = {}

    @classmethod
    def from_crawler(cls, crawler):
        pipeline = cls(crawler.settings)
        crawler.signals.connect(pipeline.spider_opened, signals.spider_opened)
        crawler.signals.connect(pipeline.spider_closed, signals.spider_closed)
        return pipeline

    def create_engine(self):
        # NOTE: connect_args={'charset': 'utf8'} only applies to MySQL-style drivers;
        # sqlite3 does not accept a 'charset' argument, so it is omitted here.
        engine = create_engine(URL(**self.database), poolclass=NullPool)
        return engine

    def create_tables(self, engine):
        DeclarativeBase.metadata.create_all(engine, checkfirst=True)

    def create_session(self, engine):
        session = sessionmaker(bind=engine)()
        return session

    def spider_opened(self, spider):
        engine = self.create_engine()
        self.create_tables(engine)
        session = self.create_session(engine)
        self.sessions[spider] = session

    def spider_closed(self, spider):
        session = self.sessions.pop(spider)
        session.close()

    def process_item(self, item, spider):
        session = self.sessions[spider]
        book = Book(**item)
        link_exists = session.query(Book).filter_by(url=item['url']).first() is not None

        if link_exists:
            logger.info('Item {} is in db'.format(book))
            return item

        try:
            session.add(book)
            session.commit()
            logger.info('Item {} stored in db'.format(book))
        except Exception:
            logger.exception('Failed to add {} to db'.format(book))
            session.rollback()
            raise

        return item

And items.py should look like this:

#items.py
import scrapy

class BookItem(scrapy.Item):
    title = scrapy.Field()
    author = scrapy.Field()
    publisher = scrapy.Field()
    url = scrapy.Field()            # needed: the pipeline looks up item['url'] and Book has a url column
    scrape_date = scrapy.Field()

You may also consider moving the Book class into items.py.

仙女界的扛把子
#6 · 2019-01-22 23:34

If you feel comfortable with Twisted's adbapi, you can take this MySQL pipeline as a starting point: http://github.com/darkrho/scrapy-googledir-mysql/blob/master/googledir/pipelines.py

And use this line in __init__:

self.dbpool = adbapi.ConnectionPool("sqlite3", database="/path/sqlite.db")
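
To flesh that out, here is a minimal sketch of what such a pipeline could look like. The table name and the url/desc fields are placeholders rather than anything from the linked example, and check_same_thread=False is an assumption added so the sqlite3 connections can be used and closed from the pool's worker threads:

# pipelines.py (sketch)
from twisted.enterprise import adbapi

class SqliteAdbapiPipeline(object):
    def __init__(self):
        self.dbpool = adbapi.ConnectionPool(
            "sqlite3", database="/path/sqlite.db", check_same_thread=False)
        # runOperation executes a single statement in a pool thread and commits it
        self.dbpool.runOperation(
            "CREATE TABLE IF NOT EXISTS items (url TEXT, desc TEXT)")

    def process_item(self, item, spider):
        # runInteraction hands a cursor to the callable in a worker thread,
        # so the insert never blocks the Twisted reactor
        self.dbpool.runInteraction(self._insert, item)
        return item

    def _insert(self, cursor, item):
        cursor.execute("INSERT INTO items (url, desc) VALUES (?, ?)",
                       (item.get('url'), item.get('desc')))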