Skip to content

Extractors & Generators



Handlers for extracting patterns from text and generating text from given data

HeadlineExtractor

Extracting headlines with a given set of patterns

Source code in app/handlers.py
class HeadlineExtractor:
    """
    Extract headlines from remark text with a given set of spacy
    dependency patterns.
    """

    def __init__(self, patterns, banned_words, locations):
        """
        Store the filter data and register the dependency patterns on a
        spacy DependencyMatcher under the "HOME" key.
        """
        self.patterns = patterns
        self.matcher = DependencyMatcher(nlp.vocab)
        self.matcher.add("HOME", patterns)
        self.banned_words = banned_words
        self.locations = locations

    @staticmethod
    def extract_all_matches(matches):
        """
        Filter matches with descending token order and deduplicate the
        token ids of each kept match

        Parameters
        ----------
        matches : list
            a list of tuples with pattern id and extracted attributes ids' list

        Returns
        -------
        all_matches : list
            a list of filtered matches lists, sorted by ascending length
        """

        all_matches = []
        for match in matches:
            # keep only matches whose token ids arrived in descending order
            if match[-1] == sorted(match[-1], reverse=True):
                sorted_match = sorted(set(match[-1]))
                all_matches.append(sorted_match)
        all_matches = sorted(all_matches, key=len)
        return all_matches

    @staticmethod
    def extract_best_headlines(filtered_matches, filtered_headlines):
        """
        Filter the longest matches from the intersected matches

        Parameters
        ----------
        filtered_matches : list
            a list of filtered matches list
        filtered_headlines : list
            a list of filtered headlines list
        Returns
        -------
        best_headlines : list
            a list of the filtered headlines, longest first
        best_matches : list
            a list of the filtered matches
        """
        best_matches = []
        best_headlines = []
        for i, match in enumerate(filtered_matches):
            # drop a match whose token set is fully covered by the union of
            # the matches that follow it (a longer match subsumes it)
            if len(set(match).intersection(
                    list(itertools.chain(*filtered_matches[i + 1:])))) != len(match):
                best_matches.append(match)
                best_headlines.append(filtered_headlines[i])
        # NOTE(review): only the headlines are re-sorted here, so from this
        # point best_matches[i] may no longer pair with best_headlines[i]
        best_headlines = sorted(best_headlines, key=len, reverse=True)
        return best_headlines, best_matches

    def restore_headlines(self, doc, all_matches):
        """
        Add the missing tokens of the headline: rejoin "-"/"/"-separated
        neighbour tokens and drop headlines that contain banned words or a
        duplicated home synonym.
        Parameters
        ----------
        doc : spacy.tokens.doc.Doc
            sequence of remark tokens
        all_matches : list
            all extracted matches

        Returns
        -------
        restored_headlines : list
            headlines with missing tokens
        restored_matches : list
            matches with missing tokens indexes
        loc_filter_headlines : list
            parts of remark between minimum and maximum indexes of each match
        """
        loc_filter_headlines = []
        restored_headlines = []
        restored_matches = []
        for match in all_matches:
            loc_filter_headlines.append(doc[min(match):max(match) + 1].text.lower())
            headline = ""
            # the match's last token is the home synonym the pattern anchors on
            home_syn = doc[match[-1]].text.lower()
            headline_tokens = []
            for i in match:

                if i in headline_tokens:
                    continue
                # don't extract headlines with banned words
                if doc[i].text.lower() in self.banned_words.always_banned:
                    headline = ""
                    break
                # `&` on parenthesized comparisons is equivalent to `and` here
                if (doc[i].text.lower() in self.banned_words.banned_in_short) & (len(match) == 2):
                    headline = ""
                    break
                # don't extract headlines with double home synonyms
                if (doc[i].text.lower() == home_syn) & (i < match[-1]):
                    continue
                # take previous and next items of -
                if doc[i].text == "-":
                    if (i + 1 not in headline_tokens) & (i - 1 not in headline_tokens):
                        headline_tokens.append(i - 1)
                        headline_tokens.append(i)
                        headline_tokens.append(i + 1)
                        headline = headline + doc[i - 1].text + doc[i].text + doc[i + 1].text + " "
                    elif i + 1 not in match:
                        headline_tokens.append(i)
                        headline_tokens.append(i + 1)
                        headline = headline.strip() + doc[i].text + doc[i + 1].text + " "
                    elif i - 1 not in match:
                        headline_tokens.append(i - 1)
                        headline_tokens.append(i)
                        headline = headline + doc[i - 1].text + doc[i].text
                    else:
                        headline_tokens.append(i)
                        headline = headline.strip() + doc[i].text
                # don't check i+1 when i is the last
                elif i + 1 >= len(doc):
                    headline_tokens.append(i)
                    headline = headline + doc[i].text
                # check the next item is -
                elif doc[i + 1].text in ["-", "/"]:
                    if (i + 1 not in headline_tokens) and (i + 2 not in headline_tokens):
                        headline_tokens.append(i)
                        headline_tokens.append(i + 1)
                        headline_tokens.append(i + 2)
                        headline = headline + doc[i].text + doc[i + 1].text + doc[i + 2].text + " "
                    elif (i + 1 not in headline_tokens) and (i + 2 in headline_tokens):
                        headline_tokens.append(i)
                        headline_tokens.append(i + 1)
                        headline = headline + doc[i].text + doc[i + 1].text
                # second word of - in match
                elif doc[i - 1].text in ["-", "/"]:
                    if (i - 1 not in headline_tokens) & (i - 2 not in headline_tokens) & (
                            i - 2 >= 0):
                        headline_tokens.append(i - 2)
                        headline_tokens.append(i - 1)
                        headline_tokens.append(i)
                        headline = headline + doc[i - 2].text + doc[i - 1].text + doc[i].text + " "
                    elif (i - 1 not in headline_tokens) & (i - 2 in headline_tokens) & (i - 2 >= 0):
                        headline_tokens.append(i - 1)
                        headline_tokens.append(i)
                        headline = headline.strip() + doc[i - 1].text + doc[i].text + " "

                else:
                    headline_tokens.append(i)
                    headline = headline + doc[i].text + " "

            # normalize spacing around restored hyphens
            headline = headline.strip()
            headline = headline.replace("- ", "-")

            restored_headlines.append(headline)
            restored_matches.append(headline_tokens)
        return restored_headlines, restored_matches, loc_filter_headlines

    def filter_headlines(self, doc, restored_headlines, restored_matches, loc_filter_headlines,
                         min_length):
        """
        Filter matches that don't contain any location and which are longer than minimum length

        Parameters
        ----------
        doc : spacy.tokens.doc.Doc
            sequence of remark tokens
        restored_headlines : list
            headlines with missing tokens
        restored_matches : list
            matches with missing tokens indexes
        loc_filter_headlines : list
            parts of remark between minimum and maximum indexes of each match
        min_length : int
            minimum length of each headline

        Returns
        -------
        filtered_headlines : list
            Longer than minimum length headlines without locations
        filtered_matches : list
            Matches of filtered headlines
        """

        filtered_headlines = []
        filtered_matches = []
        # GPE/LOC named entities recognised in the remark
        entities = [ent.text for ent in doc.ents if (ent.label_ == "GPE") | (ent.label_ == "LOC")]

        for i, headline in enumerate(restored_headlines):
            if len(headline) >= min_length:
                headline_split = headline.split()
                headline_lower_split = headline.lower().split()
                headline = headline.lower().capitalize()
                filtered_headlines.append(headline)
                filtered_matches.append(restored_matches[i])

                for entity in entities:  # remove cases when token is a part of geo name
                    entity_split = entity.split()
                    # NOTE(review): the second comparison is always true (an
                    # intersection cannot exceed len(entity_split)), so this
                    # reduces to "headline shares a word with the entity"
                    if (len(set(entity_split).intersection(headline_split)) > 0) & \
                            (len(set(entity_split).intersection(headline_split)) <= len(
                                entity_split)):
                        # NOTE(review): .remove() deletes by value; if
                        # duplicate headlines can occur it may drop the wrong
                        # entry -- confirm duplicates are impossible here
                        filtered_headlines.remove(headline)
                        filtered_matches.remove(restored_matches[i])
                        break

                if headline in filtered_headlines:  # remove cases when US city state, county or part of them in headline

                    for loc in self.locations:
                        loc = loc.split()
                        # check if location word has intersection with headline
                        if len(set(loc).intersection(headline_lower_split)) > 0:
                            # if it is subset of a large headline remove it
                            if set(loc).issubset(loc_filter_headlines[i].split()):
                                filtered_headlines.remove(headline)
                                filtered_matches.remove(restored_matches[i])
                                break
                            else:
                                # go back if there are intersection and check again
                                l = len(loc)
                                # widen the window leftwards to catch a location
                                # phrase starting just before the match
                                min_ind = max(min(restored_matches[i]) - l + 1, 0)
                                max_ind = max(restored_matches[i])
                                headline_with_loc_in_start = doc[min_ind:max_ind + 1].text.lower()

                                if set(loc).issubset(headline_with_loc_in_start.split()):
                                    filtered_headlines.remove(headline)
                                    filtered_matches.remove(restored_matches[i])
                                    break

        return filtered_headlines, filtered_matches

    @staticmethod
    def extract_all_headlines(doc, all_matches):
        """
        Extract headlines from matches
        Parameters
        ----------
        doc : spacy.tokens.doc.Doc
            sequence of remark tokens
        all_matches : list
            all extracted matches

        Returns
        -------
        all_headlines : list
            extracted headlines from matches (each keeps a trailing space)
        """
        all_headlines = []
        for match in all_matches:
            headline = ""
            for i in match:
                headline = headline + doc[i].text + " "
            all_headlines.append(headline)

        return all_headlines

    def extract_headlines(self, remark, min_length):
        """
        Extract headlines which length are greater than min_length

        Parameters
        ----------
        remark : str
            remark of a house
        min_length : int
            minimum length of each headline

        Returns
        -------
        best_headlines : set
            best extracted headlines
        all_headlines : list
            all extracted headlines
        """
        if remark is None:
            # NOTE(review): this early exit returns two lists while the
            # normal path returns (set, list) -- callers must accept both
            return [], []
        doc = nlp(remark)

        matches = self.matcher(doc)

        all_matches = self.extract_all_matches(matches)
        all_headlines = self.extract_all_headlines(doc, all_matches)

        restored_headlines, restored_matches, loc_filter_headlines = self.restore_headlines(doc,
                                                                                            all_matches)

        filtered_headlines, filtered_matches = self.filter_headlines(doc, restored_headlines,
                                                                     restored_matches,
                                                                     loc_filter_headlines,
                                                                     min_length)

        # best_matches is computed but intentionally not returned
        best_headlines, best_matches = self.extract_best_headlines(filtered_matches,
                                                                   filtered_headlines)

        return set(best_headlines), all_headlines

    @staticmethod
    def list_sentences(remark):
        """
        Return a list of sentences of a given text

        Parameters
        ----------
        remark : str
            text to sentencize

        Returns
        -------
        out : List of str
            stripped sentences; empty list for falsy input
        """
        if not remark:
            return []
        sentences = [sent.text.strip() for sent in nlp(remark).sents]
        return sentences

__init__(patterns, banned_words, locations)

Initiate patterns, spacy matcher and add patterns to that matcher

Source code in app/handlers.py
def __init__(self, patterns, banned_words, locations):
    """
    Store the filter data, then build a spacy DependencyMatcher and
    register the dependency patterns under the "HOME" key.
    """
    self.banned_words = banned_words
    self.locations = locations
    self.patterns = patterns
    self.matcher = DependencyMatcher(nlp.vocab)
    self.matcher.add("HOME", patterns)

extract_all_matches(matches) staticmethod

Filter matches with descending token order and without repeating attributes

Parameters:

Name Type Description Default
matches list

a list of tuples with pattern id and extracted attributes ids' list

required

Returns:

Name Type Description
all_matches list

a list of filtered matches lists

Source code in app/handlers.py
@staticmethod
def extract_all_matches(matches):
    """
    Filer matches with descending order and not repeating attributes

    Parameters
    ----------
    matches : list
        a list of tuples with pattern id and extracted attributes ids' list

    Returns
    -------
    all_matches : list
        a list of filtered matches lists
    """

    all_matches = []
    for match in matches:
        if match[-1] == sorted(match[-1], reverse=True):
            sorted_match = sorted(set(match[-1]))
            all_matches.append(sorted_match)
    all_matches = sorted(all_matches, key=len)
    return all_matches

extract_best_headlines(filtered_matches, filtered_headlines) staticmethod

Filter the longest matches from the intersected matches

Parameters:

Name Type Description Default
filtered_matches list

a list of filtered matches list

required
filtered_headlines list

a list of filtered headlines list

required

Returns:

Name Type Description
best_matches list

a list of the filtered matches

best_headlines list

a list of the filtered headlines

Source code in app/handlers.py
@staticmethod
def extract_best_headlines(filtered_matches, filtered_headlines):
    """
    Filer the longest matches from the intersected matches

    Parameters
    ----------
    filtered_matches : list
        a list of filtered matches list
    filtered_headlines : list
        a list of filtered headlines list
    Returns
    -------
    best_matches : list
        a list of the filtered matches
    best_headlines : list
        a list of the filtered headlines
    """
    best_matches = []
    best_headlines = []
    for i, match in enumerate(filtered_matches):
        if len(set(match).intersection(
                list(itertools.chain(*filtered_matches[i + 1:])))) != len(match):
            best_matches.append(match)
            best_headlines.append(filtered_headlines[i])
    best_headlines = sorted(best_headlines, key=len, reverse=True)
    return best_headlines, best_matches

restore_headlines(doc, all_matches)

Add the missing tokens of the headline

Parameters:

Name Type Description Default
doc Doc

sequence of remark tokens

required
all_matches list

all extracted matches

required

Returns:

Name Type Description
restored_headlines list

headlines with missing tokens

restored_matches list

matches with missing tokens indexes

loc_filter_headlines list

parts of remark between minimum and maximum indexes of each match

Source code in app/handlers.py
def restore_headlines(self, doc, all_matches):
    """
    Add the missing tokens of the headline: rejoin "-"/"/"-separated
    neighbour tokens and drop headlines that contain banned words or a
    duplicated home synonym.
    Parameters
    ----------
    doc : spacy.tokens.doc.Doc
        sequence of remark tokens
    all_matches : list
        all extracted matches

    Returns
    -------
    restored_headlines : list
        headlines with missing tokens
    restored_matches : list
        matches with missing tokens indexes
    loc_filter_headlines : list
        parts of remark between minimum and maximum indexes of each match
    """
    loc_filter_headlines = []
    restored_headlines = []
    restored_matches = []
    for match in all_matches:
        loc_filter_headlines.append(doc[min(match):max(match) + 1].text.lower())
        headline = ""
        # the match's last token is the home synonym the pattern anchors on
        home_syn = doc[match[-1]].text.lower()
        headline_tokens = []
        for i in match:

            if i in headline_tokens:
                continue
            # don't extract headlines with banned words
            if doc[i].text.lower() in self.banned_words.always_banned:
                headline = ""
                break
            # `&` on parenthesized comparisons is equivalent to `and` here
            if (doc[i].text.lower() in self.banned_words.banned_in_short) & (len(match) == 2):
                headline = ""
                break
            # don't extract headlines with double home synonyms
            if (doc[i].text.lower() == home_syn) & (i < match[-1]):
                continue
            # take previous and next items of -
            if doc[i].text == "-":
                if (i + 1 not in headline_tokens) & (i - 1 not in headline_tokens):
                    headline_tokens.append(i - 1)
                    headline_tokens.append(i)
                    headline_tokens.append(i + 1)
                    headline = headline + doc[i - 1].text + doc[i].text + doc[i + 1].text + " "
                elif i + 1 not in match:
                    headline_tokens.append(i)
                    headline_tokens.append(i + 1)
                    headline = headline.strip() + doc[i].text + doc[i + 1].text + " "
                elif i - 1 not in match:
                    headline_tokens.append(i - 1)
                    headline_tokens.append(i)
                    headline = headline + doc[i - 1].text + doc[i].text
                else:
                    headline_tokens.append(i)
                    headline = headline.strip() + doc[i].text
            # don't check i+1 when i is the last
            elif i + 1 >= len(doc):
                headline_tokens.append(i)
                headline = headline + doc[i].text
            # check the next item is -
            elif doc[i + 1].text in ["-", "/"]:
                if (i + 1 not in headline_tokens) and (i + 2 not in headline_tokens):
                    headline_tokens.append(i)
                    headline_tokens.append(i + 1)
                    headline_tokens.append(i + 2)
                    headline = headline + doc[i].text + doc[i + 1].text + doc[i + 2].text + " "
                elif (i + 1 not in headline_tokens) and (i + 2 in headline_tokens):
                    headline_tokens.append(i)
                    headline_tokens.append(i + 1)
                    headline = headline + doc[i].text + doc[i + 1].text
            # second word of - in match
            elif doc[i - 1].text in ["-", "/"]:
                if (i - 1 not in headline_tokens) & (i - 2 not in headline_tokens) & (
                        i - 2 >= 0):
                    headline_tokens.append(i - 2)
                    headline_tokens.append(i - 1)
                    headline_tokens.append(i)
                    headline = headline + doc[i - 2].text + doc[i - 1].text + doc[i].text + " "
                elif (i - 1 not in headline_tokens) & (i - 2 in headline_tokens) & (i - 2 >= 0):
                    headline_tokens.append(i - 1)
                    headline_tokens.append(i)
                    headline = headline.strip() + doc[i - 1].text + doc[i].text + " "

            else:
                headline_tokens.append(i)
                headline = headline + doc[i].text + " "

        # normalize spacing around restored hyphens
        headline = headline.strip()
        headline = headline.replace("- ", "-")

        restored_headlines.append(headline)
        restored_matches.append(headline_tokens)
    return restored_headlines, restored_matches, loc_filter_headlines

filter_headlines(doc, restored_headlines, restored_matches, loc_filter_headlines, min_length)

Filter matches that don't contain any location and which are longer than minimum length

Parameters:

Name Type Description Default
doc Doc

sequence of remark tokens

required
restored_headlines list

headlines with missing tokens

required
restored_matches list

matches with missing tokens indexes

required
loc_filter_headlines list

parts of remark between minimum and maximum indexes of each match

required
min_length int

minimum length of each headline

required

Returns:

Name Type Description
filtered_headlines list

Longer than minimum length headlines without locations

filtered matches : list

Matches of filtered headlines

Source code in app/handlers.py
def filter_headlines(self, doc, restored_headlines, restored_matches, loc_filter_headlines,
                     min_length):
    """
    Filter matches that don't contain any location and which are longer than minimum length

    Parameters
    ----------
    doc : spacy.tokens.doc.Doc
        sequence of remark tokens
    restored_headlines : list
        headlines with missing tokens
    restored_matches : list
        matches with missing tokens indexes
    loc_filter_headlines : list
        parts of remark between minimum and maximum indexes of each match
    min_length : int
        minimum length of each headline

    Returns
    -------
    filtered_headlines : list
        Longer than minimum length headlines without locations
    filtered_matches : list
        Matches of filtered headlines
    """

    filtered_headlines = []
    filtered_matches = []
    # GPE/LOC named entities recognised in the remark
    entities = [ent.text for ent in doc.ents if (ent.label_ == "GPE") | (ent.label_ == "LOC")]

    for i, headline in enumerate(restored_headlines):
        if len(headline) >= min_length:
            headline_split = headline.split()
            headline_lower_split = headline.lower().split()
            headline = headline.lower().capitalize()
            filtered_headlines.append(headline)
            filtered_matches.append(restored_matches[i])

            for entity in entities:  # remove cases when token is a part of geo name
                entity_split = entity.split()
                # NOTE(review): the second comparison is always true (an
                # intersection cannot exceed len(entity_split)), so this
                # reduces to "headline shares a word with the entity"
                if (len(set(entity_split).intersection(headline_split)) > 0) & \
                        (len(set(entity_split).intersection(headline_split)) <= len(
                            entity_split)):
                    # NOTE(review): .remove() deletes by value; if duplicate
                    # headlines can occur it may drop the wrong entry
                    filtered_headlines.remove(headline)
                    filtered_matches.remove(restored_matches[i])
                    break

            if headline in filtered_headlines:  # remove cases when US city state, county or part of them in headline

                for loc in self.locations:
                    loc = loc.split()
                    # check if location word has intersection with headline
                    if len(set(loc).intersection(headline_lower_split)) > 0:
                        # if it is subset of a large headline remove it
                        if set(loc).issubset(loc_filter_headlines[i].split()):
                            filtered_headlines.remove(headline)
                            filtered_matches.remove(restored_matches[i])
                            break
                        else:
                            # go back if there are intersection and check again
                            l = len(loc)
                            # widen the window leftwards to catch a location
                            # phrase starting just before the match
                            min_ind = max(min(restored_matches[i]) - l + 1, 0)
                            max_ind = max(restored_matches[i])
                            headline_with_loc_in_start = doc[min_ind:max_ind + 1].text.lower()

                            if set(loc).issubset(headline_with_loc_in_start.split()):
                                filtered_headlines.remove(headline)
                                filtered_matches.remove(restored_matches[i])
                                break

    return filtered_headlines, filtered_matches

extract_all_headlines(doc, all_matches) staticmethod

Extract headlines from matches

Parameters:

Name Type Description Default
doc Doc

sequence of remark tokens

required
all_matches list

all extracted matches

required

Returns:

Name Type Description
all_headlines list

extracted headlines from matches

Source code in app/handlers.py
@staticmethod
def extract_all_headlines(doc, all_matches):
    """
    Extract headlines from matches
    Parameters
    ----------
    doc : spacy.tokens.doc.Doc
        sequence of remark tokens
    all_matches : list
        all extracted matches

    Returns
    -------
    all_headlines : list
        extracted headlines from matches
    """
    all_headlines = []
    for match in all_matches:
        headline = ""
        for i in match:
            headline = headline + doc[i].text + " "
        all_headlines.append(headline)

    return all_headlines

extract_headlines(remark, min_length)

Extract headlines which length are greater than min_length

Parameters:

Name Type Description Default
remark str

remark of a house

required
min_length int

minimum length of each headline

required

Returns:

Name Type Description
best_headlines list

best extracted headlines

all_headlines list

all extracted headlines

Source code in app/handlers.py
def extract_headlines(self, remark, min_length):
    """
    Extract headlines which length are greater than min_length

    Parameters
    ----------
    remark : str
        remark of a house
    min_length : int
        minimum length of each headline

    Returns
    -------
    best_headlines : set
        best extracted headlines
    all_headlines : list
        all extracted headlines
    """
    if remark is None:
        # NOTE(review): this early exit returns two lists while the normal
        # path returns (set, list) -- callers must accept both
        return [], []
    doc = nlp(remark)

    matches = self.matcher(doc)

    all_matches = self.extract_all_matches(matches)
    all_headlines = self.extract_all_headlines(doc, all_matches)

    restored_headlines, restored_matches, loc_filter_headlines = self.restore_headlines(doc,
                                                                                        all_matches)

    filtered_headlines, filtered_matches = self.filter_headlines(doc, restored_headlines,
                                                                 restored_matches,
                                                                 loc_filter_headlines,
                                                                 min_length)

    # best_matches is computed but intentionally not returned
    best_headlines, best_matches = self.extract_best_headlines(filtered_matches,
                                                               filtered_headlines)

    return set(best_headlines), all_headlines

list_sentences(remark) staticmethod

Return a list of sentences of a given text

Parameters:

Name Type Description Default
remark str

text to sentencize

required

Returns:

Name Type Description
out List of str

sentences

Source code in app/handlers.py
@staticmethod
def list_sentences(remark):
    """
    Return a list of sentences of a given text

    Parameters
    ----------
    remark : str
        text to sentencize

    Returns
    -------
    out : List of str
        sentences
    """
    if not remark:
        return []
    sentences = [sent.text.strip() for sent in nlp(remark).sents]
    return sentences

AdCopyGenerator

Generate headlines and primary text for house ad copy based on house data

Source code in app/handlers.py
(line-number gutter from the rendered source listing, lines 377-822 of app/handlers.py)
class AdCopyGenerator:
    """
    Generate headlines and primary text for house ad copy based on house data
    """

    def __init__(self, config):
        """
        Unpack ad-copy settings (rankings, bin configs, aliases, location phrases)
        from the given config object
        """
        # TODO think of assigning config directly instead of unpacking it
        self.rankings = config.home_related_rankings
        self.dummy_headlines = config.dummy_headlines
        self.property_type_mapping = config.property_type_mapping
        self.subtype_mapping = config.subtype_mapping
        self.price_bin_values = config.price_bin_config.values
        self.price_bin_names = config.price_bin_config.names
        self.area_bin_values = config.area_bin_config.values
        self.area_bin_names = config.area_bin_config.names
        self.location_related_phrases = config.location_related_phrases
        self.popular_places = config.popular_places

    def generate_adjective_pattern(self, value, mode, subtype, property_type, state):
        """
        Get adjective descriptor for given value

        Parameters
        ----------
        value : float, int
            value to consider
        mode : str
            name of the property attribute to check the bins of. "area" for Area and "price" for
            Price
        subtype : int
            subtype of the house
        property_type : int
            property_type of the house
        state : str
            state of the house

        Returns
        -------
        out : str
            adjective corresponding to the bin where the value lies
        """
        default_key = f"DEFAULTp{property_type}s{subtype}"
        sample_key = f"{state}p{property_type}s{subtype}"
        subtype_alias = self.subtype_mapping.get(subtype, subtype)
        all_bin_values = getattr(self, f"{mode}_bin_values")
        bin_names = getattr(self, f"{mode}_bin_names")
        # fall back to the nationwide default bins when the state has no own config
        bin_values = all_bin_values.get(sample_key, all_bin_values[default_key])
        # index of the bin the value falls into
        # assumes len(bin_names) == len(bin_values) + 1 — TODO confirm in config
        idx = len([bin_value for bin_value in bin_values if value > bin_value])
        return f"{bin_names[idx]} {subtype_alias}"

    def generate_patterns(self, subtype, property_type, state, price=None, living_area=None,
                          min_char=1):
        """
        Generate patterns based on house information. Current version maps price and living area
        to bins in their state/subtype/proptype, takes corresponding adjectives
        for the range bin they lie in and adds a subtype alias like "Home" or "Condo".

        Parameters
        ----------
        subtype : int
            subtype of the house
        property_type : int
            property_type of the house
        state : str
            state of the house
        price : float, int
            price value of the house
        living_area : int, float
            area of the house
        min_char : int
            minimal length of each pattern

        Returns
        -------
        out : List of str
            headlines
        """
        # TODO consider creating dummy patterns with some adjectives like gorgeous, beautiful etc
        res = []
        gen_args = {"subtype": subtype,
                    "state": state,
                    "property_type": property_type}

        if living_area:
            res.append(self.generate_adjective_pattern(value=living_area, mode="area", **gen_args))
        if price:
            res.append(self.generate_adjective_pattern(value=price, mode="price", **gen_args))

        return [pattern for pattern in res if len(pattern) > min_char]

    @staticmethod
    def is_valid_pattern(pattern):
        """
        Check whether the pattern is usable as ad copy.
        Returns True if the pattern is non-empty and contains no digits
        (patterns with numbers, e.g. street/floor/bathroom counts, are rejected),
        and False otherwise

        Parameters
        ----------
        pattern : str
            string to check

        Returns
        -------
        out : bool
        """
        if not pattern:
            return False
        if any(char.isdigit() for char in pattern):
            return False
        return True

    def filter_sort_nlp_features(self, nlp_features, property_type, subtype, state):
        """
        Filter and sort a list of nlp features based on state, property type and subtype ranking.

        Parameters
        ----------
        nlp_features : List of str
            features to rank
        subtype : int
            enum for subtype. Will be used to get rankings from config
        property_type : int
            enum for property_type. Will be used to get rankings from config
        state : str
            name of the state where the house is. Will be used to get rankings from config

        Returns
        -------
        out : List of str
            filtered and sorted features
        """
        # TODO implement logic for borrowing from neighbor states

        default_key = f"DEFAULTp{property_type}s{subtype}"
        sample_key = f"{state}p{property_type}s{subtype}"
        # fall back to the nationwide default ranking when the state has no own config
        ranking = self.rankings.get(sample_key, self.rankings[default_key])
        sorted_nlp_features = filter_and_sort_list(to_process=nlp_features, by=ranking)
        return sorted_nlp_features

    def generate_standard_headlines(self, subtype, property_type, max_length, min_length):
        """
        Use subtype and property type to generate standard headlines

        Parameters
        ----------
        subtype : int
            subtype of the house
        property_type : int
            property type of the house
        max_length : int
            maximal number of character in each returned headline
        min_length : int
            minimal number of characters in each returned headline

        Returns
        -------
        out : List of str
            standard headlines in random order
        """
        subtype_alias = self.subtype_mapping.get(subtype, subtype)
        property_type_alias = self.property_type_mapping.get(property_type, property_type)
        # shuffle a copy: shuffling self.dummy_headlines in place would mutate the
        # shared config list and leak ordering between calls/instances
        templates = self.dummy_headlines[:]
        shuffle(templates)
        headlines = [line.replace("subtype",
                                  subtype_alias).replace("property_type",
                                                         property_type_alias)
                     for line in templates]

        return [line for line in headlines if min_length <= len(line) <= max_length]

    def generate_headlines(self, patterns, subtype, property_type,
                           living_area=None, living_area_unit=None,
                           sorted_nlp_features=None, min_headline=0, max_headline=40):
        """
        Use descriptive patterns extracted from a remark and modify them with house data to
        generate headlines.

        Parameters
        ----------
        patterns : list of str
            patterns to process
        subtype : int
            subtype of the house
        property_type : int
            property type of the house
        living_area : float
            area of the house
        living_area_unit : str
            unit in which the area is measured
        sorted_nlp_features : List of str
            filtered house-related features of the house ranked by their correlation with price
        min_headline : int, default: 0
            minimal length of a headline; patterns shorter than this are extended with
            feature/area phrases before being used
        max_headline : int, default: 40
            maximal length of the headlines to return

        Returns
        -------
        out : List of str
            generated headlines
        """
        # convert to lists so the emptiness checks below work (a filter object is always truthy)
        long_patterns = list(filter(lambda x: len(x) >= min_headline, patterns))
        short_patterns = list(filter(lambda x: len(x) < min_headline, patterns))
        headlines = []
        if long_patterns:
            headlines = sorted(long_patterns, key=len, reverse=True)
        if short_patterns:
            short_patterns.sort(key=len, reverse=True)
            short_patterns.sort(key=lambda x: len(x.split()), reverse=True)  # sort by num of words

            # If there are no long patterns that can serve as headlines directly, use shorter ones
            if sorted_nlp_features is None:
                sorted_nlp_features = []
            phrases_to_add = [f"with {feature}" for feature in sorted_nlp_features]
            if living_area and living_area_unit:
                phrases_to_add.append(f"approx. {living_area:,}{living_area_unit}")
            if phrases_to_add:
                short_headlines = [f"{pattern} {phrase}" for pattern, phrase in
                                   connect_lists_by_order(short_patterns, phrases_to_add)]
                headlines.extend(short_headlines)

        headlines += self.generate_standard_headlines(subtype=subtype,
                                                      property_type=property_type,
                                                      max_length=max_headline,
                                                      min_length=min_headline)
        return list(filter(lambda x: len(x) <= max_headline, headlines))

    def create_location_descriptor(self, neighborhood=None, city=None, county=None, state=None):
        """
        Create a string that describes the location of the house using its neighborhood, city,
        county & state

        Parameters
        ----------
        neighborhood, county, city, state : str: Optional[str]
            location of the house

        Returns
        -------
        out : str
            If the place is popular, only its name will be returned (for example, only county).
            otherwise passed parameters will be joined in a single string

        Raises
        ------
        ValueError
            if neighborhood, city and state are all None
        """

        existing = [item.title() for item in [neighborhood, city] if item is not None]

        # state is appended as-is (no .title()) so abbreviations like "CA" survive
        if state is not None:
            existing.append(state)
        if not existing:
            raise ValueError("All provided values for location are null. Please provide at least "
                             "one valid argument")

        # We use county only when detecting popular names and ignore in actual title
        popular = case_insensitive_intersection(
            self.popular_places,
            (existing + [county.title()]) if county else existing)
        if popular:
            unique_popular = del_duplicates_ordered(popular, ignore_case=True)
            return ", ".join(unique_popular)

        return ", ".join(del_duplicates_ordered(existing, ignore_case=True))

    def generate_describing_sentence(self, subtype, property_type, city, state,
                                     neighborhood=None, county=None, patterns=None, bedrooms=None,
                                     bathrooms=None, sorted_nlp_features=None,
                                     exterior_style=None, image_tags=None):
        """
        Generate a describing sentence based on available home data

        Parameters
        ----------

        subtype : int
            enum of the subtype
        property_type : int
            enum of the property_type
        city, state, neighborhood, county : str
            location of the house
        patterns : List of str
            home-related patterns extracted from the remark
        bedrooms : int
            number of bedrooms
        bathrooms : int
            number of bathrooms
        sorted_nlp_features : List of str
            home-related nlp features ranked by correlation with the house's price
        exterior_style : str
            Architectural style of the house
        image_tags : List of str
            features of the house extracted from images

        # TODO create ranking like nlp for image tags too
        Returns
        -------
        out : str
            Sentence generated based on home data
        out : List of str
            descriptor words from home data used in this sentence, like home style or the
            bedroom num. Will help to avoid using same words in multiple sentences of primary text
        """

        # Transform required args to text
        proptype_alias = self.property_type_mapping[property_type]
        subtype_alias = self.subtype_mapping[subtype]
        location = self.create_location_descriptor(neighborhood=neighborhood,
                                                   city=city,
                                                   county=county,
                                                   state=state)
        # Pick the most specific opening available: rooms > remark pattern > style > bare alias
        if bedrooms:
            if bathrooms:
                description = f"A {bedrooms} BR, {bathrooms} BA {subtype_alias}"
            else:
                description = f"A {bedrooms} BR {subtype_alias}"
        elif patterns:
            description = patterns[0]
        elif exterior_style:
            description = f"A {exterior_style} style {subtype_alias}"
        else:
            description = f"A {subtype_alias}"

        feats = sorted_nlp_features.copy() if sorted_nlp_features else []
        if image_tags:
            feats.extend(image_tags)
        if feats:
            feats[0] = f" with {feats[0]}"
            if len(feats) > 1:
                feats = feats[:4]  # might be tunable
                feats[-1] = f"and {feats[-1]}"
            description += ", ".join(feats)  # may be tunable
            # turn "x, and y" into "x and y" for the closing feature
            description = description.replace(", and", " and")
        sentence = f"{description} for {proptype_alias} in {location}."

        used_words = sentence.replace("-", " ").lower().split(" ")
        # the article "a" is too common to count as a "used" word
        if "a" in used_words:
            used_words.remove("a")
        return sentence, used_words

    @staticmethod
    def filter_sentences_by_phrases(all_sentences=None, phrases=None):
        """
        Filter candidate sentences down to those containing at least one of the given
        phrases, longest first

        Parameters
        ----------
        all_sentences : List of str
            candidate sentences
        phrases : List of str
            phrases to search in candidate sentences (expected lowercase; matching is done
            against the lowercased sentence)

        Returns
        -------
        out : List of str
            sentences containing at least one of phrases, sorted by length descending.
            All sentences if phrases is empty; empty list if all_sentences is empty

        """
        if not all_sentences:
            return []
        if not phrases:
            return all_sentences

        filtered = [sentence for sentence in all_sentences
                    if any(pattern in sentence.lower() for pattern in phrases)]

        sorted_sentences = sorted(filtered, key=len, reverse=True)

        return sorted_sentences

    def filter_location_related_sentences(self, all_sentences):
        """
        Filter out all location based sentences from the given list
        Parameters
        ----------
        all_sentences : List of str
            sentences to filter

        Returns
        -------
        out : List of str
            sentences containing location related phrases from config
        """
        return self.filter_sentences_by_phrases(all_sentences=all_sentences,
                                                phrases=self.location_related_phrases)

    def generate_primary_text(self, subtype, property_type, city, state,
                              neighborhood=None, county=None, patterns=None, all_sentences=None,
                              bedrooms=None, bathrooms=None, sorted_nlp_features=None,
                              exterior_style=None, image_tags=None):
        """
        Generate primary text for given house's ad based on available info

        Parameters
        ----------
        subtype : int
            enum of the subtype
        property_type : int
            enum of the property_type
        city, state, neighborhood, county : str
            location of the house
        patterns : List of str
            home-related patterns extracted from the remark
        all_sentences : List of str
            list of sentences of remark
        bedrooms : int
            number of bedrooms
        bathrooms : int
            number of bathrooms
        sorted_nlp_features : List of str
            home-related nlp features ranked by correlation with the house's price
        exterior_style : str
            Architectural style of the house
        image_tags : List of str
            features of the house extracted from images

        # TODO create ranking like nlp for image tags too
        Returns
        -------
        out : str
            primary text containing generated descriptive sentence and a location-related
            sentence from remark if available
        """

        first_sentence, words_used = \
            self.generate_describing_sentence(subtype=subtype,
                                              property_type=property_type,
                                              city=city, state=state,
                                              neighborhood=neighborhood,
                                              county=county,
                                              patterns=patterns,
                                              bedrooms=bedrooms,
                                              bathrooms=bathrooms,
                                              sorted_nlp_features=sorted_nlp_features,
                                              exterior_style=exterior_style,
                                              image_tags=image_tags)
        location_related = self.filter_location_related_sentences(all_sentences=all_sentences)
        # filter out sentences containing phrases already used in the first sentence
        # TODO find and try to handle cases where this filtering cant help, like bedroom-bed
        location_related = list(filter(lambda x: not any(word in x.lower().split() for word in
                                                         words_used),
                                       location_related))
        if location_related:
            second_sentence = location_related[0]
            return f"{first_sentence} {second_sentence}"

        return first_sentence

generate_adjective_pattern(value, mode, subtype, property_type, state)

Get adjective descriptor for given value

Parameters:

Name Type Description Default
value (float, int)

value to consider

required
mode str

name of the property attribute to check the bins of. "area" for Area and "price" for Price

required
subtype int

subtype of the house

required
property_type int

property_type of the house

required
state str

state of the house

required

Returns:

Name Type Description
out str

adjective corresponding to the bin where the value lies

Source code in app/handlers.py
def generate_adjective_pattern(self, value, mode, subtype, property_type, state):
    """
    Get adjective descriptor for given value

    Parameters
    ----------
    value : float, int
        value to consider
    mode : str
        name of the property attribute to check the bins of. "area" for Area and "price" for
        Price
    subtype : int
        subtype of the house
    property_type : int
        property_type of the house
    state : str
        state of the house

    Returns
    -------
    out : str
        adjective corresponding to the bin where the value lies
    """
    default_key = f"DEFAULTp{property_type}s{subtype}"
    sample_key = f"{state}p{property_type}s{subtype}"
    subtype_alias = self.subtype_mapping.get(subtype, subtype)
    all_bin_values = getattr(self, f"{mode}_bin_values")
    bin_names = getattr(self, f"{mode}_bin_names")
    bin_values = all_bin_values.get(sample_key, all_bin_values[default_key])
    idx = len([bin_value for bin_value in bin_values if value > bin_value])
    return f"{bin_names[idx]} {subtype_alias}"

generate_patterns(subtype, property_type, state, price=None, living_area=None, min_char=1)

Generate patterns based on house information. Current version maps price and living area to bins in their state/subtype/proptype, takes corresponding adjectives for the range bin they lie in and adds a subtype alias like "Home" or "Condo".

Parameters:

Name Type Description Default
subtype int

subtype of the house

required
property_type int

property_type of the house

required
state str

state of the house

required
price (float, int)

price value of the house

None
living_area (int, float)

area of the house

None
min_char int

minimal length of each pattern

1

Returns:

Name Type Description
out List of str

headlines

Source code in app/handlers.py
def generate_patterns(self, subtype, property_type, state, price=None, living_area=None,
                      min_char=1):
    """
    Generate patterns based on house information. Current version maps price and living area
    to bins in their state/subtype/proptype, takes corresponding adjectives
    for the range bin they lie in and adds a subtype alias like "Home" or "Condo".

    Parameters
    ----------
    subtype : int
        subtype of the house
    property_type : int
        property_type of the house
    state : str
        state of the house
    price : float, int
        price value of the house
    living_area : int, float
        area of the house
    min_char : int
        minimal length of each pattern

    Returns
    -------
    out : List of str
        headlines
    """
    # TODO consider creating dummy patterns with some adjectives like gorgeous, beautiful etc
    res = []
    gen_args = {"subtype": subtype,
                "state": state,
                "property_type": property_type}

    if living_area:
        res.append(self.generate_adjective_pattern(value=living_area, mode="area", **gen_args))
    if price:
        res.append(self.generate_adjective_pattern(value=price, mode="price", **gen_args))

    return list(filter(lambda x: len(x) > min_char, res))

is_valid_pattern(pattern) staticmethod

Checks whether the pattern is usable as ad copy. Returns True if the pattern is non-empty and contains no digits (patterns with numbers, e.g. street, floor or bathroom counts, are rejected), and False otherwise

Parameters:

Name Type Description Default
pattern str

string to check

required

Returns:

Name Type Description
out bool
Source code in app/handlers.py
@staticmethod
def is_valid_pattern(pattern):
    """
    Check whether the pattern is usable as ad copy.
    Returns True if the pattern is non-empty and contains no digits,
    and False otherwise

    Parameters
    ----------
    pattern : str
        string to check

    Returns
    -------
    out : bool
    """
    if not pattern:
        return False
    if any(char.isdigit() for char in pattern):
        return False
    return True

filter_sort_nlp_features(nlp_features, property_type, subtype, state)

Filter and sort a list of nlp features based on state, property type and subtype ranking.

Parameters:

Name Type Description Default
nlp_features List of str

features to rank

required
subtype int

enum for subtype. Will be used to get rankings from config

required
property_type int

enum for property_type. Will be used to get rankings from config

required
state str

name of the state where the house is. Will be used to get rankings from config

required

Returns:

Name Type Description
out List of str

filtered and sorted features

Source code in app/handlers.py
def filter_sort_nlp_features(self, nlp_features, property_type, subtype, state):
    """
    Filter and sort a list of nlp features based on state, property type and subtype ranking.

    Parameters
    ----------
    nlp_features : List of str
        features to rank
    subtype : int
        enum for subtype. Will be used to get rankings from config
    property_type : int
        enum for property_type. Will be used to get rankings from config
    state : str
        name of the state where the house is. Will be used to get rankings from config

    Returns
    -------
    out : List of str
        filtered and sorted features
    """
    # TODO implement logic for borrowing from neighbor states

    default_key = f"DEFAULTp{property_type}s{subtype}"
    sample_key = f"{state}p{property_type}s{subtype}"
    ranking = self.rankings.get(sample_key, self.rankings[default_key])
    sorted_nlp_features = filter_and_sort_list(to_process=nlp_features, by=ranking)
    return sorted_nlp_features

generate_standard_headlines(subtype, property_type, max_length, min_length)

Use subtype and property type to generate standard headlines

Parameters:

Name Type Description Default
subtype int

subtype of the house

required
property_type int

property type of the house

required
max_length int

maximal number of character in each returned headline

required
min_length int

minimal number of characters in each returned headline

required

Returns:

Name Type Description
out List of str

standard headlines

Source code in app/handlers.py
def generate_standard_headlines(self, subtype, property_type, max_length, min_length):
    """
    Use subtype and property type to generate standard headlines

    Parameters
    ----------
    subtype : int
        subtype of the house
    property_type : int
        property type of the house
    max_length : int
        maximal number of character in each returned headline
    min_length : int
        minimal number of characters in each returned headline

    Returns
    -------
    out : List of str
        standard headlines
    """
    subtype_alias = self.subtype_mapping.get(subtype, subtype)
    property_type_alias = self.property_type_mapping.get(property_type, property_type)
    shuffle(self.dummy_headlines)
    headlines = [line.replace("subtype",
                              subtype_alias).replace("property_type",
                                                     property_type_alias)
                 for line in self.dummy_headlines]

    return filter(lambda x: min_length <= len(x) <= max_length, headlines)

generate_headlines(patterns, subtype, property_type, living_area=None, living_area_unit=None, sorted_nlp_features=None, min_headline=0, max_headline=40)

Extract descriptive patterns from given remark and modify it using house data to generate headlines.

Parameters:

Name Type Description Default
patterns list of str

patterns to process

required
subtype int

subtype of the house

required
property_type int

property type of the house

required
living_area float

area of the house

None
living_area_unit str

unit in which the area is measured

None
sorted_nlp_features List of str

filtered house-related features of the house ranked by their correlation with price

None
min_headline int

minimal length of a headline. Can't be less than min_char. If such values are passed, min_headline will be overriden to be equal to min_char

0
max_headline int

maximal length of the headlines to return

40

Returns:

Name Type Description
out List of str

generated headlines

Source code in app/handlers.py
def generate_headlines(self, patterns, subtype, property_type,
                       living_area=None, living_area_unit=None,
                       sorted_nlp_features=None, min_headline=0, max_headline=40):
    """
    Extract descriptive patterns from given remark and modify it using house data to
    generate headlines.

    Parameters
    ----------
    patterns : list of str
        patterns to process
    subtype : int
        subtype of the house
    property_type : int
        property type of the house
    living_area : float
        area of the house
    living_area_unit : str
        unit in which the area is measured
    sorted_nlp_features : List of str
        filtered house-related features of the house ranked by their correlation with price
    min_headline : int, default: 0
        minimal length of a headline. Can't be less than min_char. If such values are passed,
        min_headline will be overriden to be equal to min_char
    max_headline : int, default: 40
        maximal length of the headlines to return

    Returns
    -------
    out : List of str
        generated headlines
    """
    # convert to list to use in if statements as the filter object always return
    long_patterns = list(filter(lambda x: len(x) >= min_headline, patterns))
    short_patterns = list(filter(lambda x: len(x) < min_headline, patterns))
    headlines = []
    if long_patterns:
        headlines = sorted(long_patterns, key=len, reverse=True)
    if short_patterns:
        short_patterns.sort(key=len, reverse=True)
        short_patterns.sort(key=lambda x: len(x.split()), reverse=True)  # sort by num of words

        # If there are no long patterns that can serve as headlines directly, use shorter ones
        if sorted_nlp_features is None:
            sorted_nlp_features = []
        phrases_to_add = [f"with {feature}" for feature in sorted_nlp_features]
        if living_area and living_area_unit:
            phrases_to_add.extend([f"approx. {'{:,}'.format(living_area)}{living_area_unit}"])
        if phrases_to_add:
            short_headlines = [f"{pattern} {phrase}" for pattern, phrase in
                               connect_lists_by_order(short_patterns, phrases_to_add)]
            headlines.extend(short_headlines)

    headlines += self.generate_standard_headlines(subtype=subtype,
                                                  property_type=property_type,
                                                  max_length=max_headline,
                                                  min_length=min_headline)
    return list(filter(lambda x: len(x) <= max_headline, headlines))

create_location_descriptor(neighborhood=None, city=None, county=None, state=None)

Create a string that describes the location of the house using its neighborhood, city, county & state

Parameters:

Name Type Description Default
neighborhood str: Optional[str]

location of the house

None
county str: Optional[str]

location of the house

None
city str: Optional[str]

location of the house

None
state str: Optional[str]

location of the house

None

Returns:

Name Type Description
out str

If the place is popular, only its name will be returned (for example, only county). otherwise passed parameters will be joined in a single string

Source code in app/handlers.py
def create_location_descriptor(self, neighborhood=None, city=None, county=None, state=None):
    """
    Create a string that describes the location of the house using its neighborhood, city,
    county & state

    Parameters
    ----------
    neighborhood, county, city, state : str: Optional[str]
        location of the house

    Returns
    -------
    out : str
        If the place is popular, only its name will be returned (for example, only county).
        otherwise passed parameters will be joined in a single string
    """

    existing = [item.title() for item in [neighborhood, city] if item is not None]

    if state is not None:
        existing.append(state)
    if not existing:
        raise ValueError("All provided values for location are null. Please provide at least "
                         "one valid argument")

    # We use county only when detecting popular names and ignore in actual title
    popular = case_insensitive_intersection(self.popular_places,
                                            existing + [county.title()] if county else existing)
    if popular:
        unique_popular = del_duplicates_ordered(popular, ignore_case=True)
        return ", ".join(unique_popular)

    return ", ".join(del_duplicates_ordered(existing, ignore_case=True))

generate_describing_sentence(subtype, property_type, city, state, neighborhood=None, county=None, patterns=None, bedrooms=None, bathrooms=None, sorted_nlp_features=None, exterior_style=None, image_tags=None)

Generate a describing sentence based on available home data

Parameters:

Name Type Description Default
subtype int

enum of the subtype

required
property_type int

enum of the property_type

required
city str

location of the house

required
state str

location of the house

required
neighborhood str

location of the house

required
county str

location of the house

required
patterns List of str

home-related patterns extracted from the remark

None
bedrooms int

number of bedrooms

None
bathrooms int

number of bathrooms

None
sorted_nlp_features List of str

home-related nlp features ranked by correlation with the house's price

None
exterior_style str

Architectural style of the house

None
image_tags List of str

features of the house extracted from images

None

TODO create ranking like nlp for image tags too

Returns:

Name Type Description
out str

Sentence generated based on home data

out List of str

descriptor words from home data used in this sentence, like the home style or the bedroom number. This helps avoid using the same words in multiple sentences of the primary text.

Source code in app/handlers.py
def generate_describing_sentence(self, subtype, property_type, city, state,
                                 neighborhood=None, county=None, patterns=None, bedrooms=None,
                                 bathrooms=None, sorted_nlp_features=None,
                                 exterior_style=None, image_tags=None):
    """
    Generate a describing sentence based on available home data.

    Parameters
    ----------
    subtype : int
        enum of the subtype
    property_type : int
        enum of the property_type
    city, state, neighborhood, county : str
        location of the house
    patterns : List of str
        home-related patterns extracted from the remark
    bedrooms : int
        number of bedrooms
    bathrooms : int
        number of bathrooms
    sorted_nlp_features : List of str
        home-related nlp features ranked by correlation with the house's price
    exterior_style : str
        Architectural style of the house
    image_tags : List of str
        features of the house extracted from images

    # TODO create ranking like nlp for image tags too
    Returns
    -------
    out : str
        Sentence generated based on home data
    out : List of str
        descriptor words from home data used in this sentence, like home style or the
        bedroom num. Helps to avoid reusing the same words in other sentences of the
        primary text
    """

    # Map enums to human-readable aliases and build the location string.
    proptype_alias = self.property_type_mapping[property_type]
    subtype_alias = self.subtype_mapping[subtype]
    location = self.create_location_descriptor(neighborhood=neighborhood,
                                               city=city,
                                               county=county,
                                               state=state)

    # Pick the most informative base description available.
    if bedrooms and bathrooms:
        description = f"A {bedrooms} BR, {bathrooms} BA {subtype_alias}"
    elif bedrooms:
        description = f"A {bedrooms} BR {subtype_alias}"
    elif patterns:
        description = patterns[0]
    elif exterior_style:
        description = f"A {exterior_style} style {subtype_alias}"
    else:
        description = f"A {subtype_alias}"

    highlights = list(sorted_nlp_features) if sorted_nlp_features else []
    if image_tags:
        highlights.extend(image_tags)
    if highlights:
        highlights[0] = f" with {highlights[0]}"
        if len(highlights) > 1:
            highlights = highlights[:4]  # might be tunable
            highlights[-1] = f"and {highlights[-1]}"
        description += ", ".join(highlights)  # may be tunable
        description = description.replace(", and", " and")

    sentence = f"{description} for {proptype_alias} in {location}."

    # Collect lowercase words of the sentence so later sentences can avoid them.
    used_words = sentence.replace("-", " ").lower().split(" ")
    if "a" in used_words:
        used_words.remove("a")
    return sentence, used_words

filter_sentences_by_phrases(all_sentences=None, phrases=None) staticmethod

Filter the given candidate sentences, keeping only those that contain at least one of the given phrases

Parameters:

Name Type Description Default
all_sentences List of str

candidate sentences

None
phrases List of str

phrases to search in candidate sentences

None

Returns:

Name Type Description
out List of str

sentences containing at least one of the phrases

Source code in app/handlers.py
@staticmethod
def filter_sentences_by_phrases(all_sentences=None, phrases=None):
    """
    Generate second sentence of primary text based on location related patterns extracted
    from the remark

    Parameters
    ----------
    all_sentences : List of str
        candidate sentences
    phrases : List of str
        phrases to search in candidate sentences

    Returns
    -------
    out : List of str
        sentences containing at list one of phrases

    """
    if not all_sentences:
        return []
    if not phrases:
        return all_sentences

    filtered = [sentence for sentence in all_sentences
                if any(pattern in sentence.lower() for pattern in phrases)]

    sorted_sentences = sorted(filtered, key=len, reverse=True)

    return sorted_sentences

Filter out all location based sentences from the given list

Parameters:

Name Type Description Default
all_sentences List of str

sentences to filter

required

Returns:

Name Type Description
out List of str

sentences containing location related phrases from config

Source code in app/handlers.py
def filter_location_related_sentences(self, all_sentences):
    """
    Keep only the location-related sentences from the given list.

    Parameters
    ----------
    all_sentences : List of str
        sentences to filter

    Returns
    -------
    out : List of str
        sentences that contain at least one location-related phrase
        from the configuration
    """
    # Delegate to the generic phrase filter using the configured phrases.
    phrases = self.location_related_phrases
    return self.filter_sentences_by_phrases(all_sentences=all_sentences,
                                            phrases=phrases)

generate_primary_text(subtype, property_type, city, state, neighborhood=None, county=None, patterns=None, all_sentences=None, bedrooms=None, bathrooms=None, sorted_nlp_features=None, exterior_style=None, image_tags=None)

Generate primary text for given house's ad based on available info

Parameters:

Name Type Description Default
subtype int

enum of the subtype

required
property_type int

enum of the property_type

required
city str

location of the house

required
state str

location of the house

required
neighborhood str

location of the house

required
county str

location of the house

required
patterns List of str

home-related patterns extracted from the remark

None
all_sentences List of str

list of sentences of remark

None
bedrooms int

number of bedrooms

None
bathrooms int

number of bathrooms

None
sorted_nlp_features List of str

home-related nlp features ranked by correlation with the house's price

None
exterior_style str

Architectural style of the house

None
image_tags List of str

features of the house extracted from images

None

TODO create ranking like nlp for image tags too

Returns:

Name Type Description
out str

primary text containing generated descriptive sentence and a location-related sentence from remark if available

Source code in app/handlers.py
def generate_primary_text(self, subtype, property_type, city, state,
                          neighborhood=None, county=None, patterns=None, all_sentences=None,
                          bedrooms=None, bathrooms=None, sorted_nlp_features=None,
                          exterior_style=None, image_tags=None):
    """
    Generate primary text for the given house's ad based on available info.

    Parameters
    ----------
    subtype : int
        enum of the subtype
    property_type : int
        enum of the property_type
    city, state, neighborhood, county : str
        location of the house
    patterns : List of str
        home-related patterns extracted from the remark
    all_sentences : List of str
        list of sentences of remark
    bedrooms : int
        number of bedrooms
    bathrooms : int
        number of bathrooms
    sorted_nlp_features : List of str
        home-related nlp features ranked by correlation with the house's price
    exterior_style : str
        Architectural style of the house
    image_tags : List of str
        features of the house extracted from images

    # TODO create ranking like nlp for image tags too
    Returns
    -------
    out : str
        primary text containing generated descriptive sentence and a location-related
        sentence from remark if available
    """

    first_sentence, words_used = self.generate_describing_sentence(
        subtype=subtype,
        property_type=property_type,
        city=city, state=state,
        neighborhood=neighborhood,
        county=county,
        patterns=patterns,
        bedrooms=bedrooms,
        bathrooms=bathrooms,
        sorted_nlp_features=sorted_nlp_features,
        exterior_style=exterior_style,
        image_tags=image_tags)

    candidates = self.filter_location_related_sentences(all_sentences=all_sentences)
    # Drop candidates that repeat a word already used in the first sentence.
    # TODO find and try to handle cases where this filtering cant help, like bedroom-bed
    candidates = [sent for sent in candidates
                  if not any(word in sent.lower().split() for word in words_used)]

    if candidates:
        return f"{first_sentence} {candidates[0]}"
    return first_sentence

FeatureExtractor

Extracting features with a given patterns

Source code in app/handlers.py
class FeatureExtractor:
    """
    Extracting features with a given patterns
    """

    def __init__(self, patterns):
        """
        Initiate patterns, spacy phrase matcher and add patterns to that matcher

        Parameters
        ----------
        patterns : dict
            mapping of feature name -> list of phrases indicating that feature
        """
        self.patterns = patterns
        self.matcher = PhraseMatcher(nlp.vocab)
        for pattern_key, pattern in patterns.items():
            # re-tokenize each phrase so it matches spacy's own tokenization
            self.matcher.add(pattern_key, [
                nlp(" ".join([token.text for token in
                              nlp(phrase)])) for phrase in pattern])

    @staticmethod
    def preprocess_remark(remark):
        """
        Preprocess initial remark
        Parameters
        ----------
        remark: string
            initial remark of a property

        Returns
        -------
        remark: string
            preprocessed remark of a property
        """

        # replace punctuations (except apostrophes) with spaces
        punctuations = string.punctuation.replace("'", "")
        remark = remark.translate(str.maketrans(punctuations, " " * len(punctuations)))
        # delete multiple spaces
        remark = " ".join(remark.split())
        return remark

    def extract_features(self, remark):
        """
        Extract features from the given remark
        Parameters
        ----------
        remark : string
            remark of a property

        Returns
        -------
        unique_features : set
            set of extracted feature names
        """

        remark = self.preprocess_remark(remark)
        doc = nlp(remark.lower())

        features = {}
        for match_id, start, end in self.matcher(doc):
            matched_feature = nlp.vocab.strings[match_id]
            # append a random id: match_id is not unique and the same feature
            # can be matched at several positions
            matched_feature_unique = matched_feature + "_" + str(generate_guid(k=6))
            features[matched_feature_unique] = (start, end)

        features_copy = features.copy()
        # iterate over matches and remove matches strictly contained in another
        for matched_feature_unique, (start, end) in features_copy.items():
            for k, v in features_copy.items():
                if k == matched_feature_unique:
                    continue
                # strict containment only: two identical spans would otherwise
                # delete each other and both features would be lost
                if start <= v[0] and v[1] <= end and (start, end) != v:
                    features.pop(k, None)  # tolerate already-removed keys

        # rsplit keeps feature names containing "_" intact (split("_")[0]
        # would truncate them); assumes the guid itself has no "_"
        unique_features = {f.rsplit("_", 1)[0] for f in features}
        # "not pet-friendly" only suppresses false "pet-friendly" matches via
        # the containment removal above; it is never reported itself
        unique_features.discard("not pet-friendly")

        return unique_features

    def return_unique_features(self):
        """
        Return unique features

        Returns
        -------
        unique_features : set
            set of all pattern keys except "not pet-friendly"
        """
        unique_features = set(self.patterns.keys())
        # discard() is a no-op when the key is absent, unlike list.remove()
        unique_features.discard("not pet-friendly")
        return unique_features

__init__(patterns)

Initiate patterns, spacy phrase matcher and add patterns to that matcher

Source code in app/handlers.py
def __init__(self, patterns):
    """
    Store the patterns and register each of them on a fresh spacy
    phrase matcher.
    """
    self.patterns = patterns
    self.matcher = PhraseMatcher(nlp.vocab)
    for pattern_key, phrase_list in patterns.items():
        docs = []
        for phrase in phrase_list:
            # re-tokenize the phrase so it matches spacy's tokenization
            tokens = [token.text for token in nlp(phrase)]
            docs.append(nlp(" ".join(tokens)))
        self.matcher.add(pattern_key, docs)

preprocess_remark(remark) staticmethod

Preprocess initial remark

Parameters:

Name Type Description Default
remark

initial remark of a property

required

Returns:

Name Type Description
remark string

preprocessed remark of a property

Source code in app/handlers.py
@staticmethod
def preprocess_remark(remark):
    """
    Preprocess initial remark
    Parameters
    ----------
    remark: string
        initial remark of a property

    Returns
    -------
    remark: string
        preprocessed remark of a property
    """

    # replace punctuations with spaces
    punctuations = string.punctuation.replace("'", "")
    remark = remark.translate(str.maketrans(punctuations, " " * len(punctuations)))
    # delete multiple spaces
    remark = " ".join(remark.split())
    return remark

extract_features(remark)

Extract features from the given remark

Parameters:

Name Type Description Default
remark string

remark of a property

required

Returns:

Name Type Description
features list

list of extracted features

Source code in app/handlers.py
def extract_features(self, remark):
    """
    Extract feature names from the given remark.

    Parameters
    ----------
    remark : string
        remark of a property

    Returns
    -------
    unique_features : set
        set of extracted feature names
    """

    remark = self.preprocess_remark(remark)
    doc = nlp(remark.lower())

    features = {}
    for match_id, start, end in self.matcher(doc):
        matched_feature = nlp.vocab.strings[match_id]
        # append a random id: match_id is not unique and the same feature
        # can be matched at several positions
        matched_feature_unique = matched_feature + "_" + str(generate_guid(k=6))
        features[matched_feature_unique] = (start, end)

    features_copy = features.copy()
    # iterate over matches and remove matches strictly contained in another
    for matched_feature_unique, (start, end) in features_copy.items():
        for k, v in features_copy.items():
            if k == matched_feature_unique:
                continue
            # strict containment only: two identical spans would otherwise
            # delete each other and both features would be lost
            if start <= v[0] and v[1] <= end and (start, end) != v:
                features.pop(k, None)  # tolerate already-removed keys

    # rsplit keeps feature names containing "_" intact (split("_")[0]
    # would truncate them); assumes the guid itself has no "_"
    unique_features = {f.rsplit("_", 1)[0] for f in features}
    # "not pet-friendly" only suppresses false "pet-friendly" matches via
    # the containment removal above; it is never reported itself
    unique_features.discard("not pet-friendly")

    return unique_features

return_unique_features()

Return unique features

Returns:

Name Type Description
unique_features list

unique features

Source code in app/handlers.py
def return_unique_features(self):
    """
    Return all feature names this extractor can produce.

    Returns
    -------
    unique_features : set
        set of pattern keys, excluding the internal "not pet-friendly" marker
    """
    unique_features = set(self.patterns.keys())
    # discard() is a no-op when the key is absent, unlike list.remove()
    # which would raise ValueError
    unique_features.discard("not pet-friendly")
    return unique_features

TextGenerator

Generate texts using ChatGPT

Source code in app/handlers.py
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
2496
2497
2498
2499
2500
2501
2502
2503
2504
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
class TextGenerator:
    """
    Generate texts using ChatGPT
    """

    def __init__(self, key, model, examples, features_ranking):
        """
        Store the ChatGPT credentials, example texts and feature ranking,
        and build a tiktoken tokenizer for the configured model.
        """
        self.key = key
        self.model = model
        self.examples = examples
        self.features_ranking = features_ranking
        # Convenience aliases for the example-text categories.
        for category in ("greetings", "system_introductions", "closings"):
            setattr(self, category, examples[category])
        self.tokenizer = tiktoken.encoding_for_model(model)

    def count_tokens(self, text):
        """
        Return the number of tokens the model tokenizer produces for *text*.

        Parameters
        ----------
        text: str
            remarks

        Returns
        -------
        tokens_count : int
            count of tokens
        """
        return len(self.tokenizer.encode(text))

    def introduce_system(self):
        """
        Pick one of the configured system-introduction texts at random.

        Returns
        -------
        str
            email system introduction text
        """
        return choice(self.system_introductions)

    def close_email(self, agent_name):
        """
        Build the closing part of the email: a random sign-off followed by
        the agent's name on the next line.

        Parameters
        ----------
        agent_name : str
            agent name

        Returns
        -------
        str
            email closing part
        """
        sign_off = choice(self.closings)
        return f"{sign_off}\n{agent_name}"

    def generate_empty_collection_invitation(self, client_name, agent_name):
        """
        Build the invitation email for a collection with no properties.

        Parameters
        ----------
        client_name : str
            name of client
        agent_name : str
            name of agent

        Returns
        -------
        greeting : str
            client greeting part of the email
        system_introduction : str
            introduction of the system
        closing : str
            closing part of the email
        full_text : str
            full email text (greeting, introduction and closing joined by
            newlines)
        """
        greeting = f"{choice(self.greetings)} {client_name},"
        system_introduction = self.introduce_system()
        closing = self.close_email(agent_name)
        full_text = "\n".join((greeting, system_introduction, closing))
        return greeting, system_introduction, closing, full_text

    def generate_text(self, prompt):
        """
        Generate text for the given prompt using ChatGPT

        Parameters
        ----------
        prompt : list
            prompt for ChatGPT

        Returns
        -------
        generated_text : str or None
            generated text (None when the API call failed)
        status_code : int
            status_code of ChatGPT response
        message : str
            message of ChatGPT response
        """
        try:
            openai.api_key = self.key
            response = openai.ChatCompletion.create(model=self.model, messages=prompt)
            generated_text = response.choices[0].message["content"]
            # Capitalize the first character; guard against an empty
            # completion, which previously raised IndexError here.
            if generated_text:
                generated_text = generated_text[0].capitalize() + generated_text[1:]
            message = "OK"
            status_code = 200
        except openai.error.APIError as error:
            generated_text = None
            # BUG FIX: the original had a trailing comma here, which made
            # `message` a one-element tuple instead of a string.
            message = error.message
            # NOTE(review): `.message` / `.status` follow the pre-v1 openai
            # SDK error attributes used elsewhere in this class.
            status_code = error.status
        return generated_text, status_code, message

    async def async_generate_text(self, system_message: List[Dict],
                                  prompt: str, temperature: float,
                                  timeout: int) -> str:
        """
        Asynchronously generate text for the given prompt using ChatGPT.

        Parameters
        ----------
        system_message: List[Dict]
            The message to help generate text
        prompt : List[Dict[str, Union[str, Any]]
            Prompt for ChatGPT, structured as a list of message dictionaries.
        temperature : float
            Temperature controls the randomness of the text that GPT generates
        timeout : int
            Timeout parameter which decides when to throw a timeout error

        Returns
        -------
        out : str
        """
        try:
            openai.api_key = self.key
            # JSON-mode completion with fixed repetition penalties.
            response = await openai.ChatCompletion.acreate(
                model=self.model,
                messages=make_prompt(system_message, prompt),
                response_format={"type": "json_object"},
                temperature=temperature,
                timeout=timeout,
                frequency_penalty=1,
                presence_penalty=1,
            )
        except openai.error.APIError as error:
            raise Exception(f"API Error: {error.message}, Status Code: {error.status}")
        return response.choices[0].message["content"]

    @staticmethod
    def get_locations_parts(df_collection):
        """
        Generate the state city and neighborhood parts

        Parameters
        ----------
        df_collection : DataFrame
            collection data

        Returns
        -------
        state_part : str
            state part of general sentence in prompt
        city_part : str
            city part of general sentence in prompt
        neighborhood_part : str
            neighborhood part of general sentence in prompt
        """
        unique_states = df_collection.state.dropna().unique()
        state = ', '.join(unique_states)
        if len(unique_states) == 1:
            state_part = f"state: {state}"
        else:
            state_part = f"states: {state}"
        unique_cities = df_collection.city.dropna().unique()
        neighborhood_part = ""
        if (len(unique_cities) > 0) and (len(unique_cities) <= 3):
            city = ', '.join(unique_cities)
            if len(unique_cities) == 1:
                city_part = f", city: {city}"
            else:
                city_part = f", cities: {city}"
            unique_neighborhoods = df_collection.neighborhood.dropna().unique()
            if (len(unique_neighborhoods) > 0) and (len(unique_neighborhoods) <= 3):
                neighborhood = ', '.join(unique_neighborhoods)
                if len(unique_neighborhoods) == 1:
                    neighborhood_part = f", neighborhood: {neighborhood}."
                else:
                    neighborhood_part = f", neighborhoods: {neighborhood}."
            else:
                neighborhood_part = "."
        else:
            city_part = "."
        return state_part, city_part, neighborhood_part

    @staticmethod
    def get_price_sentence(df_collection):
        """
        Generate the price sentence of prompt.

        Parameters
        ----------
        df_collection : DataFrame
            collection data

        Returns
        -------
        price_sent : str
            sentence about the prices of collection properties
        """
        price_min = df_collection.price.min()
        price_max = df_collection.price.max()
        if len(df_collection) == 1:
            price_sent = f"The property costs {price_min}."
        else:
            if price_min != price_max:
                price_range = f"{price_min} - {price_max}"
                price_sent = f"Properties are in price range {price_range}."
            else:
                price_sent = f"Properties cost {price_min}."
        return price_sent

    def get_non_home_collection_prompt(self, df_collection):
        """
        Make ChatGPT prompt for non_home collection

        Parameters
        ----------
        df_collection : DataFrame
            collection's homes data in DataFrame format

        Returns
        -------
        prompt : list or None
            prompt for non_home collection (None when every remark is
            missing)
        collection_information : str
            general information of not home collection
        """
        prompt = None
        property_type = df_collection["property_type"].unique()[0]
        home_type = df_collection["home_type"].unique()[0]
        if home_type == "Other":
            home_type = "properties"
        state_part, city_part, neighborhood_part = self.get_locations_parts(df_collection)

        general_sent = f"The collection includes properties for {property_type} in the " \
                       f"{state_part}{city_part}{neighborhood_part}"

        price_sent = self.get_price_sentence(df_collection)

        school_sent = self.get_school(df_collection)

        collection_information = f"{general_sent} {price_sent} {school_sent}"
        all_remarks_none = all(df_collection["remark"].isna())
        if not all_remarks_none:
            # Drop trailing remarks one by one until the joined text fits the
            # prompt token budget.
            i = len(df_collection)
            remarks = '\n\n\n '.join(df_collection.iloc[:i].remark.dropna())
            while self.count_tokens(remarks) > REMARKS_CHAR_MAX_COUNT_IN_PROMPT:
                i -= 1
                # BUG FIX: the original accessed ``.remarks`` here, but the
                # column is named ``remark`` (see the join above), so the
                # trim loop raised AttributeError whenever it ran.
                remarks = '\n\n\n '.join(df_collection.iloc[:i].remark.dropna())
            prompt = [
                {"role": "system",
                 "content": "You are an experienced real estate content creator."},
                {"role": "user",
                 "content": f"Given the information below, generate 3-4 sentences describing the collection of "
                            f"{home_type}. Do not use any fair housing act violation, including information "
                            f"about families, race, sex, religion, and other sensitive content. "
                            f"Do not mention certain group of people like families, couples, singles, individuals. "
                            f"Use the $ symbol instead of the word dollars. "
                            f"Summarize location based common features, sizes and common usage of all collection. "
                            f"The remarks are separated by\n\n\n{collection_information}\n{remarks}"}]

        return prompt, collection_information

    def get_multi_home_collection_prompt(self, df_collection):
        """
        Make ChatGPT prompt for collection of multiple homes

        Parameters
        ----------
        df_collection : DataFrame
            collection's homes data in DataFrame format

        Returns
        -------
        prompt : list
            prompt for the multi home collection
        collection_information : str
            general information of multi home collection
        """
        property_type = df_collection["property_type"].unique()[0]
        state_part, city_part, neighborhood_part = self.get_locations_parts(df_collection)
        subtype = ', '.join(df_collection.physical_property_type.unique())

        general_sent = f"The collection includes {subtype} homes for {property_type} in the " \
                       f"{state_part}{city_part}{neighborhood_part}"

        price_sent = self.get_multi_property_price(df_collection, "homes")
        bed_bath_sent = self.get_multi_property_bed_bath(df_collection)

        style_sent = self.get_multi_property_style(df_collection)
        feature_sent = self.get_multi_property_features(df_collection)
        school_sent = self.get_school(df_collection)
        collection_information = f"{general_sent} {price_sent} {bed_bath_sent} {style_sent} {school_sent} " \
                                 f"{feature_sent}"

        prompt = [
            {"role": "system",
             "content": "You are an experienced real estate content creator."},
            {"role": "user",
             "content": f"Given the information below, generate 3-4 sentences describing the collection of homes. "
                        f"Do not use any fair housing act violation, including information about families, race, sex, "
                        f"religion, and other sensitive content. "
                        f"Do not mention certain group of people like families, couples, singles, individuals. "
                        f"Use the $ symbol instead of the word dollars. {collection_information}"}]
        return prompt, collection_information

    def get_unique_property_prompt(self, df_collection):
        """
        Make the ChatGPT prompt for a single-property collection.

        Parameters
        ----------
        df_collection : DataFrame
            collection's homes data in DataFrame format

        Returns
        -------
        prompt : list
            prompt for the unique property collection
        collection_information : str
            general information of unique property collection
        """
        property_data = df_collection.to_dict(orient='records')[0]
        home_type = property_data["home_type"]
        # "Other" is too vague for the prompt, so fall back to "property".
        prop_type = "property" if home_type == "Other" else home_type
        property_type = property_data["property_type"]
        state = property_data["state"]
        city_part, neighborhood_part = self.get_unique_property_city_neighborhood(
            property_data)

        general_sent = (f"The collection is a {prop_type} for {property_type} in the "
                        f"state: {state}{city_part}{neighborhood_part}")

        price_bed_bath_sent = self.get_unique_property_price_bed_bath(
            property_data, prop_type)
        style_sent = self.get_unique_property_style(property_data)
        school_sent = self.get_school(df_collection)

        collection_information = f"{general_sent} {price_bed_bath_sent} {style_sent} {school_sent}"
        remark = property_data["remark"]
        remark_part = f"It also has the following remark: {remark}" if remark else ""
        prompt = [
            {"role": "system",
             "content": "You are an experienced real estate content creator."},
            {"role": "user",
             "content": f"Given the information below, generate 3-4 sentences describing the collection of a "
                        f"{prop_type}. Do not use any fair housing act violation, including information about families,"
                        f" race, sex, religion, and other sensitive content. "
                        f"Do not mention certain group of people like families, couples, singles, individuals. "
                        f"Use the $ symbol instead of the word dollars. {collection_information} {remark_part}"}]
        return prompt, collection_information

    @staticmethod
    def get_unique_property_price_bed_bath(property_data, prop_type):
        """
        Make the price, bedroom and bathroom part of a unique-property
        collection description.

        Parameters
        ----------
        property_data : dict
            the data of a property
        prop_type : str
            the home type of the property

        Returns
        -------
        price_bed_bath_sent : str
            price, bedroom and bathroom part of unique property collection
        """
        sent = f"The {prop_type} costs {property_data['price']}"

        # Bed/bath counts only make sense for house-like home types.
        if prop_type in HOMES:
            bedrooms = property_data["bedrooms"]
            bathrooms = property_data["bathrooms"]

            if bedrooms and bedrooms >= 1:
                # NOTE: the singular branch uses a literal "1" on purpose,
                # matching the original wording exactly.
                sent += (" has 1 bedroom" if bedrooms == 1
                         else f" has {bedrooms} bedrooms")
                if bathrooms and bathrooms >= 1:
                    sent += (f" and {bathrooms} bathroom" if bathrooms == 1
                             else f" and {bathrooms} bathrooms")
            elif bathrooms and bathrooms >= 1:
                sent += (" has 1 bathroom" if bathrooms == 1
                         else f" has {bathrooms} bathrooms")
        return sent + "."

    @staticmethod
    def get_unique_property_style(property_data):
        """
        Make the architectural-style part of a unique-property collection
        description; empty for non-home types or missing style.

        Parameters
        ----------
        property_data : dict
            the data of a property

        Returns
        -------
        style_sent : str
            style part of a unique property collection
        """
        style = property_data["exterior_style"]
        if style and property_data["home_type"] in HOMES:
            return f"Home has {style} architectural style."
        return ""

    @staticmethod
    def get_school(df_collection):
        """
        Make the given collection school part
        Parameters
        ----------
        df_collection : DataFrame
            collection data in Dataframe format

        Returns
        -------
        school_sent : str
            school part of a given collection
        """
        prop_count = len(df_collection)
        school_sent = ""
        schools = []
        if sum(df_collection["private_school"]) > prop_count / 2:
            schools.append("private")
        if sum(df_collection["public_school"]) > prop_count / 2:
            schools.append("public")
        if len(schools) > 0:
            school_part = " and ".join(schools)
            school_sent = f"There are {school_part} schools nearby."
        return school_sent

    @staticmethod
    def get_unique_property_city_neighborhood(property_data):
        """
        Make the unique property collection city and neighborhood part
        Parameters
        ----------
        property_data : dict
            the data of a property

        Returns
        -------
        city_part : str
            city part of unique property collection
        neighborhood_part : str
            neighborhood part of unique property collection
        """
        neighborhood_part = ""
        city = property_data["city"]
        neighborhood = property_data["neighborhood"]
        if city:
            city_part = f", city: {city}"
            if neighborhood:
                neighborhood_part = f", neighborhood: {neighborhood}."
            else:
                neighborhood_part = "."
        else:
            city_part = "."
        return city_part, neighborhood_part

    @staticmethod
    def get_unique_property_features(property_data):
        """
        Make the unique property collection features
        Parameters
        ----------
        property_data : dict
            the data of a property

        Returns
        -------
        feature_sent : str
            feature sentence of unique property collection
        """
        features = property_data["features"]
        feature_sent = ""
        if features:
            feature_part = ", ".join(features)
            feature_sent = f"It has {feature_part}."
        return feature_sent

    def get_unique_property_information(self, property_df):
        """
        Make the type/location sentence and the description of a single
        property.

        Parameters
        ----------
        property_df : DataFrame
            the data of a property in DataFrame format

        Returns
        -------
        type_loc_sent : str
            property type and location sentence of a property
        description : str
            description of a property
        """
        property_data = property_df.to_dict(orient='records')[0]
        property_type = property_data["property_type"]
        city_part, neighborhood_part = self.get_unique_property_city_neighborhood(
            property_data)

        type_loc_sent = (f"The {property_type} property is in the "
                         f"state: {property_data['state']}{city_part}{neighborhood_part}")

        # Description parts in fixed order: price/bed/bath, style,
        # features, schools.
        description = " ".join((
            self.get_unique_property_price_bed_bath(property_data, property_type),
            self.get_unique_property_style(property_data),
            self.get_unique_property_features(property_data),
            self.get_school(property_df),
        ))
        return type_loc_sent, description

    def get_multi_property_type_loc(self, df_collection):
        """
        Make the location and property type sentence of multi property collection
        Parameters
        ----------
        df_collection : DataFrame
            multi property collection in DataFrame format

        Returns
        -------
        type_loc_sent : str
            property type and location sentence of multi property collection

        """
        property_type = df_collection.property_type.unique()[0]
        state_part, city_part, neighborhood_part = self.get_locations_parts(
            df_collection)
        type_loc_sent = f"The {property_type} properties are in the {state_part}{city_part}" f"{neighborhood_part}"

        return type_loc_sent

    @staticmethod
    def get_multi_property_price(df_collection, prop_type):
        """
        Make price sentence of multi property collection
        Parameters
        ----------
        df_collection : DataFrame
            collection data in DataFrame format
        prop_type : str
            property type of the collection

        Returns
        -------
        price_sent : str
            price sentence of multi property collection
        """
        price_min = df_collection.price.min()
        price_max = df_collection.price.max()
        if price_min != price_max:
            price_range = f"{price_min} - {price_max}"
            price_sent = f"The {prop_type} are in price range {price_range}."
        else:
            price_sent = f"The {prop_type} cost {price_min}."
        return price_sent

    @staticmethod
    def get_multi_property_style(df_collection):
        """
        Make style sentence of multi property collection
        Parameters
        ----------
        df_collection : DataFrame
            collection data in DataFrame format

        Returns
        -------
        style_sent : str
            style sentence of multi property collection
        """
        homes_count = len(df_collection)
        common_styles_count = df_collection.exterior_style.value_counts()[
            df_collection.exterior_style.value_counts() > homes_count / 3]

        common_style = ", ".join(common_styles_count.keys())

        if common_style and (len(common_styles_count) > 1):
            style_sent = f"Most homes have {common_style} architectural styles. "
        elif common_style and (len(common_styles_count) == 1):
            style_sent = f"All homes have {common_style} architectural style. "
        else:
            style_sent = ""
        return style_sent

    @staticmethod
    def get_multi_property_features(df_collection):
        """
        Make feature sentence of multi property collection

        Parameters
        ----------
        df_collection : DataFrame
            collection data in DataFrame format

        Returns
        -------
        features_sent : str
            feature sentence of multi property collection
        """
        prop_count = len(df_collection)
        homes_features = df_collection.features.dropna()
        features_sent = ""
        if len(homes_features) > prop_count / 2:
            common_features_set = set.intersection(*map(set, homes_features))
            common_features = ', '.join(common_features_set)
            if common_features:
                features_sent = f"Most of them have {common_features}."
        return features_sent

    @staticmethod
    def get_multi_property_bed_bath(df_collection):
        """
        Make bedroom and bathroom sentence of multi property collection

        Parameters
        ----------
        df_collection : DataFrame
            collection data in DataFrame format

        Returns
        -------
        bed_bath_sent : str
            bedroom and bathroom sentence of multi property collection
        """
        bed_bath_sent = ""
        unique_bedrooms = df_collection.bedrooms.dropna().unique()
        if len(unique_bedrooms) > 0:
            bedrooms_min = unique_bedrooms.min()
            bedrooms_max = unique_bedrooms.max()
            if bedrooms_min != bedrooms_max:
                if bedrooms_min == 0 and bedrooms_max == 1:
                    bedrooms_range = f"up to {bedrooms_max} bedroom"
                elif bedrooms_min == 0 and bedrooms_max > 1:
                    bedrooms_range = f"up to {bedrooms_max} bedrooms"
                else:
                    bedrooms_range = f"{bedrooms_min} - {bedrooms_max} bedrooms"
            else:
                if bedrooms_min == 1:
                    bedrooms_range = f"{bedrooms_min} bedroom"
                else:
                    bedrooms_range = f"{bedrooms_min} bedrooms"
            bed_bath_sent = f"Each home has {bedrooms_range}"
        unique_bathrooms = df_collection.bathrooms.dropna().unique()
        if len(unique_bathrooms) > 0:
            bathrooms_min = unique_bathrooms.min()
            bathrooms_max = unique_bathrooms.max()
            if bathrooms_min != bathrooms_max:
                if bathrooms_min == 0 and bathrooms_max == 1:
                    bathrooms_range = f"up to {bathrooms_max} bathroom"
                elif bathrooms_min == 0 and bathrooms_max > 1:
                    bathrooms_range = f"up to {bathrooms_max} bathrooms"
                else:
                    bathrooms_range = f"{bathrooms_min} - {bathrooms_max} bathrooms"
            else:
                if bathrooms_min == 1:
                    bathrooms_range = f"{bathrooms_min} bathroom"
                else:
                    bathrooms_range = f"{bathrooms_min} bathrooms"
            if bed_bath_sent:
                bed_bath_sent += f", {bathrooms_range}"
            else:
                bed_bath_sent = f"Each home has {bathrooms_range}"
        if bed_bath_sent:
            bed_bath_sent += "."

        return bed_bath_sent

    def get_one_type_multi_property_information(self, df_collection):
        """
        Make the description of a property type multi property collection

        The slice is split into homes (home_type in HOMES), lands and
        "Other" properties; each group gets a one-property or a
        multi-property description, and the three pieces are joined.

        Parameters
        ----------
        df_collection : DataFrame
            collection information in DataFrame format

        Returns
        -------
        description : str
            description of a property type multi property collection
        """

        home_description = ""
        land_description = ""
        other_description = ""

        # Homes: price/bed/bath, style, features and nearby schools.
        df_homes = df_collection[df_collection["home_type"].isin(HOMES)]
        if len(df_homes) == 1:
            home = df_homes.to_dict(orient='records')[0]
            price_bed_bath_sent = self.get_unique_property_price_bed_bath(
                home, home["home_type"])
            style_sent = self.get_unique_property_style(home)
            school_sent = self.get_school(df_homes)
            feature_sent = self.get_unique_property_features(
                home)
            home_description = f"{price_bed_bath_sent} {style_sent} {feature_sent} {school_sent}"
        elif len(df_homes) > 1:
            price_sent = self.get_multi_property_price(df_homes, "homes")
            bed_bath_sent = self.get_multi_property_bed_bath(df_homes)
            style_sent = self.get_multi_property_style(df_homes)
            feature_sent = self.get_multi_property_features(df_homes)
            school_sent = self.get_school(df_homes)
            # NOTE: no space between style_sent and feature_sent on purpose:
            # get_multi_property_style returns either "" or a sentence with
            # a trailing space, so adding one here would double it.
            home_description = f"{price_sent} {bed_bath_sent} {style_sent}{feature_sent} {school_sent}"

        # Lands: only price and schools apply.
        df_lands = df_collection[df_collection["home_type"] == "Land"]
        if len(df_lands) == 1:
            land = df_lands.to_dict(orient='records')[0]
            price_sent = self.get_unique_property_price_bed_bath(land, land["home_type"])
            school_sent = self.get_school(df_lands)
            land_description = f"{price_sent} {school_sent}"
        elif len(df_lands) > 1:
            price_sent = self.get_multi_property_price(df_lands, "lands")
            school_sent = self.get_school(df_lands)
            land_description = f"{price_sent} {school_sent}"

        # "Other" properties: price, features and schools (no bed/bath/style).
        df_others = df_collection[df_collection["home_type"] == "Other"]
        if len(df_others) == 1:
            other = df_others.to_dict(orient='records')[0]
            price_sent = self.get_unique_property_price_bed_bath(
                other, "other property")
            school_sent = self.get_school(df_others)
            feature_sent = self.get_unique_property_features(
                other)
            other_description = f"{price_sent} {feature_sent} {school_sent}"
        elif len(df_others) > 1:
            price_sent = self.get_multi_property_price(df_others,
                                                       "others")
            feature_sent = self.get_multi_property_features(df_others)
            school_sent = self.get_school(df_others)
            other_description = f"{price_sent} {feature_sent} {school_sent}"

        description = f"{home_description} {land_description} {other_description}"
        return description

    def get_collection_prompt(self, df_collection):
        """
        Make ChatGPT prompt of mixed collection
        Parameters
        ----------
        df_collection : dataFrame
            data of a mixed collection

        Returns
        -------
        prompt : list
            prompt of a mixed collection
        collection_information : str
            description of mixed collection
        """

        df_sale = df_collection[df_collection["property_type"] == "sale"]
        df_rent = df_collection[df_collection["property_type"] == "rent"]
        sale_count = len(df_sale)
        rent_count = len(df_rent)
        sale_rent = []
        type_loc_sent_sale = ""
        sale_description = ""
        type_loc_sent_rent = ""
        rent_description = ""
        if sale_count == 1:
            sale_rent.append("1 property for sale")
            type_loc_sent_sale, sale_description = self.get_unique_property_information(df_sale)
        elif sale_count > 1:
            sale_rent.append(f"{sale_count} properties for sale")
            type_loc_sent_sale = self.get_multi_property_type_loc(
                df_sale)
            sale_description = self.get_one_type_multi_property_information(df_sale)

        if rent_count == 1:
            sale_rent.append("1 property for rent")
            type_loc_sent_rent, rent_description = self.get_unique_property_information(df_rent)
        elif rent_count > 1:
            sale_rent.append(f"{rent_count} properties for rent")
            type_loc_sent_rent = self.get_multi_property_type_loc(
                df_rent)
            rent_description = self.get_one_type_multi_property_information(
                df_rent)

        sale_rent_part = " and ".join(sale_rent)

        general_sent = f"The collection includes {sale_rent_part}."

        collection_information = f"{general_sent}/n{type_loc_sent_sale} {sale_description}/n{type_loc_sent_rent} " \
                                 f"{rent_description}"
        prompt = [
            {"role": "system",
             "content": "You are an experienced real estate content creator."},
            {"role": "user",
             "content": f"Given the information below, generate 7-8 sentences describing the collection of properties. "
                        f"Do not use any fair housing act violation, including information about families, race, sex, "
                        f"religion, and other sensitive content. "
                        f"Do not mention certain group of people like families, couples, singles, individuals. "
                        f"Use the $ symbol instead of the word dollars. {collection_information}"}]
        return prompt, collection_information

    def generate_invitation(self, collection_data, client_name, agent_name):
        """
        Build the invitation email for a collection of properties.

        Parameters
        ----------
        collection_data : list
            Properties data in collection
        client_name : str
            Client name
        agent_name : str
            Agent name

        Returns
        -------
        greeting : str
            Client greeting part of email
        system_introduction : str
            Introduction of system
        collection_information : str
            Collection information generated by ChatGPT
        closing : str
            Closing part of email
        full_text : str
            Email full text
        status_code : int
            status code of ChatGPT call
        message : str
            message of ChatGPT call
        """

        status_code, message = 200, "OK"
        collection_information = ""
        # Start from the empty-collection skeleton; it is overwritten below
        # only when a prompt is produced and text generation succeeds.
        greeting, system_introduction, closing, full_text = \
            self.generate_empty_collection_invitation(client_name, agent_name)

        if collection_data:
            # Flatten the schema objects into plain dicts, translating raw
            # type codes into their human-readable equivalents on the way.
            records = []
            for item in collection_data:
                record = dict(item.dict())
                record['home_type'] = PHYSICAL_PROPERTY_TYPE_TO_HOME_TYPE.get(item.physical_property_type)
                record['physical_property_type'] = PHYSICAL_PROPERTY_TYPE_MAPPING.get(item.physical_property_type)
                record['property_type'] = PROPERTY_TYPE_MAPPING.get(item.property_type)
                if item.exterior_style and item.exterior_style.probability > STYLE_PROBABILITY_LIMIT:
                    record['exterior_style'] = item.exterior_style.name
                else:
                    record['exterior_style'] = None
                records.append(record)
            df_collection = pd.DataFrame(records)

            # NOTE: unique() keeps NaN, so missing types count as a distinct value.
            n_property_types = len(df_collection["property_type"].unique())
            n_home_types = len(df_collection["home_type"].unique())

            if len(df_collection) == 1:
                prompt, collection_information = self.get_unique_property_prompt(df_collection)
            elif n_property_types == 1 and n_home_types == 1:
                only_home_type = df_collection["home_type"].unique()[0]
                prompt_builder = (self.get_multi_home_collection_prompt
                                  if only_home_type in HOMES
                                  else self.get_non_home_collection_prompt)
                prompt, collection_information = prompt_builder(df_collection)
            else:
                prompt, collection_information = self.get_collection_prompt(df_collection)

            if prompt:
                try:
                    collection_information, status_code, message = func_timeout.func_timeout(
                        timeout=COLLECTION_INFORMATION_GENERATION_TIME, func=self.generate_text, args=(prompt,))
                except func_timeout.FunctionTimedOut:
                    # Keep the locally built description when generation times out.
                    pass

                full_text = f"{greeting}\n{system_introduction}\n{collection_information}\n{closing}"

        # Collapse any run of whitespace to a single space and trim the ends.
        collection_information = re.sub(r'\s+', ' ', collection_information).strip()
        return greeting, system_introduction, collection_information, closing, full_text, status_code, message

    def generate_location_based_text(self, location, words_count):
        """
        Generate text for the given location

        Parameters
        ----------
        location : str
            a location in the USA
        words_count : int
            the approximate number of words in generated text

        Returns
        -------
        text_title : dict
            generated text and title
        status_code : int
            status_code of ChatGPT response
        message : str
            message of ChatGPT response
        """
        prompt = [
            {"role": "system",
             "content": "You are an experienced real estate content creator."},
            {"role": "user",
             "content": f"Please, generate {words_count} words text about {location}."
                        f"Mention why it can be an attractive place to live. You can also use historical facts related "
                        f"to the location."
                        f"Title the generated text. Don't use any zip, neighborhood, city and state in "
                        f"title. Return your answer in json format with text and title keys."}]
        text_title, status_code, message = self.generate_text(prompt)
        if text_title:
            # The model is asked for JSON, so parse with json.loads first
            # (ast.literal_eval chokes on JSON's true/false/null). Fall back
            # to ast.literal_eval for Python-literal style replies
            # (single-quoted keys), which json.loads rejects.
            try:
                text_title = json.loads(text_title)
            except ValueError:
                text_title = ast.literal_eval(text_title)
            text = text_title["text"]
            # Drop trailing sentences until the text fits the character limit;
            # when a single sentence still exceeds it, the text becomes empty.
            while len(text) > LOCATION_TEXT_MAX_CHAR_COUNT:
                sentences = [sent.text for sent in nlp(text).sents][:-1]
                text = ' '.join(sentences)
            text_title["text"] = text
        return text_title, status_code, message

    async def generate_google_ad_copy(self, generation_params: GenerationParams,
                                      postprocessing_params: PostprocessingParams,
                                      home_data: GoogleAdCopyHomeData,
                                      important_features: List = None,
                                      unimportant_features: List = None,
                                      generated_adjectives: List = None,
                                      limited_data: bool = False) \
            -> Union[GenerateGoogleAdCopyHeadlines, None]:
        """
        Generate Google Ad copy for property advertisements.

        Parameters
        ----------
        generation_params: GenerationParams
            Schema for text generation params
        postprocessing_params: PostprocessingParams
            Schema for postprocessing params
        home_data: GoogleAdCopyHomeData
            Schema of a property data.
        important_features: List = None
            List of important features for specific property
        unimportant_features: List = None
            List of unimportant features for specific property
        generated_adjectives: List = None
            List of generated adjectives for specific property
        limited_data: bool = False
            Flag whether the data is limited or not

        Returns
        -------
        out : Union[GenerateGoogleAdCopyHeadlines, None]

        Raises
        ------
        GoogleAdCopyGenerationError
            If no valid set of headlines is produced within max_retries.
        """
        mapped_subtype = SUBTYPE_MAPPING.get(home_data.subtype)
        mapped_property_type = PROPERTY_TYPE_MAPPING.get(home_data.property_type)
        walk_score_description = WALK_SCORE_DESCRIPTION_MAPPING.get(home_data.walk_score_description)
        # BUG FIX: the bike and transit lookups previously used each other's
        # mapping (bike went through TRANSIT_DESCRIPTION_MAPPING and transit
        # through BIKE_DESCRIPTION_MAPPING).
        bike_description = BIKE_DESCRIPTION_MAPPING.get(home_data.bike_description)
        transit_description = TRANSIT_DESCRIPTION_MAPPING.get(home_data.transit_description)
        if postprocessing_params.replace_popular_places:
            (home_data.state, home_data.city,
             home_data.neighborhood, home_data.remark) = await self.preprocess_data(state=home_data.state,
                                                                                    city=home_data.city,
                                                                                    neighborhood=home_data.neighborhood,
                                                                                    remark=home_data.remark)
        # Lot size string is only meaningful for land listings; None when
        # area is missing/zero or the unit is unknown.
        if home_data.lot_size_area and home_data.lot_size_area_unit:
            lot_size_area = f"{home_data.lot_size_area} {home_data.lot_size_area_unit}"
        else:
            lot_size_area = None

        # One superset of template arguments serves every prompt family:
        # str.format silently ignores keyword arguments a template does not
        # reference, so passing extras is harmless.
        template_args = dict(subtype=mapped_subtype,
                             property_type=mapped_property_type,
                             state=home_data.state,
                             city=home_data.city,
                             neighborhood=home_data.neighborhood,
                             bedrooms=home_data.bedrooms,
                             bathrooms=home_data.bathrooms,
                             important_features=important_features,
                             unimportant_features=unimportant_features,
                             generated_adjectives=generated_adjectives,
                             remark=home_data.remark,
                             walk_score_description=walk_score_description,
                             bike_description=bike_description,
                             transit_description=transit_description,
                             lot_size_area=lot_size_area)

        # Pick the (short headline, description, long headline) template
        # triple for this listing category.
        if home_data.subtype == 11:  # multi-family
            templates = (short_headline_prompt_for_multi_family,
                         description_prompt_for_multi_family,
                         long_headline_prompt_for_multi_family)
        elif home_data.subtype == 9:  # land
            templates = (short_headline_prompt_for_land,
                         description_prompt_for_land,
                         long_headline_prompt_for_land)
        elif home_data.state is None:
            templates = (short_headline_without_state_prompt,
                         description_without_state_prompt,
                         long_headline_without_state_prompt)
        elif not limited_data:
            templates = (short_headline_prompt,
                         description_prompt,
                         long_headline_prompt)
        else:
            templates = (short_headline_limited_data_prompt,
                         description_limited_data_prompt,
                         long_headline_limited_data_prompt)
        prompts = [template.format(**template_args) for template in templates]

        retries = 0
        while retries < generation_params.max_retries:
            # Generate all three copy variants concurrently.
            tasks = [self.async_generate_text(system_message=headline_generation_system_message, prompt=prompt,
                                              temperature=generation_params.temperature,
                                              timeout=15) for prompt in prompts]
            responses = await asyncio.gather(*tasks)
            if not all(responses):
                retries += 1
                continue

            # Post-process each response under its own character limit.
            processing_tasks = [self.process_responses(response=response, num_of_chars=num_of_chars,
                                                       property_type=mapped_property_type,
                                                       postprocessing_params=postprocessing_params)
                                for response, num_of_chars in zip(responses, CHARACTER_LIMITS)]
            processed_responses = await asyncio.gather(*processing_tasks)
            if not all(processed_responses):
                retries += 1
                continue

            final_headlines = \
                {key: value for dictionary in processed_responses for key, value in dictionary.items()}
            try:
                return GenerateGoogleAdCopyHeadlines(**final_headlines)
            except Exception:
                # Schema validation failed; try another round.
                retries += 1
        raise GoogleAdCopyGenerationError('Maximum number of retries exceeded')

    async def process_responses(self, response: str, num_of_chars: int, property_type: str,
                                postprocessing_params: PostprocessingParams) -> Union[Dict, None]:
        """
        Parse a generation response and post-process each headline group in it.

        Parameters
        ----------
        response : str
            The response received from the service.
        num_of_chars: int
            Maximum number of characters
        property_type : str
            The type of property for processing.
        postprocessing_params: PostprocessingParams
            Schema for postprocessing params

        Returns
        -------
        Returns : Union[Dict, None]
        """
        # A malformed (non-JSON) response is signalled with None.
        try:
            parsed = json.loads(response)
        except Exception:
            return None

        # Post-process every headline group concurrently, then re-attach the
        # original keys in order.
        coroutines = [self.postprocess_headlines(headlines=group, num_of_chars=num_of_chars,
                                                 property_type=property_type,
                                                 postprocessing_params=postprocessing_params)
                      for group in parsed.values()]
        cleaned_groups = await asyncio.gather(*coroutines)
        return dict(zip(parsed.keys(), cleaned_groups))

    @staticmethod
    async def preprocess_data(state: str, city: Union[str, None],
                              neighborhood: Union[str, None],
                              remark: str, popular_places: List[str] = POPULAR_PLACES,
                              popular_places_states_mapping: Dict[str, str] = POPULAR_PLACES_STATES_MAPPING) -> (
            Tuple)[Union[str, None], Union[str, None], Union[str, None], Union[str, None]]:
        """
        Preprocess data (specifically location and remark) to go to the prompt for generating headlines

        Parameters
        ----------
        state : str
            State of the property
        city : Union[str, None]
            City of the property
        neighborhood : Union[str, None]
            Neighborhood of the property
        remark : str
            The remark of the property
        popular_places: List[str] = POPULAR_PLACES
            Popular places which do not need state
        popular_places_states_mapping: Dict[str, str] = POPULAR_PLACES_STATES_MAPPING
            Mapping that shows which popular place is in which state

        Returns
        -------
        out : Tuple[Union[str, None], Union[str, None], Union[str, None], Union[str, None]]
            A tuple containing preprocessed state, city, neighborhood and remark values
        """

        def _strip_terms(text, *terms):
            # Remove each non-None term from text as a whole word,
            # case-insensitively. BUG FIX: previously None terms (missing
            # state, or a place absent from the mapping) hit re.escape(None)
            # and raised TypeError.
            for term in terms:
                if term:
                    text = re.sub(r'\b' + re.escape(term) + r'\b', '', text, flags=re.IGNORECASE)
            return text

        popular_places_lower = [place.lower() for place in popular_places]

        if neighborhood is not None and neighborhood.lower() in popular_places_lower:
            if remark is not None:
                state_full_name = popular_places_states_mapping.get(neighborhood.lower())
                remark = _strip_terms(remark, state_full_name, state)
            if city and remark is not None:
                remark = _strip_terms(remark, city)
            return None, None, neighborhood, remark

        if city is not None and city.lower() in popular_places_lower:
            if remark is not None:
                state_full_name = popular_places_states_mapping.get(city.lower())
                remark = _strip_terms(remark, state_full_name, state)
            if neighborhood and remark is not None:
                remark = _strip_terms(remark, neighborhood)
            return None, city, None, remark

        return state, city, neighborhood, remark

    async def postprocess_headlines(self, headlines: List[str], num_of_chars: int,
                                    property_type: str, postprocessing_params: PostprocessingParams) -> Tuple:
        """
        Run every headline through single-headline post-processing concurrently.

        Parameters
        ----------
        headlines : List[str]
            Headlines that need to post-processed or not.
        num_of_chars: int
            Maximum number of characters
        property_type : str
            The Property Type
        postprocessing_params: PostprocessingParams
            Schema for postprocessing params

        Returns
        -------
        out : Tuple
        """
        coroutines = [
            self.postprocess_single_headline(headline=headline,
                                             num_of_chars=num_of_chars,
                                             property_type=property_type,
                                             exclude_symbols=postprocessing_params.exclude_symbols,
                                             paraphrase_for_char_limit=postprocessing_params.paraphrase_for_char_limit,
                                             ensure_title_case=postprocessing_params.ensure_title_case,
                                             replace_ba_br=postprocessing_params.replace_ba_br,
                                             replace_sf=postprocessing_params.replace_sf)
            for headline in headlines
        ]
        return await asyncio.gather(*coroutines)

    async def postprocess_single_headline(self, headline: str, num_of_chars: int, property_type: str,
                                          exclude_symbols: bool,
                                          paraphrase_for_char_limit: bool, ensure_title_case: bool,
                                          replace_ba_br: bool, replace_sf: bool) -> str:
        """
        Run a single headline through the post-processing pipeline.

        Parameters
        ----------
        headline : str
            input headline
        num_of_chars: int
            Maximum number of characters
        property_type : str
            identifier of property type
        exclude_symbols: bool
            Boolean indicating whether to replace/exclude symbols like ?! or not.
        paraphrase_for_char_limit: bool
            Boolean indicating whether to paraphrase sentences depending on char limit or not.
        ensure_title_case : bool
            True for returning titlecase
        replace_ba_br : bool
            True to replace bathrooms with BA, bedrooms with BR
        replace_sf : bool
            True to replace square feet with SF
        Returns
        -------
        headline : str
            result
        """
        # Order matters: shorten first, then fix violations, then validate the
        # property type, and only afterwards apply the cosmetic rewrites.
        if paraphrase_for_char_limit:
            headline = await self.paraphrase_char_limit_exceeded_sentences(
                num_of_chars=num_of_chars, headline=headline, max_recursion_depth=2)
        headline = await self.paraphrase_violations(headline=headline)
        headline = await self.validate_property_type(headline=headline, property_type=property_type)
        if exclude_symbols:
            headline = await self.exclude_symbols(headline=headline)
        if ensure_title_case:
            headline = await self.headline_capitalize(sentence=headline)
        if replace_ba_br:
            headline = await self.replace_with_ba_br(text=headline)
        if replace_sf:
            headline = await self.replace_with_sf(text=headline)
        return headline

    async def paraphrase_char_limit_exceeded_sentences(self, num_of_chars: int, headline: str,
                                                       max_recursion_depth: Optional[int] = 2) -> str:
        """
        Iteratively paraphrase a headline while it exceeds the character limit.

        Parameters
        ----------
        num_of_chars: int
            Maximum number of characters
        headline : str
            input headline
        max_recursion_depth : Optional[int] = 2
            maximum number of paraphrasing attempts

        Returns
        -------
        headline : str
            result
        """
        attempts_left = max_recursion_depth
        while attempts_left > 0 and len(headline) > num_of_chars:
            response = await self.async_generate_text(
                system_message=chars_paraphrasing_system_message,
                prompt=chars_paraphrasing_prompt.format(num_of_chars=num_of_chars,
                                                        sentence=headline),
                temperature=0.5,
                timeout=15)
            try:
                headline = json.loads(response)["paraphrased_sentence"]
            except Exception:
                # Unparseable model output: surface a domain error.
                raise FailedToParaphraseViolations(
                    'Failed to appropriately paraphrase headlines that exceed character limit.')
            attempts_left -= 1
        # The headline may still exceed the limit when attempts run out.
        return headline

    async def paraphrase_violations(self, headline: str,
                                    max_recursion_depth: Optional[int] = 2) -> str:
        """
        Iteratively paraphrase a headline while fair-housing violations are found.

        Parameters
        ----------
        headline : str
            input headline
        max_recursion_depth : Optional[int] = 2
            maximum number of paraphrasing attempts

        Returns
        -------
        headline : str
            result
        """
        attempts_left = max_recursion_depth
        while attempts_left > 0:
            has_violation, fhv_violation = await self.validate_on_fhv(headline)
            if not has_violation:
                break
            response = await self.async_generate_text(
                system_message=violation_paraphrasing_system_message,
                prompt=violation_paraphrasing_prompt.format(sentence=headline, fhv_violation=fhv_violation),
                temperature=0.5,
                timeout=15)
            try:
                headline = json.loads(response)["paraphrased_sentence"]
            except Exception:
                # Unparseable model output: surface a domain error.
                raise FailedToParaphraseViolations('Failed to appropriately paraphrase violating headlines.')
            attempts_left -= 1
        # Note: the final paraphrase is not re-validated once attempts run out.
        return headline

    @staticmethod
    async def validate_on_fhv(headline: str) -> Tuple[bool, Union[str, None]]:
        """
        Run a sentence through the Fair Housing Validator service.

        Parameters
        ----------
        headline : str
            Sentence that needs to go through Fair Housing Validator

        Returns
        -------
        out : Tuple[bool, Union[str, None]]
            (True, violation_label) for the first violating sentence found,
            otherwise (False, None)

        Raises
        ------
        FHValidationError
            if the validator request fails or returns malformed data
        """
        try:
            async with httpx.AsyncClient() as client:
                fhv_response = await client.post(url=secrets.FHV_PREDICT_ENDPOINT,
                                                 json={"text": headline.lower(), "top_k": 1},
                                                 timeout=10)
                fhv_response.raise_for_status()
            sentence_predictions = fhv_response.json()["sentence_predictions"]
            class_probs_per_sentence = [prediction["class_probs"] for prediction in sentence_predictions]
        except Exception:
            raise FHValidationError('Fair Housing Validator Failed to Validate')

        # Report the first sentence whose top predicted class is a violation.
        for class_probs in class_probs_per_sentence:
            top_label = list(class_probs)[0]
            if top_label != "no_violation":
                return True, top_label
        return False, None

    @staticmethod
    async def validate_property_type(headline: str, property_type: str) -> str:
        """
        Function to validate sentences for property types (sale, rent)

        Parameters
        ----------
        headline : str
            Sentence to be validated for property type
        property_type : str
            The property type

        Returns
        -------
        out : str
        """
        if property_type == "sale":
            modified_sentence = re.sub(pattern=r'\b(rent|lease)\b', repl='sale', string=headline, flags=re.IGNORECASE)
            modified_sentence = re.sub(pattern=r'\brental\b', repl='residential', string=modified_sentence,
                                       flags=re.IGNORECASE)
            return modified_sentence
        elif property_type == "rent":
            modified_sentence = re.sub(pattern=r'\b(sale|buy)\b', repl='rent', string=headline, flags=re.IGNORECASE)
            modified_sentence = re.sub(pattern=r'\bresidential\b', repl='rental', string=modified_sentence,
                                       flags=re.IGNORECASE)
            return modified_sentence

    @staticmethod
    async def exclude_symbols(headline: str) -> str:
        """
        Modify a headline by replacing certain punctuation, removing specific
        characters, normalizing whitespace, and converting to lowercase.

        Parameters
        ----------
        headline : str
            The input headline to be modified.

        Returns
        -------
        out : str
            The modified headline.
        """
        modified_headline = headline.replace('!', '.').replace('?', '.')

        modified_headline = re.sub(r'\b - \b|\b -\b|\b- \b', ' ', modified_headline)

        characters_to_remove = ['@', '#', '^', '*']
        for char in characters_to_remove:
            modified_headline = modified_headline.replace(char, '')

        modified_headline = re.sub(r'\s+', ' ', modified_headline).strip().lower()

        return modified_headline

    @staticmethod
    async def headline_capitalize(sentence: str,
                                  lower_case_exceptions: Optional[List[str]] = LOWER_CASE_EXCEPTIONS,
                                  title_case_exceptions: Optional[List[str]] = TITLE_CASE_EXCEPTIONS) -> str:
        """
        Capitalize the headline-style sentence, excluding specified exceptions.

        Parameters
        ----------
        sentence : str
            The headline-style sentence to be capitalized.
        lower_case_exceptions : Optional[List[str]] = LOWER_CASE_EXCEPTIONS
            Words forced back to the given lower-case spelling.
        title_case_exceptions : Optional[List[str]] = TITLE_CASE_EXCEPTIONS
            Words forced to the given title-case spelling.

        Returns
        -------
        out : str
            The capitalized headline (no trailing period).
        """
        # Title-case every word, then undo it for the exception lists below.
        title_case_words = [word.capitalize() for word in sentence.split(' ')]
        text = ' '.join(title_case_words)

        for exception in lower_case_exceptions:
            pattern = re.compile(r'\b(' + re.escape(exception) + r')\b', flags=re.IGNORECASE)
            text = pattern.sub(exception, text)

        text = text.replace("W/", "w/")
        text = re.sub(r'\bmust see\b', 'Must-see', text, flags=re.IGNORECASE)

        # The very first word is always capitalized, exceptions or not.
        words = text.strip().split(' ', 1)
        if words:
            words[0] = words[0].capitalize()
        text = ' '.join(words)

        for exception in title_case_exceptions:
            pattern = re.compile(r'\b(' + re.escape(exception) + r')\b', flags=re.IGNORECASE)
            text = pattern.sub(exception, text)

        # Capitalize sentence starts, except after size abbreviations where
        # the period belongs to the abbreviation ("1200sf. home").
        sentences = text.split('. ')
        for i in range(1, len(sentences)):
            # Fix: guard empty segments (e.g. trailing ". ") against IndexError.
            if sentences[i] and not sentences[i - 1].strip().endswith(('sf', 'sqft')):
                sentences[i] = sentences[i][0].upper() + sentences[i][1:]
        text = '. '.join(sentences)

        sentences = text.split('/')
        for i in range(1, len(sentences)):
            # Fix: guard empty segments (e.g. trailing "/") against IndexError.
            if sentences[i]:
                sentences[i] = sentences[i][0].upper() + sentences[i][1:]
        text = '/'.join(sentences)

        # Headlines do not end with a period.
        if text.endswith('.'):
            text = text[:-1]

        return text

    @staticmethod
    async def replace_with_ba_br(text: str) -> str:
        """
        Replace occurrences of bathroom, bedroom in the input text with standardized abbreviations.

        "3 bedrooms" -> "3 BR", "2 baths" -> "2 BA"; adjacent BR/BA pairs
        (comma-, slash- or space-separated) are reformatted by the
        module-level ``replace_pattern`` callable.

        Parameters
        ----------
        text : str
            The input text to be processed.

        Returns
        -------
        out : str
            Text with standardized BR/BA abbreviations.
        """
        # "3 bedrooms", "3-bed room", "3 bdrm", "3 bds" ... -> "3BR"
        bedroom_pattern = (
            re.compile(pattern=r'\b(\d+)\s*-?\s*(bed[-\s]*rooms?|brs?|beds?|bdrms?|bds?)\b', flags=re.IGNORECASE))
        text = bedroom_pattern.sub(repl=r'\1BR', string=text)

        # "2 bathrooms", "2 baths", "2 bthrm" ... -> "2BA"
        bathroom_pattern = (
            re.compile(pattern=r'\b(\d+)\s*-?\s*(bath[-\s]*rooms?|bas?|baths?|bthrms?)\b', flags=re.IGNORECASE))
        text = bathroom_pattern.sub(repl=r'\1BA', string=text)

        # Comma-separated pairs: "3BR, 2BA". replace_pattern is defined
        # elsewhere in this module; NOTE(review): confirm the exact format
        # it emits for a matched pair.
        comma_pattern = (
            re.compile(pattern=r'\b(\d*)(BA|BR)\s*,\s*(\d*)\s*(BA|BR)\b', flags=re.IGNORECASE))
        text = comma_pattern.sub(replace_pattern, text)

        # Slash-separated pairs: "3BR/2BA".
        slash_pattern = (
            re.compile(pattern=r'\b(\d*)(BA|BR)\s*/\s*(\d*)\s*(BA|BR)\b', flags=re.IGNORECASE))
        text = slash_pattern.sub(replace_pattern, text)

        # Space-separated pairs: "3BR 2BA".
        space_pattern = (
            re.compile(pattern=r'\b(\d*)(BA|BR)\s*(\d*)\s*(BA|BR)\b', flags=re.IGNORECASE))
        text = space_pattern.sub(replace_pattern, text)

        # Re-insert the space between count and abbreviation: "3BR" -> "3 BR".
        text = re.sub(r'(\d)(BA|BR)', r'\1 \2', text)

        return text

    @staticmethod
    async def replace_with_sf(text: str) -> str:
        """
        Replace occurrences of square feet in the input text with standardized abbreviations.

        Parameters
        ----------
        text : str
            The input text to be processed.

        Returns
        -------
        out : str
        """
        square_feet_pattern_with_digit = re.compile(
            pattern=r'\b(\d+)\s*-?\s*(square[-\s]*feet|square[-\s]*foot|sf|sqft|sq[-\s]*ft)\b',
            flags=re.IGNORECASE)
        text = square_feet_pattern_with_digit.sub(r'\1sf.', text)

        square_feet_pattern = re.compile(
            pattern=r'\b(square[-\s]*feet|square[-\s]*foot|sf|sqft|sq[-\s]*ft)\b',
            flags=re.IGNORECASE)
        text = square_feet_pattern.sub(r'sf.', text)

        square_feet_with_dot_pattern = re.compile(
            pattern=r'\b(\d+)\s*-?\s*(sq\.\s*ft\.)',
            flags=re.IGNORECASE)
        text = square_feet_with_dot_pattern.sub(r'\1sf.', text)

        text = text.replace("..", ".")

        return text

__init__(key, model, examples, features_ranking)

Initiate key, model, tokenizer of ChatGPT and greetings, system introductions, closings texts, tokenizer

Source code in app/handlers.py
def __init__(self, key, model, examples, features_ranking):
    """
    Initiate key, model, tokenizer of ChatGPT and greetings, system introductions, closings texts, tokenizer

    Parameters
    ----------
    key
        OpenAI API key
    model
        ChatGPT model name (must be known to tiktoken)
    examples
        mapping that provides "greetings", "system_introductions" and
        "closings" entries
    features_ranking
        ranking of features -- stored only; presumably used by prompt
        builders elsewhere (confirm against callers)
    """
    self.key = key
    self.model = model
    self.examples = examples
    self.features_ranking = features_ranking
    # Convenience views into the examples mapping.
    self.greetings = self.examples["greetings"]
    self.system_introductions = self.examples["system_introductions"]
    self.closings = self.examples["closings"]
    # Tokenizer matching the model, used for prompt budget checks.
    self.tokenizer = tiktoken.encoding_for_model(self.model)

count_tokens(text)

Count text tokens

Parameters:

Name Type Description Default
text

remarks

required

Returns:

Name Type Description
tokens_count int

count of tokens

Source code in app/handlers.py
def count_tokens(self, text):
    """
    Count text tokens

    Parameters
    ----------
    text: str
        remarks

    Returns
    -------
    tokens_count : int
        count of tokens
    """
    # Encode with the model's tokenizer and report the token count.
    return len(self.tokenizer.encode(text))

introduce_system()

Generate the introduction of the system

Returns:

Name Type Description
system_introductions str

email system introduction text

Source code in app/handlers.py
def introduce_system(self):
    """
    Generate the introduction of the system

    Returns
    -------
    system_introductions : str
        email system introduction text
    """
    # Pick one of the pre-written introduction variants at random.
    return choice(self.system_introductions)

close_email(agent_name)

Generate the closing part of email

Parameters:

Name Type Description Default
agent_name str

agent name

required

Returns:

Name Type Description
email_closing str

email closing part

Source code in app/handlers.py
def close_email(self, agent_name):
    """
    Generate the closing part of email

    Parameters
    ----------
    agent_name : str
        agent name

    Returns
    -------
    email_closing : str
        email closing part
    """
    # A random sign-off followed by the agent's name on its own line.
    sign_off = choice(self.closings)
    return f"{sign_off}\n{agent_name}"

generate_empty_collection_invitation(client_name, agent_name)

Generate invitation for empty collection

Parameters:

Name Type Description Default
client_name str

name of client

required
agent_name str

name of agent

required

Returns:

Name Type Description
greeting str

Client greeting part of email

system_introduction str

Introduction of system

closing str

Closing part of email

full_text str

Email full text

Source code in app/handlers.py
def generate_empty_collection_invitation(self, client_name, agent_name):
    """
    Generate invitation for empty collection

    Parameters
    ----------
    client_name : str
        name of client
    agent_name : str
        name of agent

    Returns
    -------
    greeting : str
        Client greeting part of email
    system_introduction : str
        Introduction of system
    closing : str
        Closing part of email
    full_text : str
        Email full text
    """
    # Assemble the three email parts, then join them into the full body.
    greeting = f"{choice(self.greetings)} {client_name},"
    introduction = self.introduce_system()
    closing_part = self.close_email(agent_name)
    body = "\n".join((greeting, introduction, closing_part))
    return greeting, introduction, closing_part, body

generate_text(prompt)

Generate text for the given prompt using ChatGPT

Parameters:

Name Type Description Default
prompt list

prompt for ChatGPT

required

Returns:

Name Type Description
generated_text str

generated text (None when the API call fails)

status_code int

status_code of ChatGPT response

message str

message of ChatGPT response

Source code in app/handlers.py
def generate_text(self, prompt):
    """
    Generate text for the given prompt using ChatGPT

    Parameters
    ----------
    prompt : list
        prompt for ChatGPT

    Returns
    -------
    generated_text : str or None
        generated text (None when the API call fails)
    status_code : int
        status_code of ChatGPT response
    message : str
        message of ChatGPT response
    """
    try:
        openai.api_key = self.key
        response = openai.ChatCompletion.create(model=self.model, messages=prompt)
        generated_text = response.choices[0].message["content"]
        # Ensure the reply starts with an upper-case letter; guard against
        # an empty completion (indexing "" would raise IndexError).
        if generated_text:
            generated_text = generated_text[0].capitalize() + generated_text[1:]
        message = "OK"
        status_code = 200
    except openai.error.APIError as error:
        generated_text = None
        # Fix: a trailing comma previously turned the message into a tuple.
        message = error.message
        status_code = error.status
    return generated_text, status_code, message

async_generate_text(system_message, prompt, temperature, timeout) async

Asynchronously generate text for the given prompt using ChatGPT.

Parameters:

Name Type Description Default
system_message List[Dict]

The message to help generate text

required
prompt str

Prompt text for ChatGPT; combined with system_message via make_prompt.

required
temperature float

Temperature controls the randomness of the text that GPT generates

required
timeout int

Timeout parameter which decides when to throw a timeout error

required

Returns:

Name Type Description
out str
Source code in app/handlers.py
async def async_generate_text(self, system_message: List[Dict],
                              prompt: str, temperature: float,
                              timeout: int) -> str:
    """
    Asynchronously generate text for the given prompt using ChatGPT.

    Parameters
    ----------
    system_message: List[Dict]
        The message to help generate text
    prompt : str
        Prompt text; combined with system_message via make_prompt
    temperature : float
        Temperature controls the randomness of the text that GPT generates
    timeout : int
        Timeout parameter which decides when to throw a timeout error

    Returns
    -------
    out : str
        Raw model output (a JSON object string, per response_format)

    Raises
    ------
    Exception
        if the OpenAI API reports an error
    """
    try:
        openai.api_key = self.key
        # response_format forces a JSON object reply; the penalties
        # discourage repetitive phrasing in the generated text.
        response = await openai.ChatCompletion.acreate(model=self.model,
                                                       messages=make_prompt(system_message, prompt),
                                                       response_format={"type": "json_object"},
                                                       temperature=temperature,
                                                       timeout=timeout,
                                                       frequency_penalty=1,
                                                       presence_penalty=1)
        generated_text = response.choices[0].message["content"]
    except openai.error.APIError as error:
        raise Exception(f"API Error: {error.message}, Status Code: {error.status}")
    return generated_text

get_locations_parts(df_collection) staticmethod

Generate the state city and neighborhood parts

Parameters:

Name Type Description Default
df_collection DataFrame

collection data

required

Returns:

Name Type Description
state_part str

state part of general sentence in prompt

city_part str

city part of general sentence in prompt

neighborhood_part str

neighborhood part of general sentence in prompt

Source code in app/handlers.py
@staticmethod
def get_locations_parts(df_collection):
    """
    Generate the state city and neighborhood parts

    Parameters
    ----------
    df_collection : DataFrame
        collection data

    Returns
    -------
    state_part : str
        state part of general sentence in prompt
    city_part : str
        city part of general sentence in prompt
    neighborhood_part : str
        neighborhood part of general sentence in prompt
    """
    unique_states = df_collection.state.dropna().unique()
    state = ', '.join(unique_states)
    if len(unique_states) == 1:
        state_part = f"state: {state}"
    else:
        state_part = f"states: {state}"
    unique_cities = df_collection.city.dropna().unique()
    neighborhood_part = ""
    if (len(unique_cities) > 0) and (len(unique_cities) <= 3):
        city = ', '.join(unique_cities)
        if len(unique_cities) == 1:
            city_part = f", city: {city}"
        else:
            city_part = f", cities: {city}"
        unique_neighborhoods = df_collection.neighborhood.dropna().unique()
        if (len(unique_neighborhoods) > 0) and (len(unique_neighborhoods) <= 3):
            neighborhood = ', '.join(unique_neighborhoods)
            if len(unique_neighborhoods) == 1:
                neighborhood_part = f", neighborhood: {neighborhood}."
            else:
                neighborhood_part = f", neighborhoods: {neighborhood}."
        else:
            neighborhood_part = "."
    else:
        city_part = "."
    return state_part, city_part, neighborhood_part

get_price_sentence(df_collection) staticmethod

Generate the price sentence of prompt.

Parameters:

Name Type Description Default
df_collection DataFrame

collection data

required

Returns:

Name Type Description
price_sent str

sentence about the prices of collection properties

Source code in app/handlers.py
@staticmethod
def get_price_sentence(df_collection):
    """
    Generate the price sentence of prompt.

    Parameters
    ----------
    df_collection : DataFrame
        collection data

    Returns
    -------
    price_sent : str
        sentence about the prices of collection properties
    """
    price_min = df_collection.price.min()
    price_max = df_collection.price.max()
    if len(df_collection) == 1:
        price_sent = f"The property costs {price_min}."
    else:
        if price_min != price_max:
            price_range = f"{price_min} - {price_max}"
            price_sent = f"Properties are in price range {price_range}."
        else:
            price_sent = f"Properties cost {price_min}."
    return price_sent

get_non_home_collection_prompt(df_collection)

Make ChatGPT prompt for non_home collection

Parameters:

Name Type Description Default
df_collection DataFrame

collection's homes data in DataFrame format

required

Returns:

Name Type Description
prompt list

prompt for non_home collection

collection_information str

general information of the non-home collection

Source code in app/handlers.py
def get_non_home_collection_prompt(self, df_collection):
    """
    Make ChatGPT prompt for non_home collection

    Parameters
    ----------
    df_collection : DataFrame
        collection's homes data in DataFrame format

    Returns
    -------
    prompt : list
        prompt for non_home collection (None when no remarks are available)
    collection_information : str
        general information of non-home collection
    """
    prompt = None
    property_type = df_collection["property_type"].unique()[0]
    home_type = df_collection["home_type"].unique()[0]
    if home_type == "Other":
        home_type = "properties"
    state_part, city_part, neighborhood_part = self.get_locations_parts(df_collection)

    general_sent = f"The collection includes properties for {property_type} in the " \
                   f"{state_part}{city_part}{neighborhood_part}"

    price_sent = self.get_price_sentence(df_collection)

    school_sent = self.get_school(df_collection)

    collection_information = f"{general_sent} {price_sent} {school_sent}"
    all_remarks_none = all(df_collection["remark"].isna())
    if not all_remarks_none:
        # Drop trailing remarks one at a time until the joined text fits
        # the prompt budget.
        i = len(df_collection)
        remarks = '\n\n\n '.join(df_collection.iloc[:i].remark.dropna())
        while self.count_tokens(remarks) > REMARKS_CHAR_MAX_COUNT_IN_PROMPT:
            i -= 1
            # Fix: previously accessed the non-existent "remarks" column,
            # raising AttributeError whenever the budget was exceeded.
            remarks = '\n\n\n '.join(df_collection.iloc[:i].remark.dropna())
        prompt = [
            {"role": "system",
             "content": "You are an experienced real estate content creator."},
            {"role": "user",
             "content": f"Given the information below, generate 3-4 sentences describing the collection of "
                        f"{home_type}. Do not use any fair housing act violation, including information "
                        f"about families, race, sex, religion, and other sensitive content. "
                        f"Do not mention certain group of people like families, couples, singles, individuals. "
                        f"Use the $ symbol instead of the word dollars. "
                        f"Summarize location based common features, sizes and common usage of all collection. "
                        f"The remarks are separated by\n\n\n{collection_information}\n{remarks}"}]

    return prompt, collection_information

get_multi_home_collection_prompt(df_collection)

Make ChatGPT prompt for collection of multiple homes

Parameters:

Name Type Description Default
df_collection DataFrame

collection's homes data in DataFrame format

required

Returns:

Name Type Description
prompt list

prompt for the multi home collection

collection_information str

general information of multi home collection

Source code in app/handlers.py
def get_multi_home_collection_prompt(self, df_collection):
    """
    Make ChatGPT prompt for collection of multiple homes

    Parameters
    ----------
    df_collection : DataFrame
        collection's homes data in DataFrame format

    Returns
    -------
    prompt : list
        prompt for the multi home collection
    collection_information : str
        general information of multi home collection
    """
    # NOTE(review): takes the first unique value, assuming every row shares
    # one property_type -- confirm upstream filtering guarantees this.
    property_type = df_collection["property_type"].unique()[0]
    state_part, city_part, neighborhood_part = self.get_locations_parts(df_collection)
    subtype = ', '.join(df_collection.physical_property_type.unique())

    general_sent = f"The collection includes {subtype} homes for {property_type} in the " \
                   f"{state_part}{city_part}{neighborhood_part}"

    # Descriptive sentences produced by the helper methods.
    price_sent = self.get_multi_property_price(df_collection, "homes")
    bed_bath_sent = self.get_multi_property_bed_bath(df_collection)

    style_sent = self.get_multi_property_style(df_collection)
    feature_sent = self.get_multi_property_features(df_collection)
    school_sent = self.get_school(df_collection)
    collection_information = f"{general_sent} {price_sent} {bed_bath_sent} {style_sent} {school_sent} " \
                             f"{feature_sent}"

    # Fixed two-message chat prompt: persona plus task instructions with the
    # assembled collection description appended.
    prompt = [
        {"role": "system",
         "content": "You are an experienced real estate content creator."},
        {"role": "user",
         "content": f"Given the information below, generate 3-4 sentences describing the collection of homes. "
                    f"Do not use any fair housing act violation, including information about families, race, sex, "
                    f"religion, and other sensitive content. "
                    f"Do not mention certain group of people like families, couples, singles, individuals. "
                    f"Use the $ symbol instead of the word dollars. {collection_information}"}]
    return prompt, collection_information

get_unique_property_prompt(df_collection)

Make ChatGPT prompt for collection of unique property

Parameters:

Name Type Description Default
df_collection DataFrame

collection's homes data in DataFrame format

required

Returns:

Name Type Description
prompt list

prompt for the unique property collection

collection_information str

general information of unique property collection

Source code in app/handlers.py
def get_unique_property_prompt(self, df_collection):
    """
    Make ChatGPT prompt for collection of unique property

    Parameters
    ----------
    df_collection : DataFrame
        collection's homes data in DataFrame format

    Returns
    -------
    prompt : list
        prompt for the unique property collection
    collection_information : str
        general information of unique property collection
    """
    # The collection holds exactly one property; take its first (only) row.
    property_data = df_collection.to_dict(orient='records')[0]
    if property_data["home_type"] == "Other":
        prop_type = "property"
    else:
        prop_type = property_data["home_type"]
    property_type = property_data["property_type"]
    state = property_data["state"]
    city_part, neighborhood_part = self.get_unique_property_city_neighborhood(
        property_data)

    general_sent = f"The collection is a {prop_type} for {property_type} in the " \
                   f"state: {state}{city_part}{neighborhood_part}"

    # Sentences produced by the helper methods for price/rooms, style, schools.
    price_bed_bath_sent = self.get_unique_property_price_bed_bath(
        property_data, prop_type)
    style_sent = self.get_unique_property_style(property_data)
    school_sent = self.get_school(df_collection)

    collection_information = f"{general_sent} {price_bed_bath_sent} {style_sent} {school_sent}"
    remark = property_data["remark"]
    if remark:
        remark_part = f"It also has the following remark: {remark}"
    else:
        remark_part = ""
    # Fixed two-message chat prompt: persona plus task instructions with the
    # assembled property description (and optional remark) appended.
    prompt = [
        {"role": "system",
         "content": "You are an experienced real estate content creator."},
        {"role": "user",
         "content": f"Given the information below, generate 3-4 sentences describing the collection of a "
                    f"{prop_type}. Do not use any fair housing act violation, including information about families,"
                    f" race, sex, religion, and other sensitive content. "
                    f"Do not mention certain group of people like families, couples, singles, individuals. "
                    f"Use the $ symbol instead of the word dollars. {collection_information} {remark_part}"}]
    return prompt, collection_information

get_unique_property_price_bed_bath(property_data, prop_type) staticmethod

Make unique property collection information price bedroom and bathroom part

Parameters:

Name Type Description Default
property_data dict

the data of a property

required
prop_type str

the home type of the property

required

Returns:

Name Type Description
price_bed_bath_sent str

price, bedroom and bathroom part of unique property collection

Source code in app/handlers.py
@staticmethod
def get_unique_property_price_bed_bath(property_data, prop_type):
    """
    Make unique property collection information price bedroom and bathroom part

    Parameters
    ----------
    property_data : dict
        the data of a property
    prop_type : str
        the home type of the property

    Returns
    -------
    price_bed_bath_sent : str
        price, bedroom and bathroom part of unique property collection
    """
    sentence = f"The {prop_type} costs {property_data['price']}"

    # Room counts are only meaningful for home-like property types.
    if prop_type in HOMES:
        bedrooms = property_data["bedrooms"]
        bathrooms = property_data["bathrooms"]
        has_bedrooms = bool(bedrooms) and bedrooms >= 1
        has_bathrooms = bool(bathrooms) and bathrooms >= 1

        if has_bedrooms:
            sentence += " has 1 bedroom" if bedrooms == 1 else f" has {bedrooms} bedrooms"
            if has_bathrooms:
                bath_word = "bathroom" if bathrooms == 1 else "bathrooms"
                sentence += f" and {bathrooms} {bath_word}"
        elif has_bathrooms:
            sentence += " has 1 bathroom" if bathrooms == 1 else f" has {bathrooms} bathrooms"

    return sentence + "."

get_unique_property_style(property_data) staticmethod

Make unique property collection information style part

Parameters:

Name Type Description Default
property_data dict

the data of a property

required

Returns:

Name Type Description
style_sent str

style part of a unique property collection

Source code in app/handlers.py
@staticmethod
def get_unique_property_style(property_data):
    """
    Make unique property collection information style part

    Parameters
    ----------
    property_data : dict
        the data of a property

    Returns
    -------
    style_sent : str
        style part of a unique property collection
    """
    style = property_data["exterior_style"]
    # Architectural style is only mentioned for home-like property types.
    if property_data["home_type"] in HOMES and style:
        return f"Home has {style} architectural style."
    return ""

get_school(df_collection) staticmethod

Make the given collection school part

Parameters:

Name Type Description Default
df_collection DataFrame

collection data in Dataframe format

required

Returns:

Name Type Description
school_sent str

school part of a given collection

Source code in app/handlers.py
@staticmethod
def get_school(df_collection):
    """
    Make the given collection school part
    Parameters
    ----------
    df_collection : DataFrame
        collection data in Dataframe format

    Returns
    -------
    school_sent : str
        school part of a given collection
    """
    prop_count = len(df_collection)
    school_sent = ""
    schools = []
    if sum(df_collection["private_school"]) > prop_count / 2:
        schools.append("private")
    if sum(df_collection["public_school"]) > prop_count / 2:
        schools.append("public")
    if len(schools) > 0:
        school_part = " and ".join(schools)
        school_sent = f"There are {school_part} schools nearby."
    return school_sent

get_unique_property_city_neighborhood(property_data) staticmethod

Make the unique property collection city and neighborhood part

Parameters:

Name Type Description Default
property_data dict

the data of a property

required

Returns:

Name Type Description
city_part str

city part of unique property collection

neighborhood_part str

neighborhood part of unique property collection

Source code in app/handlers.py
@staticmethod
def get_unique_property_city_neighborhood(property_data):
    """
    Make the unique property collection city and neighborhood part
    Parameters
    ----------
    property_data : dict
        the data of a property

    Returns
    -------
    city_part : str
        city part of unique property collection
    neighborhood_part : str
        neighborhood part of unique property collection
    """
    neighborhood_part = ""
    city = property_data["city"]
    neighborhood = property_data["neighborhood"]
    if city:
        city_part = f", city: {city}"
        if neighborhood:
            neighborhood_part = f", neighborhood: {neighborhood}."
        else:
            neighborhood_part = "."
    else:
        city_part = "."
    return city_part, neighborhood_part

get_unique_property_features(property_data) staticmethod

Make the unique property collection features

Parameters:

Name Type Description Default
property_data dict

the data of a property

required

Returns:

Name Type Description
feature_sent str

feature sentence of unique property collection

Source code in app/handlers.py
@staticmethod
def get_unique_property_features(property_data):
    """
    Make the unique property collection features
    Parameters
    ----------
    property_data : dict
        the data of a property

    Returns
    -------
    feature_sent : str
        feature sentence of unique property collection
    """
    features = property_data["features"]
    feature_sent = ""
    if features:
        feature_part = ", ".join(features)
        feature_sent = f"It has {feature_part}."
    return feature_sent

get_unique_property_information(property_df)

Make the general sentence and description of unique property

Parameters:

Name Type Description Default
property_df DataFrame

the data of a property in DataFrame format

required

Returns:

Name Type Description
type_loc_sent str

property type and location sentence of a property

description str

description of a property

Source code in app/handlers.py
def get_unique_property_information(self, property_df):
    """
    Build the intro sentence and description of a unique property.

    Parameters
    ----------
    property_df : DataFrame
        the data of a property in DataFrame format (single row expected;
        only the first record is used)

    Returns
    -------
    type_loc_sent : str
        property type and location sentence of a property
    description : str
        description of a property
    """
    record = property_df.to_dict(orient='records')[0]
    prop_type = record["property_type"]
    city_part, neighborhood_part = self.get_unique_property_city_neighborhood(
        record)

    type_loc_sent = (
        f"The {prop_type} property is in the state: "
        f"{record['state']}{city_part}{neighborhood_part}"
    )

    # Assemble the description from the individual sentence builders.
    price_bed_bath = self.get_unique_property_price_bed_bath(record, prop_type)
    style = self.get_unique_property_style(record)
    school = self.get_school(property_df)
    features = self.get_unique_property_features(record)

    description = f"{price_bed_bath} {style} {features} {school}"
    return type_loc_sent, description

get_multi_property_type_loc(df_collection)

Make the location and property type sentence of multi property collection

Parameters:

Name Type Description Default
df_collection DataFrame

multi property collection in DataFrame format

required

Returns:

Name Type Description
type_loc_sent str

property type and location sentence of multi property collection

Source code in app/handlers.py
def get_multi_property_type_loc(self, df_collection):
    """
    Build the location and property type sentence of a multi property
    collection.

    Parameters
    ----------
    df_collection : DataFrame
        multi property collection in DataFrame format; assumed to hold a
        single distinct property_type (the first unique value is used)

    Returns
    -------
    type_loc_sent : str
        property type and location sentence of multi property collection
    """
    prop_type = df_collection.property_type.unique()[0]
    state_part, city_part, neighborhood_part = self.get_locations_parts(
        df_collection)
    return (
        f"The {prop_type} properties are in the "
        f"{state_part}{city_part}{neighborhood_part}"
    )

get_multi_property_price(df_collection, prop_type) staticmethod

Make price sentence of multi property collection

Parameters:

Name Type Description Default
df_collection DataFrame

collection data in DataFrame format

required
prop_type str

property type of the collection

required

Returns:

Name Type Description
price_sent str

price sentence of multi property collection

Source code in app/handlers.py
@staticmethod
def get_multi_property_price(df_collection, prop_type):
    """
    Make price sentence of multi property collection
    Parameters
    ----------
    df_collection : DataFrame
        collection data in DataFrame format
    prop_type : str
        property type of the collection

    Returns
    -------
    price_sent : str
        price sentence of multi property collection
    """
    price_min = df_collection.price.min()
    price_max = df_collection.price.max()
    if price_min != price_max:
        price_range = f"{price_min} - {price_max}"
        price_sent = f"The {prop_type} are in price range {price_range}."
    else:
        price_sent = f"The {prop_type} cost {price_min}."
    return price_sent

get_multi_property_style(df_collection) staticmethod

Make style sentence of multi property collection

Parameters:

Name Type Description Default
df_collection DataFrame

collection data in DataFrame format

required

Returns:

Name Type Description
style_sent str

style sentence of multi property collection

Source code in app/handlers.py
@staticmethod
def get_multi_property_style(df_collection):
    """
    Make style sentence of multi property collection
    Parameters
    ----------
    df_collection : DataFrame
        collection data in DataFrame format

    Returns
    -------
    style_sent : str
        style sentence of multi property collection
    """
    homes_count = len(df_collection)
    common_styles_count = df_collection.exterior_style.value_counts()[
        df_collection.exterior_style.value_counts() > homes_count / 3]

    common_style = ", ".join(common_styles_count.keys())

    if common_style and (len(common_styles_count) > 1):
        style_sent = f"Most homes have {common_style} architectural styles. "
    elif common_style and (len(common_styles_count) == 1):
        style_sent = f"All homes have {common_style} architectural style. "
    else:
        style_sent = ""
    return style_sent

get_multi_property_features(df_collection) staticmethod

Make feature sentence of multi property collection

Parameters:

Name Type Description Default
df_collection DataFrame

collection data in DataFrame format

required

Returns:

Name Type Description
features_sent str

feature sentence of multi property collection

Source code in app/handlers.py
@staticmethod
def get_multi_property_features(df_collection):
    """
    Make feature sentence of multi property collection

    Parameters
    ----------
    df_collection : DataFrame
        collection data in DataFrame format

    Returns
    -------
    features_sent : str
        feature sentence of multi property collection
    """
    prop_count = len(df_collection)
    homes_features = df_collection.features.dropna()
    features_sent = ""
    if len(homes_features) > prop_count / 2:
        common_features_set = set.intersection(*map(set, homes_features))
        common_features = ', '.join(common_features_set)
        if common_features:
            features_sent = f"Most of them have {common_features}."
    return features_sent

get_multi_property_bed_bath(df_collection) staticmethod

Make bedroom and bathroom sentence of multi property collection

Parameters:

Name Type Description Default
df_collection DataFrame

collection data in DataFrame format

required

Returns:

Name Type Description
bed_bath_sent str

bedroom and bathroom sentence of multi property collection

Source code in app/handlers.py
@staticmethod
def get_multi_property_bed_bath(df_collection):
    """
    Make bedroom and bathroom sentence of multi property collection

    Parameters
    ----------
    df_collection : DataFrame
        collection data in DataFrame format

    Returns
    -------
    bed_bath_sent : str
        bedroom and bathroom sentence of multi property collection
    """
    bed_bath_sent = ""
    unique_bedrooms = df_collection.bedrooms.dropna().unique()
    if len(unique_bedrooms) > 0:
        bedrooms_min = unique_bedrooms.min()
        bedrooms_max = unique_bedrooms.max()
        if bedrooms_min != bedrooms_max:
            if bedrooms_min == 0 and bedrooms_max == 1:
                bedrooms_range = f"up to {bedrooms_max} bedroom"
            elif bedrooms_min == 0 and bedrooms_max > 1:
                bedrooms_range = f"up to {bedrooms_max} bedrooms"
            else:
                bedrooms_range = f"{bedrooms_min} - {bedrooms_max} bedrooms"
        else:
            if bedrooms_min == 1:
                bedrooms_range = f"{bedrooms_min} bedroom"
            else:
                bedrooms_range = f"{bedrooms_min} bedrooms"
        bed_bath_sent = f"Each home has {bedrooms_range}"
    unique_bathrooms = df_collection.bathrooms.dropna().unique()
    if len(unique_bathrooms) > 0:
        bathrooms_min = unique_bathrooms.min()
        bathrooms_max = unique_bathrooms.max()
        if bathrooms_min != bathrooms_max:
            if bathrooms_min == 0 and bathrooms_max == 1:
                bathrooms_range = f"up to {bathrooms_max} bathroom"
            elif bathrooms_min == 0 and bathrooms_max > 1:
                bathrooms_range = f"up to {bathrooms_max} bathrooms"
            else:
                bathrooms_range = f"{bathrooms_min} - {bathrooms_max} bathrooms"
        else:
            if bathrooms_min == 1:
                bathrooms_range = f"{bathrooms_min} bathroom"
            else:
                bathrooms_range = f"{bathrooms_min} bathrooms"
        if bed_bath_sent:
            bed_bath_sent += f", {bathrooms_range}"
        else:
            bed_bath_sent = f"Each home has {bathrooms_range}"
    if bed_bath_sent:
        bed_bath_sent += "."

    return bed_bath_sent

get_one_type_multi_property_information(df_collection)

Make the description of a property type multi property collection

Parameters:

Name Type Description Default
df_collection DataFrame

collection information in DataFrame format

required

Returns:

Name Type Description
description str

description of a property type multi property collection

Source code in app/handlers.py
def get_one_type_multi_property_information(self, df_collection):
    """
    Build the description of a single-property-type multi property
    collection, covering homes, lands and other property kinds.

    Parameters
    ----------
    df_collection : DataFrame
        collection information in DataFrame format; rows are partitioned
        by their "home_type" column

    Returns
    -------
    description : str
        description of a property type multi property collection
    """

    homes_text = ""
    lands_text = ""
    others_text = ""

    df_homes = df_collection[df_collection["home_type"].isin(HOMES)]
    homes_count = len(df_homes)
    if homes_count == 1:
        record = df_homes.to_dict(orient='records')[0]
        price_bed_bath = self.get_unique_property_price_bed_bath(
            record, record["home_type"])
        style = self.get_unique_property_style(record)
        school = self.get_school(df_homes)
        features = self.get_unique_property_features(record)
        homes_text = f"{price_bed_bath} {style} {features} {school}"
    elif homes_count > 1:
        price = self.get_multi_property_price(df_homes, "homes")
        bed_bath = self.get_multi_property_bed_bath(df_homes)
        style = self.get_multi_property_style(df_homes)
        features = self.get_multi_property_features(df_homes)
        school = self.get_school(df_homes)
        # No separator between style and features: style carries its own
        # trailing space when non-empty.
        homes_text = f"{price} {bed_bath} {style}{features} {school}"

    df_lands = df_collection[df_collection["home_type"] == "Land"]
    lands_count = len(df_lands)
    if lands_count == 1:
        record = df_lands.to_dict(orient='records')[0]
        price = self.get_unique_property_price_bed_bath(record, record["home_type"])
        lands_text = f"{price} {self.get_school(df_lands)}"
    elif lands_count > 1:
        price = self.get_multi_property_price(df_lands, "lands")
        lands_text = f"{price} {self.get_school(df_lands)}"

    df_others = df_collection[df_collection["home_type"] == "Other"]
    others_count = len(df_others)
    if others_count == 1:
        record = df_others.to_dict(orient='records')[0]
        price = self.get_unique_property_price_bed_bath(record, "other property")
        school = self.get_school(df_others)
        features = self.get_unique_property_features(record)
        others_text = f"{price} {features} {school}"
    elif others_count > 1:
        price = self.get_multi_property_price(df_others, "others")
        features = self.get_multi_property_features(df_others)
        school = self.get_school(df_others)
        others_text = f"{price} {features} {school}"

    return f"{homes_text} {lands_text} {others_text}"

get_collection_prompt(df_collection)

Make ChatGPT prompt of mixed collection

Parameters:

Name Type Description Default
df_collection dataFrame

data of a mixed collection

required

Returns:

Name Type Description
prompt list

prompt of a mixed collection

collection_information str

description of mixed collection

Source code in app/handlers.py
def get_collection_prompt(self, df_collection):
    """
    Make the ChatGPT prompt of a mixed (sale + rent) collection.

    Parameters
    ----------
    df_collection : DataFrame
        data of a mixed collection; rows are partitioned by their
        "property_type" column ("sale" / "rent")

    Returns
    -------
    prompt : list
        prompt of a mixed collection (system + user messages)
    collection_information : str
        description of mixed collection
    """

    df_sale = df_collection[df_collection["property_type"] == "sale"]
    df_rent = df_collection[df_collection["property_type"] == "rent"]
    sale_count = len(df_sale)
    rent_count = len(df_rent)
    sale_rent = []
    type_loc_sent_sale = ""
    sale_description = ""
    type_loc_sent_rent = ""
    rent_description = ""
    # Describe the sale side: single-property vs multi-property wording.
    if sale_count == 1:
        sale_rent.append("1 property for sale")
        type_loc_sent_sale, sale_description = self.get_unique_property_information(df_sale)
    elif sale_count > 1:
        sale_rent.append(f"{sale_count} properties for sale")
        type_loc_sent_sale = self.get_multi_property_type_loc(
            df_sale)
        sale_description = self.get_one_type_multi_property_information(df_sale)

    # Describe the rent side symmetrically.
    if rent_count == 1:
        sale_rent.append("1 property for rent")
        type_loc_sent_rent, rent_description = self.get_unique_property_information(df_rent)
    elif rent_count > 1:
        sale_rent.append(f"{rent_count} properties for rent")
        type_loc_sent_rent = self.get_multi_property_type_loc(
            df_rent)
        rent_description = self.get_one_type_multi_property_information(
            df_rent)

    sale_rent_part = " and ".join(sale_rent)

    general_sent = f"The collection includes {sale_rent_part}."

    # BUG FIX: the separators were written as the literal two characters
    # "/n"; use real newlines, matching the "\n" separators used elsewhere
    # in this handler.
    collection_information = f"{general_sent}\n{type_loc_sent_sale} {sale_description}\n{type_loc_sent_rent} " \
                             f"{rent_description}"
    prompt = [
        {"role": "system",
         "content": "You are an experienced real estate content creator."},
        {"role": "user",
         "content": f"Given the information below, generate 7-8 sentences describing the collection of properties. "
                    f"Do not use any fair housing act violation, including information about families, race, sex, "
                    f"religion, and other sensitive content. "
                    f"Do not mention certain group of people like families, couples, singles, individuals. "
                    f"Use the $ symbol instead of the word dollars. {collection_information}"}]
    return prompt, collection_information

generate_invitation(collection_data, client_name, agent_name)

Generate invitation text

Parameters:

Name Type Description Default
collection_data list

Properties data in collection

required
client_name str

Client name

required
agent_name str

Agent name

required

Returns:

Name Type Description
greeting str

Client greeting part of email

system_introduction str

Introduction of system

collection_information str

Collection information generated by ChatGPT

closing str

Closing part of email

full_text str

Email full text

status_code int

status code of ChatGPT call

message str

message of ChatGPT call

Source code in app/handlers.py
def generate_invitation(self, collection_data, client_name, agent_name):
    """
    Generate invitation text

    Parameters
    ----------
    collection_data : list
        Properties data in collection
    client_name : str
        Client name
    agent_name : str
        Agent name

    Returns
    -------
    greeting : str
        Client greeting part of email
    system_introduction : str
        Introduction of system
    collection_information : str
        Collection information generated by ChatGPT
    closing : str
        Closing part of email
    full_text : str
        Email full text
    status_code : int
        status code of ChatGPT call
    message : str
        message of ChatGPT call
    """

    collection_information = ""
    status_code = 200
    message = "OK"
    # Start from the empty-collection email parts; full_text is rebuilt
    # below only when there is property data and a prompt was produced.
    greeting, system_introduction, closing, full_text = self.generate_empty_collection_invitation(client_name,
                                                                                                  agent_name)
    if collection_data:
        # Map raw item fields to display values; exterior_style is kept
        # only when its probability clears STYLE_PROBABILITY_LIMIT.
        # NOTE(review): item.dict() suggests these are pydantic-style
        # models — confirm against the caller's schema.
        collection_data = [
            {**item.dict(), 'home_type': PHYSICAL_PROPERTY_TYPE_TO_HOME_TYPE.get(item.physical_property_type),
             'physical_property_type': PHYSICAL_PROPERTY_TYPE_MAPPING.get(item.physical_property_type),
             'property_type': PROPERTY_TYPE_MAPPING.get(item.property_type),
             'exterior_style': item.exterior_style.name
             if item.exterior_style and item.exterior_style.probability > STYLE_PROBABILITY_LIMIT else None}
            for item in collection_data
        ]
        df_collection = pd.DataFrame(collection_data)
        property_type_count = len(df_collection["property_type"].unique())
        home_type_count = len(df_collection["home_type"].unique())

        # Choose a prompt builder by collection shape: single property,
        # homogeneous (one property_type and one home_type), or mixed.
        if len(df_collection) == 1:
            prompt, collection_information = self.get_unique_property_prompt(df_collection)
        elif property_type_count == 1 and home_type_count == 1:
            home_type = df_collection["home_type"].unique()[0]
            if home_type in HOMES:
                prompt, collection_information = self.get_multi_home_collection_prompt(df_collection)
            else:
                prompt, collection_information = self.get_non_home_collection_prompt(df_collection)
        else:
            prompt, collection_information = self.get_collection_prompt(df_collection)

        if prompt:
            try:
                # Bound the ChatGPT call in wall-clock time; on timeout the
                # locally built collection_information is kept as fallback.
                collection_information, status_code, message = func_timeout.func_timeout(
                    timeout=COLLECTION_INFORMATION_GENERATION_TIME, func=self.generate_text, args=(prompt,))
            except func_timeout.FunctionTimedOut:
                pass

            full_text = f"{greeting}\n{system_introduction}\n{collection_information}\n{closing}"
    # Collapse whitespace runs so the returned description is one clean line.
    collection_information = re.sub(r'\s+', ' ', collection_information)
    collection_information = collection_information.strip()
    return greeting, system_introduction, collection_information, closing, full_text, status_code, message

generate_location_based_text(location, words_count)

Generate text for the given location

Parameters:

Name Type Description Default
location str

a location in the USA

required
words_count int

the approximate number of words in generated text

required

Returns:

Name Type Description
text_title dict

generated text and title

status_code int

status_code of ChatGPT response

message str

message of ChatGPT response

Source code in app/handlers.py
def generate_location_based_text(self, location, words_count):
    """
    Generate text for the given location

    Parameters
    ----------
    location : str
        a location in the USA
    words_count : int
        the approximate number of words in generated text

    Returns
    -------
    text_title : dict
        generated text and title
    status_code : int
        status_code of ChatGPT response
    message : str
        message of ChatGPT response
    """
    import json

    prompt = [
        {"role": "system",
         "content": "You are an experienced real estate content creator."},
        {"role": "user",
         "content": f"Please, generate {words_count} words text about {location}."
                    f"Mention why it can be an attractive place to live. You can also use historical facts related "
                    f"to the location."
                    f"Title the generated text. Don't use any zip, neighborhood, city and state in "
                    f"title. Return your answer in json format with text and title keys."}]
    text_title, status_code, message = self.generate_text(prompt)
    if text_title:
        # The model is asked for JSON but may answer with a Python-style
        # dict (single quotes). Try literal_eval first (original behavior),
        # then fall back to strict JSON, which literal_eval rejects when the
        # payload contains e.g. true/false/null.
        try:
            text_title = ast.literal_eval(text_title)
        except (ValueError, SyntaxError):
            text_title = json.loads(text_title)
        text = text_title["text"]
        # Trim trailing sentences until the text fits the character limit;
        # each pass drops exactly one sentence, so the loop terminates.
        while len(text) > LOCATION_TEXT_MAX_CHAR_COUNT:
            sentences = [sent.text for sent in nlp(text).sents][:-1]
            text = ' '.join(sentences)
        text_title["text"] = text
    return text_title, status_code, message

generate_google_ad_copy(generation_params, postprocessing_params, home_data, important_features=None, unimportant_features=None, generated_adjectives=None, limited_data=False) async

Generate Google Ad copy for property advertisements.

Parameters:

Name Type Description Default
generation_params GenerationParams

Schema for text generation params

required
postprocessing_params PostprocessingParams

Schema for postprocessing params

required
home_data GoogleAdCopyHomeData

Schema of a property data.

required
important_features List

List of important features for specific property

None
unimportant_features List

List of unimportant features for specific property

None
generated_adjectives List

List of generated adjectives for specific property

None
limited_data bool

Flag whether the data is limited or not

False

Returns:

Name Type Description
out Union[GenerateGoogleAdCopyHeadlines, None]
Source code in app/handlers.py
async def generate_google_ad_copy(self, generation_params: GenerationParams,
                                  postprocessing_params: PostprocessingParams,
                                  home_data: GoogleAdCopyHomeData,
                                  important_features: List = None,
                                  unimportant_features: List = None,
                                  generated_adjectives: List = None,
                                  limited_data: bool = False) \
        -> Union[GenerateGoogleAdCopyHeadlines, None]:
    """
    Generate Google Ad copy for property advertisements.

    Parameters
    ----------
    generation_params: GenerationParams
        Schema for text generation params
    postprocessing_params: PostprocessingParams
        Schema for postprocessing params
    home_data: GoogleAdCopyHomeData
        Schema of a property data.
    important_features: List = None
        List of important features for specific property
    unimportant_features: List = None
        List of unimportant features for specific property
    generated_adjectives: List = None
        List of generated adjectives for specific property
    limited_data: bool = False
        Flag weather the data is limited or not

    Returns
    -------
    out : Union[GenerateGoogleAdCopyHeadlines, None]
    """
    mapped_subtype = SUBTYPE_MAPPING.get(home_data.subtype)
    mapped_property_type = PROPERTY_TYPE_MAPPING.get(home_data.property_type)
    walk_score_description = WALK_SCORE_DESCRIPTION_MAPPING.get(home_data.walk_score_description)
    bike_description = TRANSIT_DESCRIPTION_MAPPING.get(home_data.bike_description)
    transit_description = BIKE_DESCRIPTION_MAPPING.get(home_data.transit_description)
    if postprocessing_params.replace_popular_places:
        (home_data.state, home_data.city,
         home_data.neighborhood, home_data.remark) = await self.preprocess_data(state=home_data.state,
                                                                                city=home_data.city,
                                                                                neighborhood=home_data.neighborhood,
                                                                                remark=home_data.remark)
    retries = 0
    if home_data.subtype == 11:
        prompts = [short_headline_prompt_for_multi_family.format(property_type=mapped_property_type,
                                                                 state=home_data.state,
                                                                 city=home_data.city,
                                                                 neighborhood=home_data.neighborhood,
                                                                 important_features=important_features,
                                                                 unimportant_features=unimportant_features,
                                                                 remark=home_data.remark),
                   description_prompt_for_multi_family.format(subtype=mapped_subtype,
                                                              property_type=mapped_property_type,
                                                              state=home_data.state,
                                                              city=home_data.city,
                                                              neighborhood=home_data.neighborhood,
                                                              important_features=important_features,
                                                              unimportant_features=unimportant_features,
                                                              remark=home_data.remark),
                   long_headline_prompt_for_multi_family.format(subtype=mapped_subtype,
                                                                property_type=mapped_property_type,
                                                                state=home_data.state,
                                                                city=home_data.city,
                                                                neighborhood=home_data.neighborhood,
                                                                important_features=important_features,
                                                                unimportant_features=unimportant_features,
                                                                remark=home_data.remark,
                                                                walk_score_description=walk_score_description,
                                                                bike_description=bike_description,
                                                                transit_description=transit_description)]
    elif home_data.subtype == 9:
        if home_data.lot_size_area is None or home_data.lot_size_area == 0 or home_data.lot_size_area_unit is None:
            lot_size_area = None
        else:
            lot_size_area = f"{home_data.lot_size_area} {home_data.lot_size_area_unit}"
        prompts = [short_headline_prompt_for_land.format(subtype=mapped_subtype,
                                                         property_type=mapped_property_type,
                                                         state=home_data.state,
                                                         city=home_data.city,
                                                         neighborhood=home_data.neighborhood,
                                                         remark=home_data.remark),
                   description_prompt_for_land.format(subtype=mapped_subtype,
                                                      property_type=mapped_property_type,
                                                      state=home_data.state,
                                                      city=home_data.city,
                                                      neighborhood=home_data.neighborhood,
                                                      remark=home_data.remark,
                                                      lot_size_area=lot_size_area
                                                      ),
                   long_headline_prompt_for_land.format(subtype=mapped_subtype,
                                                        property_type=mapped_property_type,
                                                        state=home_data.state,
                                                        city=home_data.city,
                                                        neighborhood=home_data.neighborhood,
                                                        remark=home_data.remark,
                                                        walk_score_description=walk_score_description,
                                                        bike_description=bike_description,
                                                        transit_description=transit_description,
                                                        lot_size_area=lot_size_area)]
    elif home_data.state is None:
        prompts = [short_headline_without_state_prompt.format(subtype=mapped_subtype,
                                                              property_type=mapped_property_type,
                                                              city=home_data.city,
                                                              neighborhood=home_data.neighborhood,
                                                              important_features=important_features,
                                                              unimportant_features=unimportant_features,
                                                              generated_adjectives=generated_adjectives,
                                                              remark=home_data.remark),
                   description_without_state_prompt.format(subtype=mapped_subtype,
                                                           property_type=mapped_property_type,
                                                           city=home_data.city,
                                                           neighborhood=home_data.neighborhood,
                                                           important_features=important_features,
                                                           unimportant_features=unimportant_features,
                                                           generated_adjectives=generated_adjectives,
                                                           remark=home_data.remark),
                   long_headline_without_state_prompt.format(subtype=mapped_subtype,
                                                             property_type=mapped_property_type,
                                                             city=home_data.city,
                                                             neighborhood=home_data.neighborhood,
                                                             bedrooms=home_data.bedrooms,
                                                             bathrooms=home_data.bathrooms,
                                                             important_features=important_features,
                                                             unimportant_features=unimportant_features,
                                                             generated_adjectives=generated_adjectives,
                                                             remark=home_data.remark,
                                                             walk_score_description=walk_score_description,
                                                             bike_description=bike_description,
                                                             transit_description=transit_description)]

    elif not limited_data:
        prompts = [short_headline_prompt.format(subtype=mapped_subtype,
                                                property_type=mapped_property_type,
                                                state=home_data.state,
                                                city=home_data.city,
                                                neighborhood=home_data.neighborhood,
                                                important_features=important_features,
                                                unimportant_features=unimportant_features,
                                                generated_adjectives=generated_adjectives,
                                                remark=home_data.remark),
                   description_prompt.format(subtype=mapped_subtype,
                                             property_type=mapped_property_type,
                                             state=home_data.state,
                                             city=home_data.city,
                                             neighborhood=home_data.neighborhood,
                                             important_features=important_features,
                                             unimportant_features=unimportant_features,
                                             generated_adjectives=generated_adjectives,
                                             remark=home_data.remark),
                   long_headline_prompt.format(subtype=mapped_subtype,
                                               property_type=mapped_property_type,
                                               state=home_data.state,
                                               city=home_data.city,
                                               neighborhood=home_data.neighborhood,
                                               bedrooms=home_data.bedrooms,
                                               bathrooms=home_data.bathrooms,
                                               important_features=important_features,
                                               unimportant_features=unimportant_features,
                                               generated_adjectives=generated_adjectives,
                                               remark=home_data.remark,
                                               walk_score_description=walk_score_description,
                                               bike_description=bike_description,
                                               transit_description=transit_description)]
    else:
        prompts = [short_headline_limited_data_prompt.format(subtype=mapped_subtype,
                                                             property_type=mapped_property_type,
                                                             state=home_data.state,
                                                             city=home_data.city,
                                                             neighborhood=home_data.neighborhood,
                                                             generated_adjectives=generated_adjectives),
                   description_limited_data_prompt.format(subtype=mapped_subtype,
                                                          property_type=mapped_property_type,
                                                          state=home_data.state,
                                                          city=home_data.city,
                                                          neighborhood=home_data.neighborhood,
                                                          generated_adjectives=generated_adjectives),
                   long_headline_limited_data_prompt.format(subtype=mapped_subtype,
                                                            property_type=mapped_property_type,
                                                            state=home_data.state,
                                                            city=home_data.city,
                                                            neighborhood=home_data.neighborhood,
                                                            generated_adjectives=generated_adjectives)]
    while retries < generation_params.max_retries:
        tasks = [self.async_generate_text(system_message=headline_generation_system_message, prompt=prompt,
                                          temperature=generation_params.temperature,
                                          timeout=15) for prompt in prompts]
        responses = await asyncio.gather(*tasks)
        if all(responses):
            processing_tasks = []
            for response, num_of_chars in zip(responses, CHARACTER_LIMITS):
                processing_task = self.process_responses(response=response, num_of_chars=num_of_chars,
                                                         property_type=mapped_property_type,
                                                         postprocessing_params=postprocessing_params)
                processing_tasks.append(processing_task)

            processed_responses = await asyncio.gather(*processing_tasks)
            if all(processed_responses):
                final_headlines = \
                    {key: value for dictionary in processed_responses for key, value in dictionary.items()}
            else:
                retries += 1
                continue

            try:
                headlines = GenerateGoogleAdCopyHeadlines(**final_headlines)
                return headlines
            except Exception:
                retries += 1
                continue
        else:
            retries += 1
            continue
    raise GoogleAdCopyGenerationError('Maximum number of retries exceeded')

process_responses(response, num_of_chars, property_type, postprocessing_params) async

Process the responses received from a service.

Parameters:

Name Type Description Default
response str

The response received from the service.

required
num_of_chars int

Maximum number of characters

required
property_type str

The type of property for processing.

required
postprocessing_params PostprocessingParams

Schema for postprocessing params

required

Returns:

Name Type Description
Returns Union[Dict, None]
Source code in app/handlers.py
async def process_responses(self, response: str, num_of_chars: int, property_type: str,
                            postprocessing_params: PostprocessingParams) -> Union[Dict, None]:
    """
    Parse a JSON service response and post-process every headline group in it.

    Parameters
    ----------
    response : str
        Raw JSON payload received from the service.
    num_of_chars : int
        Maximum number of characters allowed per headline.
    property_type : str
        The type of property for processing.
    postprocessing_params : PostprocessingParams
        Schema for postprocessing params.

    Returns
    -------
    Union[Dict, None]
        Mapping of the payload's keys to their post-processed headline
        groups, or None when the payload is not valid JSON.
    """
    try:
        parsed = json.loads(response)
    except Exception:
        return None

    # Post-process every headline group concurrently; gather preserves order,
    # so results line up with the dict's key order.
    pending = [
        self.postprocess_headlines(headlines=group, num_of_chars=num_of_chars,
                                   property_type=property_type,
                                   postprocessing_params=postprocessing_params)
        for group in parsed.values()
    ]
    results = await asyncio.gather(*pending)
    return {key: processed for key, processed in zip(parsed.keys(), results)}

preprocess_data(state, city, neighborhood, remark, popular_places=POPULAR_PLACES, popular_places_states_mapping=POPULAR_PLACES_STATES_MAPPING) async staticmethod

Preprocess data (specifically location and remark) to go to the prompt for generating headlines

Parameters:

Name Type Description Default
state str

State of the property

required
city Union[str, None]

City of the property

required
neighborhood Union[str, None]

Neighborhood of the property

required
remark str

The remark of the property

required
popular_places List[str]

Popular places which do not need state

POPULAR_PLACES
popular_places_states_mapping Dict[str, str]

Mapping that shows which popular place is in which state

POPULAR_PLACES_STATES_MAPPING

Returns:

Name Type Description
out Tuple[Union[str, None], Union[str, None], Union[str, None], Union[str, None]]

A tuple containing preprocessed state, city, neighborhood and remark values

Source code in app/handlers.py
@staticmethod
async def preprocess_data(state: str, city: Union[str, None],
                          neighborhood: Union[str, None],
                          remark: str, popular_places: List[str] = POPULAR_PLACES,
                          popular_places_states_mapping: Dict[str, str] = POPULAR_PLACES_STATES_MAPPING) -> (
        Tuple)[Union[str, None], Union[str, None], Union[str, None], Union[str, None]]:
    """
    Preprocess data (specifically location and remark) to go to the prompt for generating headlines

    When the neighborhood (or city) is a "popular place" that does not need a
    state, the redundant location fields are dropped and mentions of the
    state / other location parts are stripped from the remark.

    Parameters
    ----------
    state : str
        State of the property
    city : Union[str, None]
        City of the property
    neighborhood : Union[str, None]
        Neighborhood of the property
    remark : str
        The remark of the property
    popular_places : List[str] = POPULAR_PLACES
        Popular places which do not need state
    popular_places_states_mapping : Dict[str, str] = POPULAR_PLACES_STATES_MAPPING
        Mapping that shows which popular place is in which state

    Returns
    -------
    out : Tuple[Union[str, None], Union[str, None], Union[str, None], Union[str, None]]
        A tuple containing preprocessed state, city, neighborhood and remark values
    """

    def _strip_terms(text: Union[str, None], terms: List[Union[str, None]]) -> Union[str, None]:
        # Remove each whole-word term from the text, case-insensitively.
        # Robustness fix: None/empty terms are skipped, so a popular place
        # missing from the mapping no longer raises TypeError in re.escape.
        if text is None:
            return None
        for term in terms:
            if term:
                text = re.sub(r'\b' + re.escape(term) + r'\b', '', text, flags=re.IGNORECASE)
        return text

    lowered_places = [place.lower() for place in popular_places]

    if neighborhood is not None and neighborhood.lower() in lowered_places:
        state_full_name = popular_places_states_mapping.get(neighborhood.lower())
        remark = _strip_terms(remark, [state_full_name, state])
        if city:
            remark = _strip_terms(remark, [city])
        return None, None, neighborhood, remark

    if city is not None and city.lower() in lowered_places:
        state_full_name = popular_places_states_mapping.get(city.lower())
        remark = _strip_terms(remark, [state_full_name, state])
        if neighborhood:
            remark = _strip_terms(remark, [neighborhood])
        return None, city, None, remark

    return state, city, neighborhood, remark

postprocess_headlines(headlines, num_of_chars, property_type, postprocessing_params) async

Postprocessing of headlines

Parameters:

Name Type Description Default
headlines List[str]

Headlines that need to be post-processed.

required
num_of_chars int

Maximum number of characters

required
property_type str

The Property Type

required
postprocessing_params PostprocessingParams

Schema for postprocessing params

required

Returns:

Name Type Description
out Tuple
Source code in app/handlers.py
async def postprocess_headlines(self, headlines: List[str], num_of_chars: int,
                                property_type: str, postprocessing_params: PostprocessingParams) -> Tuple:
    """
    Post-process a batch of headlines concurrently.

    Parameters
    ----------
    headlines : List[str]
        Headlines to be post-processed.
    num_of_chars : int
        Maximum number of characters allowed per headline.
    property_type : str
        The Property Type.
    postprocessing_params : PostprocessingParams
        Schema for postprocessing params.

    Returns
    -------
    out : Tuple
        The post-processed headlines, in the same order as the input.
    """
    pending = [
        self.postprocess_single_headline(headline=single_headline,
                                         num_of_chars=num_of_chars,
                                         property_type=property_type,
                                         exclude_symbols=postprocessing_params.exclude_symbols,
                                         paraphrase_for_char_limit=postprocessing_params.paraphrase_for_char_limit,
                                         ensure_title_case=postprocessing_params.ensure_title_case,
                                         replace_ba_br=postprocessing_params.replace_ba_br,
                                         replace_sf=postprocessing_params.replace_sf)
        for single_headline in headlines
    ]
    return await asyncio.gather(*pending)

postprocess_single_headline(headline, num_of_chars, property_type, exclude_symbols, paraphrase_for_char_limit, ensure_title_case, replace_ba_br, replace_sf) async

Post process single headline

Parameters:

Name Type Description Default
headline str

input headline

required
num_of_chars int

Maximum number of characters

required
property_type str

identifier of property type

required
exclude_symbols bool

Boolean indicating whether to replace/exclude symbols like ?! or not.

required
paraphrase_for_char_limit bool

Boolean indicating whether to paraphrase sentences depending on char limit or not.

required
ensure_title_case bool

True for returning titlecase

required
replace_ba_br bool

True to replace bathrooms with BA, bedrooms with BR

required
replace_sf bool

True to replace square feet with the sf abbreviation

required

Returns:

Name Type Description
headline str

result

Source code in app/handlers.py
async def postprocess_single_headline(self, headline: str, num_of_chars: int, property_type: str,
                                      exclude_symbols: bool,
                                      paraphrase_for_char_limit: bool, ensure_title_case: bool,
                                      replace_ba_br: bool, replace_sf: bool) -> str:
    """
    Run a single headline through the post-processing pipeline.

    Violation paraphrasing and property-type validation always run; the
    remaining steps are toggled individually by the boolean flags.

    Parameters
    ----------
    headline : str
        Input headline.
    num_of_chars : int
        Maximum number of characters.
    property_type : str
        Identifier of property type.
    exclude_symbols : bool
        Whether to replace/exclude symbols like ?! or not.
    paraphrase_for_char_limit : bool
        Whether to paraphrase sentences depending on char limit or not.
    ensure_title_case : bool
        True for returning titlecase.
    replace_ba_br : bool
        True to replace bathrooms with BA, bedrooms with BR.
    replace_sf : bool
        True to replace square-feet mentions with the sf abbreviation.

    Returns
    -------
    headline : str
        The post-processed headline.
    """
    if paraphrase_for_char_limit:
        headline = await self.paraphrase_char_limit_exceeded_sentences(num_of_chars=num_of_chars,
                                                                       headline=headline,
                                                                       max_recursion_depth=2)

    # Mandatory steps: fair-housing compliance and sale/rent wording.
    headline = await self.paraphrase_violations(headline=headline)
    headline = await self.validate_property_type(headline=headline, property_type=property_type)

    # Optional steps, applied in this fixed order when their flag is set.
    flagged_steps = (
        (exclude_symbols, lambda text: self.exclude_symbols(headline=text)),
        (ensure_title_case, lambda text: self.headline_capitalize(sentence=text)),
        (replace_ba_br, lambda text: self.replace_with_ba_br(text=text)),
        (replace_sf, lambda text: self.replace_with_sf(text=text)),
    )
    for enabled, step in flagged_steps:
        if enabled:
            headline = await step(headline)
    return headline

paraphrase_char_limit_exceeded_sentences(num_of_chars, headline, max_recursion_depth=2) async

Recursively paraphrase sentences where character limit is exceeded.

Parameters:

Name Type Description Default
num_of_chars int

Maximum number of characters

required
headline str

input headline

required
max_recursion_depth Optional[int] = 2

maximum recursion depth

2

Returns:

Name Type Description
headline str

result

Source code in app/handlers.py
async def paraphrase_char_limit_exceeded_sentences(self, num_of_chars: int, headline: str,
                                                   max_recursion_depth: Optional[int] = 2) -> str:
    """
    Recursively paraphrase a headline while it exceeds the character limit.

    Each recursion consumes one unit of ``max_recursion_depth``; once the
    budget is spent the current headline is returned even if it is still
    too long.

    Parameters
    ----------
    num_of_chars : int
        Maximum number of characters.
    headline : str
        Input headline.
    max_recursion_depth : Optional[int] = 2
        Maximum recursion depth.

    Returns
    -------
    headline : str
        The (possibly paraphrased) headline.

    Raises
    ------
    FailedToParaphraseViolations
        If the paraphrasing response cannot be parsed.
    """
    # Stop when the budget is exhausted or the headline already fits.
    if max_recursion_depth <= 0 or len(headline) <= num_of_chars:
        return headline

    response = await self.async_generate_text(
        system_message=chars_paraphrasing_system_message,
        prompt=chars_paraphrasing_prompt.format(num_of_chars=num_of_chars,
                                                sentence=headline),
        temperature=0.5,
        timeout=15)
    try:
        shortened = json.loads(response)["paraphrased_sentence"]
        return await self.paraphrase_char_limit_exceeded_sentences(num_of_chars=num_of_chars,
                                                                   headline=shortened,
                                                                   max_recursion_depth=max_recursion_depth - 1)
    except Exception:
        raise FailedToParaphraseViolations(
            'Failed to appropriately paraphrase headlines that exceed character limit.')

paraphrase_violations(headline, max_recursion_depth=2) async

Recursively paraphrase violations if there are any

Parameters:

Name Type Description Default
headline str

input headline

required
max_recursion_depth Optional[int] = 2

maximum recursion depth

2

Returns:

Name Type Description
headline str

result

Source code in app/handlers.py
async def paraphrase_violations(self, headline: str,
                                max_recursion_depth: Optional[int] = 2) -> str:
    """
    Recursively paraphrase a headline while the Fair Housing Validator flags it.

    At most ``max_recursion_depth`` rewrites are attempted; the headline is
    returned as soon as it passes validation or the budget is spent.

    Parameters
    ----------
    headline : str
        Input headline.
    max_recursion_depth : Optional[int] = 2
        Maximum recursion depth.

    Returns
    -------
    headline : str
        The (possibly paraphrased) headline.

    Raises
    ------
    FailedToParaphraseViolations
        If the paraphrasing response cannot be parsed.
    """
    if max_recursion_depth <= 0:
        return headline

    has_violation, violation_label = await self.validate_on_fhv(headline)
    if not has_violation:
        return headline

    response = await self.async_generate_text(
        system_message=violation_paraphrasing_system_message,
        prompt=violation_paraphrasing_prompt.format(sentence=headline, fhv_violation=violation_label),
        temperature=0.5,
        timeout=15)
    try:
        reworded = json.loads(response)["paraphrased_sentence"]
        return await self.paraphrase_violations(headline=reworded,
                                                max_recursion_depth=max_recursion_depth - 1)
    except Exception:
        raise FailedToParaphraseViolations('Failed to appropriately paraphrase violating headlines.')

validate_on_fhv(headline) async staticmethod

Passes a sentence to Fair Housing Validator and returns a flag

Parameters:

Name Type Description Default
headline str

Sentence that needs to go through Fair Housing Validator

required

Returns:

Name Type Description
out Tuple[bool, Union[str, None]]

If no violation returns False, otherwise True

Source code in app/handlers.py
@staticmethod
async def validate_on_fhv(headline: str) -> Tuple[bool, Union[str, None]]:
    """
    Check a sentence against the Fair Housing Validator service.

    Parameters
    ----------
    headline : str
        Sentence that needs to go through Fair Housing Validator.

    Returns
    -------
    out : Tuple[bool, Union[str, None]]
        (False, None) when no violation is found, otherwise
        (True, <first violating label>).

    Raises
    ------
    FHValidationError
        If the service call or its response parsing fails.
    """
    try:
        async with httpx.AsyncClient() as client:
            fhv_response = await client.post(url=secrets.FHV_PREDICT_ENDPOINT,
                                             json={"text": headline.lower(), "top_k": 1},
                                             timeout=10)
            fhv_response.raise_for_status()
        sentence_predictions = fhv_response.json()["sentence_predictions"]
        class_probs_per_sentence = [prediction["class_probs"] for prediction in sentence_predictions]
    except Exception:
        raise FHValidationError('Fair Housing Validator Failed to Validate')

    for class_probs in class_probs_per_sentence:
        # The first key is the top-ranked class for the sentence.
        top_label = next(iter(class_probs))
        if top_label != "no_violation":
            return True, top_label
    return False, None

validate_property_type(headline, property_type) async staticmethod

Function to validate sentences for property types (sale, rent)

Parameters:

Name Type Description Default
headline str

Sentence to be validated for property type

required
property_type str

The property type

required

Returns:

Name Type Description
out str
Source code in app/handlers.py
@staticmethod
async def validate_property_type(headline: str, property_type: str) -> str:
    """
    Function to validate sentences for property types (sale, rent)

    Rewrites wording that contradicts the property type: for "sale"
    listings rent/lease wording becomes sale wording, and vice versa for
    "rent" listings. Any other property type leaves the headline unchanged.

    Parameters
    ----------
    headline : str
        Sentence to be validated for property type
    property_type : str
        The property type ("sale" or "rent")

    Returns
    -------
    out : str
        The headline with property-type wording made consistent.
    """
    if property_type == "sale":
        modified_sentence = re.sub(pattern=r'\b(rent|lease)\b', repl='sale', string=headline, flags=re.IGNORECASE)
        modified_sentence = re.sub(pattern=r'\brental\b', repl='residential', string=modified_sentence,
                                   flags=re.IGNORECASE)
        return modified_sentence
    elif property_type == "rent":
        modified_sentence = re.sub(pattern=r'\b(sale|buy)\b', repl='rent', string=headline, flags=re.IGNORECASE)
        modified_sentence = re.sub(pattern=r'\bresidential\b', repl='rental', string=modified_sentence,
                                   flags=re.IGNORECASE)
        return modified_sentence
    # Bug fix: previously fell off the end and returned None for any other
    # property type, violating the declared `-> str` contract and breaking
    # downstream string processing; now the headline passes through unchanged.
    return headline

exclude_symbols(headline) async staticmethod

Modify a headline by replacing certain punctuation, removing specific characters, normalizing whitespace, and converting to lowercase.

Parameters:

Name Type Description Default
headline str

The input headline to be modified.

required

Returns:

Name Type Description
out str

The modified headline.

Source code in app/handlers.py
@staticmethod
async def exclude_symbols(headline: str) -> str:
    """
    Normalize a headline: turn '!'/'?' into periods, remove stray symbols,
    squeeze whitespace, and lowercase the result.

    Parameters
    ----------
    headline : str
        The input headline to be modified.

    Returns
    -------
    out : str
        The modified headline.
    """
    # '!' and '?' become plain periods (single-pass translation).
    cleaned = headline.translate(str.maketrans('!?', '..'))

    # A dash separating two words collapses to a single space.
    cleaned = re.sub(r'\b - \b|\b -\b|\b- \b', ' ', cleaned)

    # Strip the unwanted symbols in one pass.
    cleaned = cleaned.translate(str.maketrans('', '', '@#^*'))

    # Squeeze whitespace runs, trim the ends, and lowercase.
    return re.sub(r'\s+', ' ', cleaned).strip().lower()

headline_capitalize(sentence, lower_case_exceptions=LOWER_CASE_EXCEPTIONS, title_case_exceptions=TITLE_CASE_EXCEPTIONS) async staticmethod

Capitalize the headline-style sentence, excluding specified exceptions.

Parameters:

Name Type Description Default
sentence str

The headline-style sentence to be capitalized.

required
lower_case_exceptions Optional[List[str]] = LOWER_CASE_EXCEPTIONS

A list of lower case exceptions

LOWER_CASE_EXCEPTIONS
title_case_exceptions Optional[List[str]] = TITLE_CASE_EXCEPTIONS

A list of title case exceptions

TITLE_CASE_EXCEPTIONS

Returns:

Name Type Description
out str
Source code in app/handlers.py
@staticmethod
async def headline_capitalize(sentence: str,
                              lower_case_exceptions: Optional[List[str]] = LOWER_CASE_EXCEPTIONS,
                              title_case_exceptions: Optional[List[str]] = TITLE_CASE_EXCEPTIONS) -> str:
    """
    Capitalize the headline-style sentence, excluding specified exceptions.

    Parameters
    ----------
    sentence : str
        The headline-style sentence to be capitalized.
    lower_case_exceptions : Optional[List[str]] = LOWER_CASE_EXCEPTIONS
        Words forced to their given lower-case spelling.
    title_case_exceptions : Optional[List[str]] = TITLE_CASE_EXCEPTIONS
        Words forced to their given title-case spelling.

    Returns
    -------
    out : str
        The capitalized headline.
    """
    # Title-case every space-separated word first.
    title_case_words = [word.capitalize() for word in sentence.split(' ')]

    text = ' '.join(title_case_words)

    # Restore exception words to their canonical lower-case spelling.
    for exception in lower_case_exceptions:
        pattern = re.compile(r'\b(' + re.escape(exception) + r')\b', flags=re.IGNORECASE)
        text = pattern.sub(exception, text)

    text = text.replace("W/", "w/")
    text = re.sub(r'\bmust see\b', 'Must-see', text, flags=re.IGNORECASE)

    # The very first word is always capitalized, even if it is an exception.
    words = text.strip().split(' ', 1)

    if len(words) > 0:
        words[0] = words[0].capitalize()
    text = ' '.join(words)

    # Restore canonical title-case spellings.
    for exception in title_case_exceptions:
        pattern = re.compile(r'\b(' + re.escape(exception) + r')\b', flags=re.IGNORECASE)
        text = pattern.sub(exception, text)

    # Capitalize after sentence breaks, except following square-footage
    # abbreviations where the period is not a sentence end.
    sentences = text.split('. ')
    for i in range(1, len(sentences)):
        # Bug fix: guard against empty segments (e.g. "x. . y" or repeated
        # periods), which previously raised IndexError on sentences[i][0].
        if sentences[i] and not sentences[i - 1].strip().endswith(('sf', 'sqft')):
            sentences[i] = sentences[i][0].upper() + sentences[i][1:]
    text = '. '.join(sentences)

    # Capitalize after slashes too (e.g. "Washer/Dryer").
    sentences = text.split('/')
    for i in range(1, len(sentences)):
        # Bug fix: a trailing '/' produced an empty segment and IndexError.
        if sentences[i]:
            sentences[i] = sentences[i][0].upper() + sentences[i][1:]
    text = '/'.join(sentences)

    # Drop a single trailing period.
    if text.endswith('.'):
        text = text[:-1]

    return text

replace_with_ba_br(text) async staticmethod

Replace occurrences of bathroom, bedroom in the input text with standardized abbreviations.

Parameters:

Name Type Description Default
text str

The input text to be processed.

required

Returns:

Name Type Description
out str
Source code in app/handlers.py
@staticmethod
async def replace_with_ba_br(text: str) -> str:
    """
    Replace occurrences of bathroom, bedroom in the input text with standardized abbreviations.

    Parameters
    ----------
    text : str
        The input text to be processed.

    Returns
    -------
    out : str
    """
    bedroom_pattern = (
        re.compile(pattern=r'\b(\d+)\s*-?\s*(bed[-\s]*rooms?|brs?|beds?|bdrms?|bds?)\b', flags=re.IGNORECASE))
    text = bedroom_pattern.sub(repl=r'\1BR', string=text)

    bathroom_pattern = (
        re.compile(pattern=r'\b(\d+)\s*-?\s*(bath[-\s]*rooms?|bas?|baths?|bthrms?)\b', flags=re.IGNORECASE))
    text = bathroom_pattern.sub(repl=r'\1BA', string=text)

    comma_pattern = (
        re.compile(pattern=r'\b(\d*)(BA|BR)\s*,\s*(\d*)\s*(BA|BR)\b', flags=re.IGNORECASE))
    text = comma_pattern.sub(replace_pattern, text)

    slash_pattern = (
        re.compile(pattern=r'\b(\d*)(BA|BR)\s*/\s*(\d*)\s*(BA|BR)\b', flags=re.IGNORECASE))
    text = slash_pattern.sub(replace_pattern, text)

    space_pattern = (
        re.compile(pattern=r'\b(\d*)(BA|BR)\s*(\d*)\s*(BA|BR)\b', flags=re.IGNORECASE))
    text = space_pattern.sub(replace_pattern, text)

    text = re.sub(r'(\d)(BA|BR)', r'\1 \2', text)

    return text

replace_with_sf(text) async staticmethod

Replace occurrences of square feet in the input text with standardized abbreviations.

Parameters:

Name Type Description Default
text str

The input text to be processed.

required

Returns:

Name Type Description
out str
Source code in app/handlers.py
@staticmethod
async def replace_with_sf(text: str) -> str:
    """
    Replace occurrences of square feet in the input text with standardized abbreviations.

    Parameters
    ----------
    text : str
        The input text to be processed.

    Returns
    -------
    out : str
        Text with square-footage mentions abbreviated to "sf.".
    """
    spellings = r'square[-\s]*feet|square[-\s]*foot|sf|sqft|sq[-\s]*ft'

    # "1200 square feet" / "1200 sqft" -> "1200sf."
    text = re.sub(r'\b(\d+)\s*-?\s*(' + spellings + r')\b', r'\1sf.', text, flags=re.IGNORECASE)

    # Bare mentions without a number -> "sf."
    text = re.sub(r'\b(' + spellings + r')\b', r'sf.', text, flags=re.IGNORECASE)

    # Dotted form "1200 sq. ft." -> "1200sf."
    text = re.sub(r'\b(\d+)\s*-?\s*(sq\.\s*ft\.)', r'\1sf.', text, flags=re.IGNORECASE)

    # Collapse any doubled period produced by the substitutions.
    return text.replace("..", ".")