# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db.models import Field, Lookup, Transform
from django.db.models.lookups import RegisterLookupMixin
from django.utils import six
from django.core import checks, exceptions
from django.utils.translation import string_concat, ugettext_lazy as _
from django.db import models
import re
__all__ = ('TSVectorField', 'TSVectorBaseField', 'TSVectorTsQueryLookup',
'TSVectorSearchLookup', 'TSVectorISearchLookup',
'DictionaryTransform')
"""
pg_fts.fields
-------------
Implementation of postgres full text search
@author: David Miguel
"""
tsvector_re = re.compile(r'[^\w &:\|\!\*\']', flags=re.U)
search_re = re.compile(r'[^\w ]', flags=re.U)
[docs]class TSVectorBaseField(Field):
"""
Base vector field
:param dictionary: Dictionary name as is in PostgreSQL
```pg_catalog.pg_ts_config``` more information in
:pg_docs:`PostgreSQL documentation 12.6. Dictionaries
<textsearch-dictionaries.html>`
:raises: exceptions.FieldError if lookup isn't tsquery, search or isearch
"""
valid_lookups = ('search', 'isearch', 'tsquery')
empty_strings_allowed = True
def __init__(self, dictionary='english', **kwargs):
"""Vector field"""
kwargs['null'] = True
kwargs['default'] = ''
kwargs['editable'] = False
kwargs['serialize'] = False
kwargs['db_index'] = False # use migrations for index creation
self.dictionary = dictionary
super(TSVectorBaseField, self).__init__(**kwargs)
def db_type(self, connection):
return 'tsvector'
def get_db_prep_lookup(self, lookup_type, value, connection,
prepared=False):
if lookup_type not in self.valid_lookups:
raise exceptions.FieldError("'%s' isn't valid Lookup for %s" % (
lookup_type, self.__class__.__name__))
return [self.get_db_prep_value(
self._get_db_prep_lookup(lookup_type, value),
connection=connection,
prepared=prepared)]
@staticmethod
def _get_db_prep_lookup(lookup_type, value):
if lookup_type in ('search', 'isearch'):
operation = ' & ' if lookup_type == 'search' else ' | '
return "%s" % operation.join(search_re.sub('', value).split())
elif lookup_type == 'tsquery':
return "%s" % ' '.join(tsvector_re.sub('', value).split())
def deconstruct(self):
name, path, args, kwargs = super(TSVectorBaseField, self).deconstruct()
path = 'pg_fts.fields.TSVectorBaseField'
kwargs['dictionary'] = self.dictionary
return name, path, args, kwargs
[docs]class TSVectorField(TSVectorBaseField):
"""
:param fields: A tuple containing a tuple of fields and rank to be indexed,
it can be only the field name the default the rank 'D' will be added
Example::
('field_name', ('field_name2', 'A'))
Will result in::
(('field_name', 'D'), ('field_name2', 'A'))
:param dictionary: available options:
- Can be string with the dictionary name ```pg_catalog.pg_ts_config```
consult :pg_docs:`PostgreSQL documentation 12.6. Dictionaries
<textsearch-dictionaries.html>`
- A TextField name in case of multiple dictionaries
.. caution::
Dictionary(ies) used must be installed in your database, check
``pg_catalog.pg_ts_config``
:raises: exceptions.FieldError if lookup isn't tsquery, search or isearch
or not a valid option dictionary (in case of multiple dictionaries)
.. caution::
TSVectorField does not support iexact, it will raise an exception
"""
DEFAUL_RANK = 'D'
RANK_LEVELS = ('A', 'B', 'C', 'D')
default_error_messages = {
'fields_error': _('Fields must be tuple or list of fields:'),
'index_error': _('Invalid index:'),
}
def __init__(self, fields, dictionary='english', **kwargs):
self.fields = fields
super(TSVectorField, self).__init__(dictionary, **kwargs)
def _get_fields_and_ranks(self):
for field in self.fields:
if isinstance(field, (tuple, list)):
yield (self.model._meta.get_field_by_name(field[0])[0],
field[1].upper())
else:
yield (self.model._meta.get_field_by_name(field)[0],
self.DEFAUL_RANK)
def check(self, **kwargs):
errors = super(TSVectorField, self).check(**kwargs)
for f in self.fields:
field = None
if isinstance(f, (tuple, list)):
if len(f) > 2:
errors.append(
checks.Error(
'Invalid value in fields "%s"' % (str(f),),
hint='can be "field" or ("field", "rank")',
obj=self,
id='fts.E001'
)
)
else:
field = f[0]
if f[1].upper() not in self.RANK_LEVELS:
errors.append(
checks.Error(
'Invalid rank "%s" in "%s"' % (str(f[1]), f),
hint=('Available ranks %s' %
' or '.join(self.RANK_LEVELS)),
obj=self,
id='fts.E001'
)
)
else:
field = f
if field and not isinstance(field, six.string_types):
errors.append(
checks.Error(
'Invalid value in fields "%s"' % (str(f),),
hint='can be name of field or (field, rank)',
obj=self,
id='fts.E001'
)
)
elif field:
try:
t = self.model._meta.get_field_by_name(field)[0]
if not isinstance(t, (models.CharField, models.TextField)):
errors.append(
checks.Error(
'Field must be CharField or TextField.',
hint=None,
obj=self,
id='fts.E001'
)
)
except models.FieldDoesNotExist:
errors.append(
checks.Error(
'FieldDoesNotExistFieldDoesNotExist %s' % str(f),
hint=None,
obj=self,
id='fts.E001'
)
)
return errors
@property
def description(self):
return 'TSVector of (%s)' % ', '.join(self.fields)
def deconstruct(self):
name, path, args, kwargs = super(TSVectorField, self).deconstruct()
path = 'pg_fts.fields.TSVectorField'
kwargs['fields'] = self.fields
return name, path, args, kwargs
def get_dictionary(self):
try:
df = self.model._meta.get_field(self.dictionary)
return df.default if df.default else df.options[0][0]
except models.FieldDoesNotExist:
return self.dictionary
def get_transform(self, name):
transform = super(TSVectorField, self).get_transform(name)
if transform:
return transform
try:
if name in map(
lambda x: x[1],
self.model._meta.get_field(self.dictionary).choices):
return DictionaryTransformFactory(name)
else:
raise exceptions.FieldError("The '%s' is not in %s choices" % (
name, self.model._meta.get_field(self.dictionary))
)
except ValueError:
pass
[docs]class TSVectorTsQueryLookup(Lookup):
"""
TSVectorField Lookup tsquery
Raw to_tsquery lookup, check the documentation for more details
:pg_docs:`12.3.2. Parsing Queries
<textsearch-controls.html#TEXTSEARCH-PARSING-QUERIES>`
:param query values: valid PostgreSQL tsquery
.. caution::
If the query is not a valid PostgreSQL to_tsquery will result in a
syntax error
Example::
Article.objects.filter(
tsvector__tsquery="'single-quoted phrases' & prefix:A*B & !not | or | weight:ABC"
)
SQL equivalent:
.. code-block:: sql
"tsvector" @@ to_tsquery('english', '''singlequoted phrases'' & prefix:A*B & !not | or | weight:ABC')
.. note::
The lookup will get dictionary value in
:class:`~pg_fts.fields.TSVectorField`, this case *english*
In case of multiple dictionaries see
:class:`~pg_fts.fields.DictionaryTransform`
"""
lookup_name = 'tsquery'
lookup_sql = "%s @@ to_tsquery('%s', %s)"
def as_sql(self, qn, connection):
lhs, lhs_params = self.process_lhs(qn, connection)
rhs, rhs_params = self.process_rhs(qn, connection)
params = lhs_params + rhs_params
if hasattr(self.lhs, 'dictionary'):
dictionary = self.lhs.dictionary
else:
dictionary = self.lhs.source.get_dictionary()
return self.lookup_sql % (lhs, dictionary, rhs), params
@property
def output_field(self):
return TSVectorBaseField(self.dictionary)
TSVectorBaseField.register_lookup(TSVectorTsQueryLookup)
[docs]class TSVectorSearchLookup(TSVectorTsQueryLookup):
"""
TSVectorField Lookup search
An to_tsquery with AND *&* operator
:param query values: a string with words
Example::
Article.objects.filter(
tsvector__search="an and query"
)
SQL equivalent:::
.. code-block:: sql
"tsvector" @@ to_tsquery('english', 'an & and & query')
.. note::
The lookup will get dictionary value in
:class:`~pg_fts.fields.TSVectorField`, this case *english*
In case of multiple dictionaries see
:class:`~pg_fts.fields.DictionaryTransform`
"""
lookup_name = 'search'
TSVectorBaseField.register_lookup(TSVectorSearchLookup)
[docs]class TSVectorISearchLookup(TSVectorTsQueryLookup):
"""
TSVectorField Lookup isearch
An to_tsquery with OR *|* operator
:param query values: a string with words
Example::
Article.objects.filter(
tsvector__isearch="an and query"
)
SQL equivalent:
.. code-block:: sql
"tsvector" @@ to_tsquery('english', 'an | and | query')
.. note::
The lookup will get dictionary value in
:class:`~pg_fts.fields.TSVectorField`, this case *english*
In case of multiple dictionaries see
:class:`~pg_fts.fields.DictionaryTransform`
"""
lookup_name = 'isearch'
TSVectorBaseField.register_lookup(TSVectorISearchLookup)
class DictionaryTransformFactory(object):
def __init__(self, dictionary):
self.dictionary = dictionary
def __call__(self, *args, **kwargs):
return DictionaryTransform(self.dictionary, *args, **kwargs)